changelog shortlog tags branches changeset files revisions annotate raw help

Mercurial > hg > ventivac / appl/lib/venti.b

changeset 118: bd3834e3d91a
parent: 994f59b44578
child: 7f377ffc9ad3
author: Mechiel Lukkien <mechiel@ueber.net>
date: Fri, 17 Aug 2007 22:16:46 +0200
permissions: -rw-r--r--
description: remove unpackroot and unpackentry, they are in vac.m and appl/lib/vac.b (as Entry.unpack and Root.unpack).
1 implement Venti;
2 
3 include "sys.m";
4  sys: Sys;
5 include "venti.m";
6 
7 BIT8SZ: con 1;
8 BIT16SZ: con 2;
9 BIT32SZ: con 4;
10 BIT48SZ: con 6;
11 SCORE: con 20;
12 STR: con BIT16SZ;
13 H: con BIT16SZ+BIT8SZ+BIT8SZ; # minimum header length: size[2] op[1] tid[1]
14 Rootnamelen: con 128;
15 
16 versions := array[] of {"02"};
17 
18 blankroot: Root;
19 blankentry: Entry;
20 
21 init()
22 {
23  sys = load Sys Sys->PATH;
24 }
25 
26 hdrlen := array[Tmax] of {
27 Rerror => H+STR, # size[2] Rerror tid[1] error[s]
28 Tping => H, # size[2] Tping tid[1]
29 Rping => H, # size[2] Rping tid[1]
30 Thello => H+STR+STR+BIT8SZ+BIT8SZ+BIT8SZ, # size[2] Thello tid[1] version[s] uid[s] crypto[1] cryptos[n] codecs[n]
31 Rhello => H+STR+BIT8SZ+BIT8SZ, # size[2] Rhello tid[1] sid[s] crypto[1] codec[1]
32 Tgoodbye => H, # size[2] Tgoodbye tid[1]
33 Tread => H+SCORE+BIT8SZ+BIT8SZ+BIT16SZ, # size[2] Tread tid[1] score[20] type[1] pad[1] n[2]
34 Rread => H, # size[2] Rread tid[1] data
35 Twrite => H+BIT8SZ+3, # size[2] Twrite tid[1] type[1] pad[3]
36 Rwrite => H+SCORE, # size[2] Rwrite tid[1] score[20
37 Tsync => H, # size[2] Tsync tid[1]
38 Rsync => H, # size[2] Rsync tid[1]
39 };
40 
41 tag2type := array[] of {
42 tagof Vmsg.Rerror => Rerror,
43 tagof Vmsg.Tping => Tping,
44 tagof Vmsg.Rping => Rping,
45 tagof Vmsg.Thello => Thello,
46 tagof Vmsg.Rhello => Rhello,
47 tagof Vmsg.Tgoodbye => Tgoodbye,
48 tagof Vmsg.Tread => Tread,
49 tagof Vmsg.Rread => Rread,
50 tagof Vmsg.Twrite => Twrite,
51 tagof Vmsg.Rwrite => Rwrite,
52 tagof Vmsg.Tsync => Tsync,
53 tagof Vmsg.Rsync => Rsync,
54 };
55 
56 msgname := array[] of {
57 tagof Vmsg.Rerror => "Rerror",
58 tagof Vmsg.Tping => "Tping",
59 tagof Vmsg.Rping => "Rping",
60 tagof Vmsg.Thello => "Thello",
61 tagof Vmsg.Rhello => "Rhello",
62 tagof Vmsg.Tgoodbye => "Tgoodbye",
63 tagof Vmsg.Tread => "Tread",
64 tagof Vmsg.Rread => "Rread",
65 tagof Vmsg.Twrite => "Twrite",
66 tagof Vmsg.Rwrite => "Rwrite",
67 tagof Vmsg.Tsync => "Tsync",
68 tagof Vmsg.Rsync => "Rsync",
69 };
70 
71 zero := array[] of {
72  byte 16rda, byte 16r39, byte 16ra3, byte 16ree, byte 16r5e,
73  byte 16r6b, byte 16r4b, byte 16r0d, byte 16r32, byte 16r55,
74  byte 16rbf, byte 16ref, byte 16r95, byte 16r60, byte 16r18,
75  byte 16r90, byte 16raf, byte 16rd8, byte 16r07, byte 16r09
76 };
77 
78 
79 Vmsg.read(fd: ref Sys->FD): (ref Vmsg, string)
80 {
81  (msg, err) := readmsg(fd);
82  if(err != nil)
83  return (nil, err);
84  if(msg == nil)
85  return (nil, "eof reading message");
86  (nil, m) := Vmsg.unpack(msg);
87  if(m == nil)
88  return (nil, sys->sprint("bad venti message format: %r"));
89  return (m, nil);
90 }
91 
92 Vmsg.unpack(f: array of byte): (int, ref Vmsg)
93 {
94  if(len f < H) {
95  sys->werrstr("message too small");
96  return (0, nil);
97  }
98  size := (int f[0] << 8) | int f[1]; # size does not include self
99  size += BIT16SZ;
100  if(len f != size){
101  if(len f < size){
102  sys->werrstr("need more data");
103  return (0, nil); # need more data
104  }
105  f = f[0:size]; # trim to exact length
106  }
107  mtype := int f[2];
108  if(mtype >= len hdrlen || size < hdrlen[mtype]){
109  sys->werrstr("mtype out of range");
110  return (-1, nil);
111  }
112  tid := int f[3];
113  m: ref Vmsg;
114  case mtype {
115  Thello =>
116  uid: string;
117  cryptos, codecs: array of byte;
118 
119  (version, o) := gstring(f, H);
120  (uid, o) = gstring(f, o);
121  if(o < 0 || o >= len f)
122  break;
123  cryptostrength := int f[o++];
124  (cryptos, o) = gbytes(f, o);
125  (codecs, o) = gbytes(f, o);
126  if(o != len f)
127  break;
128  m = ref Vmsg.Thello(1, tid, version, uid, cryptostrength, cryptos, codecs);
129  Tping =>
130  m = ref Vmsg.Tping(1, tid);
131  Tgoodbye =>
132  m = ref Vmsg.Tgoodbye(1, tid);
133  Tread =>
134  score := Score(f[H:H+SCORE]);
135  etype := int f[H+SCORE];
136  n := (int f[H+SCORE+2] << 8) | int f[H+SCORE+3];
137  m = ref Vmsg.Tread(1, tid, score, etype, n);
138  Twrite =>
139  etype := int f[H];
140  m = ref Vmsg.Twrite(1, tid, etype, f[H+4:]);
141  Tsync =>
142  m = ref Vmsg.Tsync(1, tid);
143  Rhello =>
144  (sid, o) := gstring(f, H);
145  if(o+2 != len f)
146  break;
147  crypto := int f[o++];
148  codec := int f[o++];
149  m = ref Vmsg.Rhello(0, tid, sid, crypto, codec);
150  Rping =>
151  m = ref Vmsg.Rping(0, tid);
152  Rread =>
153  m = ref Vmsg.Rread(0, tid, f[H:]);
154  Rwrite =>
155  m = ref Vmsg.Rwrite(0, tid, Score(f[H:H+SCORE]));
156  Rsync =>
157  m = ref Vmsg.Rsync(0, tid);
158  Rerror =>
159  (err, o) := gstring(f, H);
160  if(o < 0)
161  break;
162  m = ref Vmsg.Rerror(0, tid, err);
163  * =>
164  sys->werrstr("unrecognised mtype " + string mtype);
165  return (-1, nil);
166  }
167  if(m == nil) {
168  sys->werrstr("bad message size");
169  return (-1, nil);
170  }
171  return (size, m);
172 }
173 
174 Vmsg.pack(gm: self ref Vmsg): array of byte
175 {
176  if(gm == nil)
177  return nil;
178  ds := gm.packedsize();
179  if(ds <= 0)
180  return nil;
181  d := array[ds] of byte;
182  d[0] = byte ((ds - 2) >> 8);
183  d[1] = byte (ds - 2);
184  d[2] = byte tag2type[tagof gm];
185  d[3] = byte gm.tid;
186  pick m := gm {
187  Thello =>
188  o := pstring(d, H, m.version);
189  o = pstring(d, o, m.uid);
190  d[o++] = byte m.cryptostrength;
191  d[o++] = byte len m.cryptos;
192  d[o:] = m.cryptos;
193  o += len m.cryptos;
194  d[o++] = byte len m.codecs;
195  d[o:] = m.codecs;
196  o += len m.codecs;
197  Tping =>
198  ;
199  Tgoodbye =>
200  ;
201  Tread =>
202  d[H:] = m.score.a;
203  d[H+SCORE] = byte m.etype;
204  d[H+SCORE+2] = byte (m.n >> 8);
205  d[H+SCORE+3] = byte m.n;
206  Twrite =>
207  d[H] = byte m.etype;
208  d[H+4:] = m.data;
209  Tsync =>
210  ;
211  Rhello =>
212  o := pstring(d, H, m.sid);
213  d[o++] = byte m.crypto;
214  d[o++] = byte m.codec;
215  Rping =>
216  ;
217  Rread =>
218  d[H:] = m.data;
219  Rwrite =>
220  d[H:] = m.score.a;
221  Rsync =>
222  ;
223  Rerror =>
224  pstring(d, H, m.e);
225  * =>
226  return nil;
227  }
228  return d;
229 }
230 
231 Vmsg.packedsize(gm: self ref Vmsg): int
232 {
233  mtype := tag2type[tagof gm];
234  if(mtype <= 0)
235  return 0;
236  ml := hdrlen[mtype];
237  pick m := gm {
238  Thello =>
239  ml += utflen(m.version) + utflen(m.uid) + len m.cryptos + len m.codecs;
240  Rhello =>
241  ml += utflen(m.sid);
242  Rread =>
243  ml += len m.data;
244  Twrite =>
245  ml += len m.data;
246  Rerror =>
247  ml += utflen(m.e);
248  }
249  return ml;
250 }
251 
252 Vmsg.text(gm: self ref Vmsg): string
253 {
254  if(gm == nil)
255  return "(nil)";
256  s := sys->sprint("%s(%d", msgname[tagof gm], gm.tid);
257  pick m := gm {
258  * =>
259  s += ",ILLEGAL";
260  Thello =>
261  s += sys->sprint(", %#q, %#q, %d, [", m.version, m.uid, m.cryptostrength);
262  if(len m.cryptos > 0){
263  s += string int m.cryptos[0];
264  for(i := 1; i < len m.cryptos; i++)
265  s += "," + string int m.cryptos[i];
266  }
267  s += "], [";
268  if(len m.codecs > 0){
269  s += string int m.codecs[0];
270  for(i := 1; i < len m.codecs; i++)
271  s += "," + string int m.codecs[i];
272  }
273  s += "]";
274  Tping =>
275  ;
276  Tgoodbye =>
277  ;
278  Tread =>
279  s += sys->sprint(", %s, %d, %d", m.score.text(), m.etype, m.n);
280  Twrite =>
281  s += sys->sprint(", %d, data[%d]", m.etype, len m.data);
282  Tsync =>
283  ;
284  Rhello =>
285  s += sys->sprint(", %#q, %d, %d", m.sid, m.crypto, m.codec);
286  Rping =>
287  Rread =>
288  s += sys->sprint(", data[%d]", len m.data);
289  Rwrite =>
290  s += ", " + m.score.text();
291  Rsync =>
292  ;
293  Rerror =>
294  s += sys->sprint(", %#q", m.e);
295  }
296  return s + ")";
297 }
298 
299 Session.new(fd: ref Sys->FD): ref Session
300 {
301  s := "venti-";
302  for(i := 0; i < len versions; i++){
303  if(i != 0)
304  s[len s] = ':';
305  s += versions[i];
306  }
307  s += "-libventi\n";
308  d := array of byte s;
309  if(sys->write(fd, d, len d) != len d)
310  return nil;
311  version := readversion(fd, "venti-", versions);
312  if(version == nil)
313  return nil;
314  session := ref Session(fd, version);
315  (r, e) := session.rpc(ref Vmsg.Thello(1, 0, version, nil, 0, nil, nil));
316  if(r == nil){
317  sys->werrstr("hello failed: " + e);
318  return nil;
319  }
320  return ref Session(fd, version);
321 }
322 
323 Session.read(s: self ref Session, score: Score, etype: int, maxn: int): array of byte
324 {
325  (gm, err) := s.rpc(ref Vmsg.Tread(1, 0, score, etype, maxn));
326  if(gm == nil){
327  sys->werrstr(err);
328  return nil;
329  }
330  pick m := gm {
331  Rread =>
332  return m.data;
333  }
334  return nil;
335 }
336 
337 Session.write(s: self ref Session, etype: int, data: array of byte): (int, Score)
338 {
339  (gm, err) := s.rpc(ref Vmsg.Twrite(1, 0, etype, data));
340  if(gm == nil){
341  sys->werrstr(err);
342  return (-1, Score(nil));
343  }
344  pick m := gm {
345  Rwrite =>
346  return (0, m.score);
347  }
348  return (-1, Score(nil));
349 }
350 
351 Session.sync(s: self ref Session): int
352 {
353  (gm, err) := s.rpc(ref Vmsg.Tsync(1, 0));
354  if(gm == nil){
355  sys->werrstr(err);
356  return -1;
357  }
358  return 0;
359 }
360 
361 Session.rpc(s: self ref Session, m: ref Vmsg): (ref Vmsg, string)
362 {
363  d := m.pack();
364  if(sys->write(s.fd, d, len d) != len d)
365  return (nil, "write failed");
366  (grm, err) := Vmsg.read(s.fd);
367  if(grm == nil)
368  return (nil, err);
369  if(grm.tid != m.tid)
370  return (nil, "message tags don't match");
371  if(grm.istmsg)
372  return (nil, "reply message is a t-message");
373  pick rm := grm {
374  Rerror =>
375  return (nil, rm.e);
376  }
377  if(tagof(grm) != tagof(m) + 1)
378  return (nil, "reply message is of wrong type");
379  return (grm, nil);
380 }
381 
382 readversion(fd: ref Sys->FD, prefix: string, versions: array of string): string
383 {
384  buf := array[Maxstringsize] of byte;
385  i := 0;
386  for(;;){
387  if(i >= len buf){
388  sys->werrstr("initial version string too long");
389  return nil;
390  }
391  if(readn(fd, buf[i:], 1) != 1){
392  sys->werrstr("eof on version string");
393  return nil;
394  }
395  c := int buf[i];
396  if(c == '\n')
397  break;
398  if(c < ' ' || c > 16r7f || i < len prefix && prefix[i] != c){
399  sys->werrstr("bad version string");
400  return nil;
401  }
402  i++;
403  }
404  if(i < len prefix){
405  sys->werrstr("bad version string");
406  return nil;
407  }
408 #sys->fprint(sys->fildes(2), "read version %#q\n", string buf[0:i]);
409  v := string buf[len prefix:i];
410  i = 0;
411  for(;;){
412  for(j := i; j < len v && v[j] != ':' && v[j] != '-'; j++)
413  ;
414  vv := v[i:j];
415 #sys->fprint(sys->fildes(2), "checking %#q\n", vv);
416  for(k := 0; k < len versions; k++)
417  if(versions[k] == vv)
418  return vv;
419  i = j;
420  if(i >= len v || v[i] != ':'){
421  sys->werrstr("unknown version");
422  return nil;
423  }
424  i++;
425  }
426  sys->werrstr("unknown version");
427  return nil;
428 }
429 
430 
431 Score.eq(a: self Score, b: Score): int
432 {
433  for(i := 0; i < SCORE; i++)
434  if(a.a[i] != b.a[i])
435  return 0;
436  return 1;
437 }
438 
439 Score.zero(): Score
440 {
441  return Score(zero);
442 }
443 
444 Score.parse(s: string): (int, Score)
445 {
446  if(len s != Scoresize * 2)
447  return (-1, Score(nil));
448  score := array[Scoresize] of {* => byte 0};
449  for(i := 0; i < len s; i++){
450  c := s[i];
451  case s[i] {
452  '0' to '9' =>
453  c -= '0';
454  'a' to 'f' =>
455  c -= 'a' - 10;
456  'A' to 'F' =>
457  c -= 'A' - 10;
458  * =>
459  return (-1, Score(nil));
460  }
461  if((i & 1) == 0)
462  c <<= 4;
463  score[i>>1] |= byte c;
464  }
465  return (0, Score(score));
466 }
467 
468 Score.text(a: self Score): string
469 {
470  s := "";
471  for(i := 0; i < SCORE; i++)
472  s += sys->sprint("%.2ux", int a.a[i]);
473  return s;
474 }
475 
476 readn(fd: ref Sys->FD, buf: array of byte, nb: int): int
477 {
478  for(nr := 0; nr < nb;){
479  n := sys->read(fd, buf[nr:], nb-nr);
480  if(n <= 0){
481  if(nr == 0)
482  return n;
483  break;
484  }
485  nr += n;
486  }
487  return nr;
488 }
489 
490 readmsg(fd: ref Sys->FD): (array of byte, string)
491 {
492  sbuf := array[BIT16SZ] of byte;
493  if((n := readn(fd, sbuf, BIT16SZ)) != BIT16SZ){
494  if(n == 0)
495  return (nil, nil);
496  return (nil, sys->sprint("%r"));
497  }
498  ml := (int sbuf[0] << 8) | int sbuf[1];
499  if(ml < BIT16SZ)
500  return (nil, "invalid venti message size");
501  buf := array[ml + BIT16SZ] of byte;
502  buf[0:] = sbuf;
503  if((n = readn(fd, buf[BIT16SZ:], ml)) != ml){
504  if(n == 0)
505  return (nil, "venti message truncated");
506  return (nil, sys->sprint("%r"));
507  }
508  return (buf, nil);
509 }
510 
511 pstring(a: array of byte, o: int, s: string): int
512 {
513  sa := array of byte s; # could do conversion ourselves
514  n := len sa;
515  a[o] = byte (n >> 8);
516  a[o+1] = byte n;
517  a[o+2:] = sa;
518  return o+STR+n;
519 }
520 
521 gstring(a: array of byte, o: int): (string, int)
522 {
523  if(o < 0 || o+STR > len a)
524  return (nil, -1);
525  l := (int a[o] << 8) | int a[o+1];
526  if(l > Maxstringsize)
527  return (nil, -1);
528  o += STR;
529  e := o+l;
530  if(e > len a)
531  return (nil, -1);
532  return (string a[o:e], e);
533 }
534 
535 gbytes(a: array of byte, o: int): (array of byte, int)
536 {
537  if(o < 0 || o+1 > len a)
538  return (nil, -1);
539  n := int a[o];
540  if(1+n > len a)
541  return (nil, -1);
542  no := o+1+n;
543  return (a[o+1:no], no);
544 }
545 
546 utflen(s: string): int
547 {
548  # the domain is 16-bit unicode only, which is all that Inferno now implements
549  n := l := len s;
550  for(i:=0; i<l; i++)
551  if((c := s[i]) > 16r7F){
552  n++;
553  if(c > 16r7FF)
554  n++;
555  }
556  return n;
557 }