changelog shortlog tags branches changeset files revisions annotate raw help

Mercurial > hg > ventivac / appl/cmd/ventisrv.b

changeset 111: 8fd6132b12be
parent: 301697094735
child: 636d77883598
author: Mechiel Lukkien <mechiel@ueber.net>
date: Fri, 17 Aug 2007 17:27:01 +0200
permissions: -rw-r--r--
description: some more about ventisrv design. remove unused ticker function. mention that there is no auth or raw disk support in manual page.
1 # ventisrv stores a part of each stored score in memory. at the
2 # command-line, the maximum data file size and mean block size (thus
3 # expected number of scores) is given. the scores are assumed to be
4 # distributed evenly, so the number of bits of each score to keep in memory
5 # in order to make the probability of a collision <0.001 is calculated.
6 # now all stored scores are read from the index file and partially stored
7 # in main memory.
8 
9 # a venti block read can result in 0 hits (data not present) or 1 or
10 # more hits. for each of the hits, the full score has to be read from
11 # disk until a match is found (in which case the data will be returned and
12 # further reading stopped). it is possible none of the in-memory "hits"
13 # are a hit in the data file.
14 
15 # a venti block write will perform the same steps as the read. if the
16 # data is not present, the header+data is written to the data file,
17 # another header is written to the index file.
18 
19 # at startup, the index file and data file are checked. if headers
20 # are missing from the index file, they are synced with headers from the
21 # data file. missing headers in the data file are errors and ventisrv
22 # will stop with a diagnostic message.
23 
24 # possible improvements:
25 # - speedup syncing index file from data file at startup?
26 # - queueing index writes may help when filesystem does synchronous writes
27 # - make startup faster by reading partial scores into memory more
28 # efficiently: insert non-sorted at startup, when all has been read,
29 # sort the lists. faster than inserting each block sorted.
30 
31 implement Ventisrv;
32 
33 include "sys.m";
34 include "draw.m";
35 include "arg.m";
36 include "daytime.m";
37 include "string.m";
38 include "keyring.m";
39 include "filter.m";
40 include "lock.m";
41 include "venti.m";
42 
43 sys: Sys;
44 daytime: Daytime;
45 str: String;
46 keyring: Keyring;
47 venti: Venti;
48 deflate, inflate: Filter;
49 lock: Lock;
50 
51 sprint, fprint, print, fildes: import sys;
52 Score, Scoresize, Vmsg: import venti;
53 Semaphore: import lock;
54 
55 
Ventisrv: module {
	init: fn(nil: ref Draw->Context, args: list of string);
};

# Ihdr is one index file entry (Ihdrsize bytes on disk).
Ihdr: adt {
	halfscore: array of byte;	# leading Indexscoresize bytes of the score
	dtype: int;
	offset: big;	# offset of the (possibly deflated) block in the data file
	compressed: int;	# nonzero if the block is part of a deflated group

	unpack: fn(d: array of byte): ref Ihdr;
	pack: fn(ih: self ref Ihdr, d: array of byte);
};

# Dhdr is the header stored before a block in the data file
# (see Dhdrsize: magic+score+type+size+conntime).
Dhdr: adt {
	score: Score;
	dtype: int;
	size: int;	# length of the block data
	conntime: big;	# connection time of the client that wrote the block

	unpack: fn(d: array of byte): (ref Dhdr, string);
	pack: fn(dh: self ref Dhdr, d: array of byte);
};

# Chain is one link of the in-memory partial index kept per head
# (heads[]).  presumably d holds up to chainblocks entries of membytes
# each, `used' of them filled -- TODO confirm against the Chain methods.
Chain: adt {
	d: array of byte;
	used: int;
	next: cyclic ref Chain;

	mk: fn(): ref Chain;
	getaddr: fn(c: self ref Chain, i: int): (int, big);
	lookup: fn(c: self ref Chain, d: array of byte, l: list of (int, big)): list of (int, big);
	insert: fn(c: self ref Chain, ih: ref Ihdr);
};

# Client is the state main keeps for one connection.
Client: adt {
	rpid, wpid: int;	# pids of the client reader and cwriter procs; -1 once killed
	respc: chan of ref Vmsg;	# responses to be written by cwriter
	inuse: int;	# requests accepted whose response has not been written yet
	conntime: big;	# time the connection was accepted
	readonly: int;	# connection came in on a read-only (-r) address
};

# Lookup is work handed from main to a reader() process: the candidate
# data file addresses for a score found in the in-memory index.
Lookup: adt {
	c: ref Client;
	tid: int;
	addrs: list of (int, big);
	pick {
	Read =>
		score: Score;
		dtype: int;
		size: int;	# maximum data size the client accepts
	Write =>
		b: ref Block;
	}
};

# Store is work for the writer() process.  for Sync, c is nil when main
# requests a sync on its own (e.g. when a client goes away).
Store: adt {
	c: ref Client;
	pick {
	Sync =>
		tid: int;
	Write =>
		b: ref Block;
	}
};

# Block is a block accepted from a client.
Block: adt
{
	score: Score;
	dtype: int;
	d: array of byte;
};

# Fhdr describes a deflated data block holding several blocks.
# presumably hsize/dsize are the header and data sizes -- TODO confirm.
Fhdr: adt {
	blocks: array of ref Dhdr;
	hsize: int;
	dsize: int;

	unpack: fn(d: array of byte): (ref Fhdr, array of int, string);
	pack: fn(fh: self ref Fhdr, d: array of byte);
};

# Queue holds blocks that were accepted (Rwrite already sent) but are
# not yet in the in-memory index (the writequeue).
Queue: adt
{
	a: array of ref Block;
	n: int;

	lookup: fn(q: self ref Queue, s: Score, dtype: int): array of byte;
	insert: fn(q: self ref Queue, b: ref Block);
	remove: fn(q: self ref Queue, b: ref Block);
};

# Fifo is a fixed-size block list guarded by its own semaphore
# (used for flatequeue; presumably a cache of recently deflated blocks -- verify).
Fifo: adt
{
	a: array of ref Block;
	n, f: int;
	sem: ref Semaphore;

	lookup: fn(l: self ref Fifo, s: Score, dtype: int): array of byte;
	insert: fn(l: self ref Fifo, b: ref Block);
};

# Flate drives a deflate filter process, collecting its output in d.
# nd counts all output produced, so nd > len d means d overflowed.
Flate: adt
{
	rqc: chan of ref Filter->Rq;
	lastfill: ref Filter->Rq.Fill;	# pending request for more input
	d: array of byte;
	nd: int;
	pid: int;	# pid of the deflate process

	new: fn(n: int): ref Flate;
	write: fn(f: self ref Flate, d: array of byte): string;
	finish: fn(f: self ref Flate): (array of byte, string);
};
171 
Eperm: con "permission denied";

# index file entries
Indexscoresize: con 8;	# leading score bytes stored per index entry
Icomprmask: con big 1<<(6*8-1);	# top bit of the 6-byte address field
Ihdrsize: con Indexscoresize+1+6; # scoresize+typesize+addrsize
Iverify: con 128; # check the half scores of the last n entries in the index file against the data file
Ichunksize: con ((128*1024)/Ihdrsize)*Ihdrsize; # chunk in bytes of index entries to read at a time (whole entries only)
Nqueuechunks: con 32; # queue n Ichunksize blocks when reading index at startup

# data file headers
Dhdrmagic: con big 16r2f9d81e5;
Fhdrmagic: con big 16r78c66a15;
Dhdrsize: con 4+Scoresize+1+2+4; # magic+score+type+size+conntime
Fhdrsize: con 4+1+2; # magic+count+size
Fbhdrsize: con Scoresize+1+2+4; # score+type+size+conntime
Maxreaders: con 32;	# upper bound on reader processes
Maxblocksize: con Dhdrsize+Venti->Maxlumpsize;

typebits: con 8; # must be 8

# in-memory index geometry, computed by init from the command line
chainblocks, headbits, nheads: int;
scorebits, addrbits: int;
comprmask: big;
membytes, headbytes, scorebytes: int;
scorebytemask: byte;
maxdatasize: big;

# rough per-object overheads, only used to estimate memory usage
Arraysize: con 4+4+4;
Refsize: con 4;

listenaddr: con "net!*!venti";
indexfile := "index";
datafile := "data";
statsdir := "/chan/";
statsfile := "ventisrvstats";
debug, Cflag, cflag, qflag, verbose: int;
raddrs, waddrs: list of string;

indexfd, datafd: ref Sys->FD;
indexsize, datasize: big;

zeroscore: Score;


heads: array of ref Chain;
reqc: chan of (ref Vmsg, ref Client);	# client requests to main; nil vmsg means the client is gone
wrotec: chan of (int, ref Client);	# cwriter: 1 = response written, 0 = write failed
lookupc: chan of ref Lookup;	# main to reader procs
lookupdonec: chan of (int, ref Vmsg, ref Client);	# reader procs to main: (iswrite, response, client)
syncdonec: chan of (ref Vmsg, ref Client);	# writer to main: sync responses
storec: chan of ref Store;	# work for the writer
writererrorc: chan of string;	# writer's first error, to main
sem: ref Semaphore;	# guards writequeue and the in-memory index
statsem: ref Semaphore;	# guards statistics shared with other procs

writequeue: ref Queue;
flatequeue: ref Fifo;

# statistics
initheap, configheap: big;
nreads, nwrites, nwritesdup, nsyncs, npings: int;
nblocks, nflateblocks: int;
lookupcollisions: int;
232 
233 
init(nil: ref Draw->Context, args: list of string)
{
	sys = load Sys Sys->PATH;
	arg := load Arg Arg->PATH;
	daytime = load Daytime Daytime->PATH;
	str = load String String->PATH;
	keyring = load Keyring Keyring->PATH;
	lock = load Lock Lock->PATH;
	lock->init();
	deflate = load Filter Filter->DEFLATEPATH;
	inflate = load Filter Filter->INFLATEPATH;
	deflate->init();
	inflate->init();
	venti = load Venti Venti->PATH;
	venti->init();

	arg->init(args);
	arg->setusage(arg->progname()+ " [-DcCqv] [-i index] [-d data] [-s statsfile] [-r addr] [-w addr] maxdatasize meanblocksize");
	while((c := arg->opt()) != 0)
		case c {
		'D' =>	debug++;
		'c' =>	# deflate new blocks; implies -C
			cflag++;
			Cflag++;
		'C' =>	Cflag++;	# allow compressed blocks in the index
		'd' =>	datafile = arg->earg();
		'i' =>	indexfile = arg->earg();
		'q' =>	# print the computed parameters and stop
			qflag++;
			verbose++;
		'v' =>	verbose++;
		's' =>
			(statsdir, statsfile) = str->splitstrr(arg->earg(), "/");
			if(statsfile == nil) {
				fprint(fildes(2), "bad stats file\n");
				arg->usage();
			}
		'r' =>	raddrs = arg->earg()::raddrs;	# read-only listen address
		'w' =>	waddrs = arg->earg()::waddrs;
		* =>
			fprint(fildes(2), "bad option: -%c\n", c);
			arg->usage();
		}

	args = arg->argv();
	verbose += debug;
	if(len args != 2)
		arg->usage();

	if(raddrs == nil && waddrs == nil)
		waddrs = listenaddr::nil;

	initheap = heapused();

	# size the in-memory index from the expected number of blocks
	maxdatasize = suffix(hd args);
	meanblocksize := int suffix(hd tl args);
	maxblocks := maxdatasize/big (meanblocksize+Dhdrsize);

	addrbits = log2(maxdatasize);
	if(Cflag)
		addrbits += 1;	# room for the compression flag (comprmask)
	if(addrbits > 48)
		fail("maxdatasize too large");
	comprmask = (big 1<<(addrbits-1));
	headbits = log2(maxblocks/big 1000);
	minscorebits := log2(maxblocks)+log2(big 1000);
	totalbits := ((addrbits+typebits+minscorebits-headbits+8-1)/8)*8;
	scorebits = totalbits-addrbits-typebits;

	chainblocks = 256;
	membytes = (typebits+scorebits+addrbits+8-1)/8;
	headbytes = (headbits+8-1)/8;
	scorebytes = (scorebits+8-1)/8;
	scorebytemask = byte ((1<<(scorebits % 8)) - 1);

	zeroscore = Score.zero();
	nheads = 1<<headbits;
	heads = array[nheads] of ref Chain;
	reqc = chan[256] of (ref Vmsg, ref Client);
	wrotec = chan of (int, ref Client);
	lookupc = chan of ref Lookup;
	lookupdonec = chan[1] of (int, ref Vmsg, ref Client);
	syncdonec = chan[1] of (ref Vmsg, ref Client);
	storec = chan[32] of ref Store;
	# seterror sends at most once; the buffer lets the writer continue
	# even when main is blocked sending on a full storec (else deadlock).
	writererrorc = chan[1] of string;
	sem = Semaphore.new();
	statsem = Semaphore.new();

	writequeue = ref Queue(array[2] of ref Block, 0);
	flatequeue = ref Fifo(array[64] of ref Block, 0, 0, Semaphore.new());

	chainsize := Arraysize+4+Refsize + chainblocks*membytes; # this probably takes more overhead, internal memory allocation at least
	blocksperhead := int (maxblocks/big nheads);
	chainsperhead := (blocksperhead+chainblocks-1)/chainblocks;
	totalchains := chainsperhead*nheads;
	headsize := Arraysize+nheads*Refsize;
	maxwasted := nheads*chainsize;
	maxmemusage := big headsize+big chainsize*big totalchains;
	max := heapmax();
	if(maxmemusage > max)
		say(sprint("maximum memory usage (%bd bytes) larger than available memory (%bd bytes)", maxmemusage, max));
	if(debug) {
		say(sprint("typebits=%d scorebits=%d addrbits=%d headbits=%d",
			typebits, scorebits, addrbits, headbits));
		say(sprint("nheads=%d membytes=%d headbytes=%d scorebytes=%d scorebytemask=%d",
			nheads, membytes, headbytes, scorebytes, int scorebytemask));
		say(sprint("headsize=%d chainsize=%d blocksperhead=%d totalchains=%d maxwasted=%d",
			headsize, chainsize, blocksperhead, totalchains, maxwasted));
	}
	if(verbose)
		say(sprint("maxmemusage=%bd maxblocks=%bd", maxmemusage, maxblocks));
	if(qflag)
		return;

	conns: list of (Sys->Connection, int);
	conns = announce(raddrs, 1, conns);
	conns = announce(waddrs, 0, conns);
	config();
	listen(conns);
	main();
}
352 
# announce each address in addrs, prepending (connection, readonly)
# to conns; any failure is fatal.
announce(addrs: list of string, readonly: int, conns: list of (Sys->Connection, int)): list of (Sys->Connection, int)
{
	while(addrs != nil) {
		addr := hd addrs;
		addrs = tl addrs;
		(rv, ac) := sys->announce(addr);
		if(rv < 0)
			fail(sprint("announce %s: %r", addr));
		conns = (ac, readonly) :: conns;
	}
	return conns;
}
363 
# start one listener process per announced connection.
listen(conns: list of (Sys->Connection, int))
{
	for(; conns != nil; conns = tl conns) {
		(ac, ro) := hd conns;
		spawn listener(ac, ro);
		if(debug)
			say("listener spawned");
	}
}
372 
# accept connections forever, starting a client process for each one
# whose data file can be opened.  a listen failure is fatal.
listener(aconn: Sys->Connection, readonly: int)
{
	for(;;) {
		if(debug)
			say("listener: listening");
		(rv, nc) := sys->listen(aconn);
		if(rv < 0)
			fail(sprint("listen: %r"));
		if(debug)
			say("listener: have client");
		cfd := sys->open(nc.dir+"/data", sys->ORDWR);
		if(cfd != nil) {
			spawn client(cfd, readonly);
			continue;
		}
		if(verbose)
			say(sprint("opening connection data file: %r"));
	}
}
388 
# length of the file open on fd; fstat failure is fatal.
filesize(fd: ref Sys->FD): big
{
	(rv, dir) := sys->fstat(fd);
	if(rv != 0)
		fail(sprint("fstat on index or data file: %r"));
	return dir.length;
}
396 
# config opens the index and data files, verifies the tail of the index
# against the data file, appends index entries for data blocks that
# are not yet indexed, and loads the whole index into memory.
config()
{
	indexfd = sys->open(indexfile, sys->ORDWR);
	if(indexfd == nil)
		fail(sprint("opening %s: %r", indexfile));
	datafd = sys->open(datafile, sys->ORDWR);
	if(datafd == nil)
		fail(sprint("opening %s: %r", datafile));

	indexsize = filesize(indexfd);
	datasize = filesize(datafd);
	if(indexsize % big Ihdrsize != big 0)
		fail(sprint("index file not multiple of iheadersize (indexsize=%bd iheadersize=%d)", indexsize, Ihdrsize));

	# doffset becomes the data file offset just past the last block
	# whose index entries are all present; syncing starts there.
	doffset := big 0;
	if(indexsize > big 0) {
		io := indexsize-big (Iverify*Ihdrsize);
		if(io < big 0)
			io = big 0;
		ih := getihdr(io);
		if(ih == nil)
			fail(sprint("premature eof on index file at offset=%bd", io));

		# all entries of one deflated data block share its data offset.
		# if io points into such a group, move back to the group's first
		# entry so verification starts on a block boundary.  (skipping
		# forward instead runs off the end of the index when the group
		# extends to the end of the file.)
		if(ih.compressed) {
			while(io > big 0) {
				pih := getihdr(io-big Ihdrsize);
				if(pih == nil || !pih.compressed || pih.offset != ih.offset)
					break;
				io -= big Ihdrsize;
			}
		}
		if(debug) say(sprint("config: verifying last entries in index file at offset=%bd", io));

		# if the first verified block's entries are incomplete, syncing
		# must restart at that block, not at the start of the data file.
		doffset = ih.offset;
		while(io < indexsize) {
			(ndoffset, nio) := indexverify(io);
			if(nio == big -1) {
				dir := sys->nulldir;
				dir.length = io;
				if(sys->fwstat(indexfd, dir) != 0)
					fail(sprint("truncating index file to remove trailing entries for compressed blocks: %r"));
				indexsize = io;
				break;
			}
			(doffset, io) = (ndoffset, nio);
		}
	}

	if(debug) say("config: syncing index file to data file");
	nadd := ncompr := 0;
	while(doffset < datasize) {
		(dhdrs, compr, noffset, err) := offsetread(doffset);
		if(err != nil)
			fail(sprint("syncing index from data at offset=%bd: %s", doffset, err));
		for(i := 0; i < len dhdrs; i++) {
			dhdr := dhdrs[i];
			err = indexstore(ref Ihdr(dhdr.score.a[:Indexscoresize], dhdr.dtype, doffset, compr));
			if(err != nil)
				fail("syncing index from data: "+err);
			nadd++;
		}
		if(compr)
			ncompr += len dhdrs;
		doffset = noffset;
	}
	if(verbose) say(sprint("config: added %d entries to index file of which %d compressed", nadd, ncompr));

	if(debug) say("config: reading entries into memory");
	indexsize = filesize(indexfd);
	t0 := sys->millisec();

	# read the index in chunks; indexunpack inserts them into memory
	blockc := chan[Nqueuechunks] of (array of byte, big);
	donec := chan of int;
	spawn indexunpack(blockc, donec);
	for(o := big 0; o < indexsize; o += big Ichunksize) {
		d := array[Ichunksize] of byte;
		n := preadn(indexfd, d, len d, o);
		if(n < 0 || n % Ihdrsize != 0) {
			if(n > 0)
				sys->werrstr("bytes read not multiple of iheadersize");
			fail(sprint("reading index entries chunk at offset=%bd: %r", o));
		}
		if(n > 0)
			blockc<- = (d[:n], o);
		if(n < Ichunksize)
			break;
	}
	blockc<- = (nil, big 0);
	<-donec;
	t1 := sys->millisec();
	nblocks = int (indexsize/big Ihdrsize);

	if(verbose) say(sprint("config: done, loaded %d entries (%bd bytes) of which %d compressed in memory in %0.2f s",
		nblocks, indexsize, nflateblocks, real (t1-t0)/1000.0));
}
488 
# fail unless the index entry's half score and type match the data header.
checkheaders(ih: ref Ihdr, dh: ref Dhdr)
{
	if(halfcmp(ih.halfscore, dh.score.a, ih.dtype, dh.dtype) != 0)
		fail(sprint("partial score or type index file does not match block in data file at offset=%bd, index: score=%s type=%d compressed=%d, data: score=%s type=%d",
			ih.offset, scorestr(ih.halfscore), ih.dtype, ih.compressed, dh.score.text(), dh.dtype));
}
496 
# read the index entry at index file offset o.  returns nil at end of
# file; read errors, short reads and entries that cannot reference a
# block in the data file are fatal.
getihdr(o: big): ref Ihdr
{
	d := array[Ihdrsize] of byte;
	n := preadn(indexfd, d, len d, o);
	if(n < 0)
		fail(sprint("reading at offset=%bd: %r", o));
	if(n == 0)
		return nil;
	if(n != len d)
		fail(sprint("short read on index file at offset=%bd", o));
	ih := Ihdr.unpack(d);
	# a block must start before the end of the data file
	if(ih.offset >= datasize)
		fail(sprint("index entry at offset=%bd points past end of data file (offset=%bd)", o, ih.offset));
	return ih;
}
512 
# verify the index entries starting at o against the data block they
# point to (one entry per block in a deflated group).  returns (data
# offset after that block, index offset after its entries), or
# (-1, -1) if the index ends before all entries of the group.
indexverify(o: big): (big, big)
{
	first := getihdr(o);
	(dhs, nil, next, err) := offsetread(first.offset);
	if(err != nil)
		fail("reading block pointed to by one of last entries in index file: "+err);
	checkheaders(first, dhs[0]);
	for(i := 1; i < len dhs; i++) {
		o += big Ihdrsize;
		ih := getihdr(o);
		if(ih == nil)
			return (big -1, big -1);
		checkheaders(ih, dhs[i]);
	}
	return (next, o+big Ihdrsize);
}
530 
# receive chunks of index entries (with their index file offset) on
# blockc until a nil chunk, inserting each entry into memory; signals
# donec when done.  compressed entries without -C are fatal.
indexunpack(blockc: chan of (array of byte, big), donec: chan of int)
{
	for(;;) {
		(d, o) := <-blockc;
		if(d == nil)
			break;

		for(i := 0; i < len d; i += Ihdrsize) {
			ih := Ihdr.unpack(d[i:i+Ihdrsize]);
			meminsert(ih);
			if(ih.compressed) {
				# report the offset of this entry, not of its chunk
				if(!Cflag)
					fail(sprint("compressed block at index file offset=%bd but -C not specified", o+big i));
				nflateblocks++;
			}
		}
	}
	donec<- = 0;
}
550 
# kill both processes of a client and mark it dead (pids -1).
killc(c: ref Client)
{
	(rpid, wpid) := (c.rpid, c.wpid);
	c.rpid = c.wpid = -1;
	killpid(rpid);
	killpid(wpid);
}
557 
# best-effort kill of process pid via its /prog ctl file.
killpid(pid: int)
{
	ctl := sys->open(sprint("/prog/%d/ctl", pid), Sys->OWRITE);
	if(ctl == nil)
		return;
	fprint(ctl, "kill\n");
}
564 
# inflate src into dst, which must be exactly the size of the result.
# returns nil on success or an error string.
decompress(src, dst: array of byte): string
{
	if(debug)
		say(sprint("decompress len src=%d len dst=%d", len src, len dst));
	rqc := inflate->start("");
	<-rqc;	# the Start message

out:
	for(;;) {
		pick rq := <-rqc {
		Fill =>
			# hand over as much remaining input as fits; 0 ends the input
			n := len rq.buf;
			if(n > len src)
				n = len src;
			rq.buf[:] = src[:n];
			src = src[n:];
			rq.reply<- = n;
		Result =>
			if(len rq.buf > len dst) {
				rq.reply<- = -1;
				return "inflated data too large";
			}
			dst[:] = rq.buf;
			dst = dst[len rq.buf:];
			rq.reply<- = 0;
		Finished =>
			if(len rq.buf != 0)
				return "leftover data at end of inflate";
			break out;
		Error =>
			return rq.e;
		}
	}
	if(len dst == 0)
		return nil;
	return "inflated data smaller than expected";
}
601 
602 
# append deflate output d to f.d if it fits; f.nd always grows so
# that f.nd > len f.d records an overflow.
flateadd(f: ref Flate, d: array of byte)
{
	end := f.nd + len d;
	if(end <= len f.d)
		f.d[f.nd:] = d;
	f.nd = end;
}
609 
# start a deflate process with an output buffer of n bytes and wait
# for its first request for input.
Flate.new(n: int): ref Flate
{
	rqc := deflate->start("");
	f := ref Flate(rqc, nil, array[n] of byte, 0, 0);
	pick first := <-rqc {
	Start =>
		f.pid = first.pid;
	* =>
		fail("bad first message from deflate");
	}
	pick second := <-rqc {
	Fill =>
		f.lastfill = second;
	* =>
		fail("bad second message from deflate");
	}
	return f;
}
623 
# feed all of d to the deflate process, collecting any output it
# produces meanwhile.  returns nil or the deflate error string.
Flate.write(f: self ref Flate, d: array of byte): string
{
	# once the output overflowed f.d, finish discards the result and the
	# blocks are stored uncompressed, so there is no need to feed deflate.
	if(f.nd > len f.d)
		return nil;

	while(len d > 0) {
		# give as much input as fits to the pending fill request
		n := len d;
		if(len f.lastfill.buf < n)
			n = len f.lastfill.buf;
		f.lastfill.buf[:] = d[:n];
		f.lastfill.reply<- = n;
		f.lastfill = nil;
		d = d[n:];

		# collect output until deflate asks for more input
	more:
		for(;;) {
			pick r := <-f.rqc {
			Fill =>
				f.lastfill = r;
				break more;
			Result =>
				flateadd(f, r.buf);
				r.reply<- = 0;	# deflate waits for this reply
			Error =>
				return r.e;
			* =>
				fail("bad rq from deflate");
			}
		}
	}
	return nil;
}
650 
# end the input and collect the remaining output.  returns the deflated
# data, (nil, nil) if it did not fit in f.d, or (nil, error).
Flate.finish(f: self ref Flate): (array of byte, string)
{
	if(f.nd > len f.d) {
		# output already overflowed; abandon the deflate process
		killpid(f.pid);
		return (nil, nil);
	}
	f.lastfill.reply<- = 0;	# answering the pending fill with 0 ends the input
	finished := 0;
	while(!finished) {
		pick rq := <-f.rqc {
		Result =>
			flateadd(f, rq.buf);
			rq.reply<- = 0;
		Finished =>
			if(len rq.buf != 0)
				fail("deflate left data uncompressed");
			finished = 1;
		Error =>
			return (nil, rq.e);
		* =>
			fail("bad response from deflate");
		}
	}
	if(f.nd <= len f.d)
		return (f.d[:f.nd], nil);
	return (nil, nil);
}
678 
# reader runs in a pool of processes started by lookupsend.  each takes
# a Lookup from lookupc, checks the candidate data file addresses and
# always reports back on lookupdonec so main's busy/write counts stay right.
reader()
{
	for(;;) {
		pick w := <-lookupc {
		Read =>
			if(debug) say("reader: have read message");

			(nil, d, err) := datalookup(w.addrs, w.score, w.dtype, 1, w.size);
			if(len d > w.size)
				err = "data larger than requested";
			if(err == nil && d == nil)
				err = "no such score/type";
			rmsg: ref Vmsg;
			if(err == nil) {
				rmsg = ref Vmsg.Rread(0, w.tid, d);
			} else {
				rmsg = ref Vmsg.Rerror(0, w.tid, err);
				if(verbose) say("reader: error reading data: "+err);
			}
			if(debug) say("reader: read handled");
			lookupdonec<- = (0, rmsg, w.c);

		Write =>
			if(debug) say(sprint("reader: have write message"));

			(hit, nil, err) := datalookup(w.addrs, w.b.score, w.b.dtype, 0, len w.b.d);
			if(err != nil) {
				# presence is unknown.  the client already got Rwrite, so
				# store the block rather than lose it; a duplicate copy only
				# costs space since lookups try every candidate address.
				if(verbose) say("reader: datalookup failed, storing block anyway: "+err);
				hit = 0;
			}
			if(hit) {
				if(debug) say("reader: have hit, already in datafile");
				sem.obtain();
				writequeue.remove(w.b);
				sem.release();
				statsem.obtain();
				nwritesdup++;
				statsem.release();
			} else {
				if(debug) say("reader: no hit, need to write block");
				storec<- = ref Store.Write(w.c, w.b);
			}
			if(debug) say("reader: write handled");
			lookupdonec<- = (1, nil, w.c);
		}
	}
}
727 
# state of the deflate group being built by writer(): the blocks fed
# to flate so far, each with the conntime of the client that wrote it.
flate: ref Flate;
Maxfblocks: con 256;	# maximum number of blocks in one deflate group
rawsize: int;	# total uncompressed bytes in the group
nfblocks: int;	# number of entries of fblocks in use
fblocks: array of (ref Block, big);


# flatefinish ends the current deflate group and writes it: as one
# deflated data block if it fits in Maxblocksize, otherwise as separate
# uncompressed blocks.  index entries are written, then the in-memory
# index is updated and the blocks leave writequeue.  a new Flate is
# started for the next group.  returns nil or an error string; on error
# the group state is left as is.
flatefinish(): string
{
if(debug) say(sprint("flatefinish: rawsize=%d nfblocks=%d", rawsize, nfblocks));

	(d, err) := flate.finish();
	if(err != nil)
		return "flatefinish: "+err;

	# d is nil when the deflated output overflowed its buffer
	if(d == nil || Fhdrsize+nfblocks*Fbhdrsize+len d > Maxblocksize) {
		if(debug) say("flatefinish: data too large, writing as normal blocks");

		l := array[nfblocks] of (ref Ihdr, ref Block);
		for(i := 0; i < nfblocks; i++) {
			(b, conntime) := fblocks[i];
			dh := ref Dhdr(b.score, b.dtype, len b.d, conntime);
			offset: big;
			(offset, err) = datastore(dh, b.d);
			ih := ref Ihdr(b.score.a[:Indexscoresize], b.dtype, offset, 0);
			if(err == nil)
				err = indexstore(ih);
			if(err != nil)
				return err;
			l[i] = (ih, b);
		}
		# make the blocks visible in memory only after all are on disk
		sem.obtain();
		for(i = 0; i < nfblocks; i++) {
			(ih, b) := l[i];
			meminsert(ih);
			writequeue.remove(b);
		}
		sem.release();
	} else {
		if(debug) say("flatefinish: writing deflated block");

		dhs := array[nfblocks] of ref Dhdr;
		for(i := 0; i < nfblocks; i++) {
			(b, conntime) := fblocks[i];
			dhs[i] = ref Dhdr(b.score, b.dtype, len b.d, conntime);
		}
		offset: big;
		(offset, err) = datastoreflate(dhs, d);
		if(err != nil)
			return err;
		# every block of the group gets an index entry with the same
		# data offset and the compressed flag set
		ihs := array[nfblocks] of ref Ihdr;
		for(i = 0; i < nfblocks; i++) {
			(b, nil) := fblocks[i];
			ihs[i] = ref Ihdr(b.score.a[:Indexscoresize], b.dtype, offset, 1);
			err = indexstore(ihs[i]);
			if(err != nil)
				return err;
		}
		sem.obtain();
		for(i = 0; i < nfblocks; i++) {
			(b, nil) := fblocks[i];
			meminsert(ihs[i]);
			writequeue.remove(b);
		}
		sem.release();
if(debug) say("flatefinish: flated block done");
	}
	# reset the group (dropping block references) and start a new one
	for(i := 0; i < nfblocks; i++)
		fblocks[i] = (nil, big 0);
	nfblocks = 0;
	rawsize = 0;
	flate = Flate.new(Maxblocksize);
if(debug) say("flatefinish: done");
	return nil;
}
803 
lasterr: string;	# first write error; once set, writer drops writes

# report a write error.  only the first one is recorded in lasterr and
# passed to main; later ones are only logged, and only when verbose.
seterror(s: string)
{
	first := lasterr == nil;
	if(first || verbose)
		say("write error: "+s);
	if(!first)
		return;
	lasterr = s;
	writererrorc<- = s;
}
815 
# writer is the single process that appends to the data and index
# files.  it serves Store messages from storec.  with -c, blocks are
# collected in a deflate group that flatefinish writes when the group
# is full or on sync; otherwise each block is written directly.  after
# the first error (lasterr) writes are dropped and syncs answer Rerror.
writer()
{
	flate = Flate.new(Maxblocksize);
	fblocks = array[Maxfblocks] of (ref Block, big);

	for(;;) {
		pick st := <-storec {
		Write =>
			if(lasterr != nil)
				continue;
			b := st.b;
			if(cflag) {
				# estimated group size with b added, assuming deflate output
				# of about 9/10 the input; flush first if it would not fit
				# or the group is full.
				max := Fhdrsize+(nfblocks+1)*Fbhdrsize+(rawsize+len b.d)*9/10;
				if(nfblocks == Maxfblocks || max > Maxblocksize) {
					err := flatefinish();
					if(err != nil) {
						seterror(err);
						continue;
					}
				}
				err := flate.write(b.d);
				if(err != nil) {
					seterror(err);
					continue;
				}
				fblocks[nfblocks++] = (b, st.c.conntime);
				rawsize += len b.d;

			} else {
				dh := ref Dhdr(b.score, b.dtype, len b.d, st.c.conntime);
				(offset, err) := datastore(dh, b.d);
				ih := ref Ihdr(b.score.a[:Indexscoresize], b.dtype, offset, 0);
				if(err == nil)
					err = indexstore(ih);
				if(err != nil) {
					seterror(err);
					continue;
				}
				sem.obtain();
				meminsert(ih);
				writequeue.remove(b);
				sem.release();
				if(debug) say("writer: data now on file");
			}
		Sync =>
			# st.c is nil for syncs main issues on its own; no reply then
			if(lasterr != nil) {
				if(st.c != nil)
					syncdonec<- = (ref Vmsg.Rerror(0, st.tid, lasterr), st.c);
				continue;
			}
			rmsg: ref Vmsg;
			if(cflag && nfblocks > 0) {
				err := flatefinish();
				if(err != nil) {
					err = "compress/write: "+err;
					seterror(err);
					rmsg = ref Vmsg.Rerror(0, st.tid, err);
					if(st.c != nil)
						syncdonec<- = (rmsg, st.c);
					continue;
				}
			}
			# only the data file is synced here.  a wstat with nulldir is
			# presumably the Inferno "sync" request -- see sys-stat(2).
			if(sys->fwstat(datafd, sys->nulldir) == 0) {
				rmsg = ref Vmsg.Rsync(0, st.tid);
			} else {
				rmsg = ref Vmsg.Rerror(0, st.tid, sprint("syncing: %r"));
				seterror(sprint("syncing: %r"));
			}
			if(st.c != nil)
				syncdonec<- = (rmsg, st.c);
			if(debug) say("writer: sync is done");
		}
	}
}
890 
nreaders, nbusyreaders, nreaderwrites: int;	# reader pool accounting, main proc only

# hand w to an idle reader, growing the pool up to Maxreaders; if all
# readers are busy, first wait for one to report back.
lookupsend(w: ref Lookup)
{
	if(nbusyreaders >= nreaders) {
		if(nreaders < Maxreaders) {
			nreaders++;
			spawn reader();
		}
	}
	if(nbusyreaders >= nreaders) {
		(iswrite, rm, rc) := <-lookupdonec;
		lookupdone(iswrite, rm, rc);
	}
	lookupc<- = w;
	nbusyreaders++;
	if(tagof w == tagof Lookup.Write)
		nreaderwrites++;
	if(debug)
		say("main: message sent to reader");
}
909 
# account for a finished reader lookup and pass its response, if any,
# to the client unless the client was killed meanwhile.
lookupdone(iswrite: int, rmsg: ref Vmsg, c: ref Client)
{
	nbusyreaders--;
	if(iswrite)
		nreaderwrites--;
	if(c.rpid == -1 || rmsg == nil) {
		if(debug)
			say("main: rmsg from reader handled");
		return;
	}
	c.respc<- = rmsg;
	if(debug)
		say("main: rmsg from reader handled");
}
919 
# wait until no reader is still handling a write lookup, so every
# pending write has reached the writer before a sync is queued.
takewrites()
{
	while(nreaderwrites > 0) {
		(w, m, rc) := <-lookupdonec;
		lookupdone(w, m, rc);
	}
}
927 
# main is the central loop.  it serves the statistics file, dispatches
# client requests to the write queue, in-memory index, reader and writer
# processes, and passes their responses back to clients.
main()
{
	writeerror: string;	# first writer error; further writes are refused

	fio := sys->file2chan(statsdir, statsfile);
	if(fio == nil) {
		say(sprint("file2chan: %r; not serving statistics"));
		# fresh channels nobody sends on, so the alt never selects them
		fio = ref sys->FileIO(chan of (int, int, int, sys->Rread), chan of (int, array of byte, int, sys->Rwrite));
	} else
		if(debug) say(sprint("file2chan: serving %s%s", statsdir, statsfile));

	spawn writer();

	configheap = heapused();
	if(verbose)
		say(sprint("heap used after startup=%bd", configheap-initheap));

	for(;;) alt {
	(offset, nil, nil, rc) := <- fio.read =>
		if(rc == nil)
			continue;

		# snapshot the counters guarded by statsem
		statsem.obtain();
		lnblocks := nblocks;
		lnflateblocks := nflateblocks;
		ldatasize := datasize;
		lnwritesdup := nwritesdup;
		statsem.release();

		h := heapused();
		mean := 0;
		if(lnblocks > 0)
			mean = int (ldatasize/big lnblocks);
		buf := array of byte sprint(
			"%14d reads\n%14d writes\n%14d syncs\n%14d pings\n"+
			"%14d dupwrites\n%14d lookupcollisions\n%14d blocks\n%14d flateblocks\n%14d meanblocksize\n"+
			"%14d readerprocs\n%14d busyreaderprocs\n%14d readerwrites\n"+
			"%14bd totalheap\n%14bd newheap\n%14bd datasize\n%s\n",
			nreads, nwrites, nsyncs, npings,
			lnwritesdup, lookupcollisions, lnblocks, lnflateblocks, mean,
			nreaders, nbusyreaders, nreaderwrites,
			h, h-configheap, ldatasize, writeerror);

		if(offset > len buf)
			offset = len buf;
		rc <-= (buf[offset:], nil);

	(nil, nil, nil, wc) := <- fio.write =>
		if(wc == nil)
			continue;
		if(debug) say("main: file2chan write");
		wc <-= (0, Eperm);

	(iswrite, rmsg, c) := <-lookupdonec =>
		lookupdone(iswrite, rmsg, c);

	err := <-writererrorc =>
		if(debug) say("main: writer error: "+err);
		writeerror = err;

	(rmsg, c) := <-syncdonec =>
		if(c.rpid != -1)
			c.respc<- = rmsg;
		if(debug) say("main: sync rmsg from writer handled");

	(ok, c) := <-wrotec =>
		if(debug) say("main: client wrote vmsg");
		if(ok == 0) {
			# client's connection failed: flush its writes, then drop it
			if(writeerror == nil && !c.readonly) {
				takewrites();
				storec<- = ref Store.Sync(nil, 0);
			}
			killc(c);
			if(debug) say("main: client killed after write error");
		} else
			c.inuse--;

	(vmsg, c) := <-reqc =>
		# a nil vmsg means the client stopped reading; check before use
		if(vmsg == nil) {
			if(writeerror == nil && !c.readonly) {
				takewrites();
				storec<- = ref Store.Sync(nil, 0);
			}
			killc(c);
			if(debug) say("main: client killed after read error");
			continue;
		}
		if(debug) say("main: client read message: "+vmsg.text());
		# respc holds 256+1 messages; more outstanding would block main
		if(c.inuse >= 256+1) {
			killc(c);
			if(debug) say("main: client killed after too many tids in use");
			continue;
		}
		c.inuse++;

		pick tmsg := vmsg {
		Tread =>
			nreads++;
			if(tmsg.score.eq(zeroscore)) {
				c.respc<- = ref Vmsg.Rread(0, tmsg.tid, array[0] of byte);
				continue;
			}

			# blocks not yet on disk are served from the write queue
			sem.obtain();
			d := queuelookup(tmsg.score, tmsg.etype);
			if(d != nil) {
				sem.release();
				c.respc<- = ref Vmsg.Rread(0, tmsg.tid, d);
				continue;
			}
			addrs := memlookup(tmsg.score, tmsg.etype);
			sem.release();
			if(len addrs == 0) {
				c.respc<- = ref Vmsg.Rerror(0, tmsg.tid, "no such score/type");
				continue;
			}

			lookupsend(ref Lookup.Read(c, tmsg.tid, addrs, tmsg.score, tmsg.etype, tmsg.n));

		Twrite =>
			nwrites++;
			nowritemsg: string;
			if(writeerror != nil)
				nowritemsg = writeerror;
			if(c.readonly)
				nowritemsg = "writes not allowed";
			if(nowritemsg != nil) {
				c.respc<- = ref Vmsg.Rerror(0, tmsg.tid, nowritemsg);
				continue;
			}

			# the write is acknowledged before it reaches the disk
			score := Score(sha1(tmsg.data));
			c.respc<- = ref Vmsg.Rwrite(0, tmsg.tid, score);

			if(score.eq(zeroscore))
				continue;

			sem.obtain();
			d := queuelookup(score, tmsg.etype);
			if(d != nil) {
				sem.release();
				continue;
			}
			addrs := memlookup(score, tmsg.etype);
			writequeue.insert(b := ref Block(score, tmsg.etype, tmsg.data));
			sem.release();

			# possible match on disk: let a reader check before storing
			if(len addrs > 0)
				lookupsend(ref Lookup.Write(c, tmsg.tid, addrs, b));
			else
				storec<- = ref Store.Write(c, b);

		Tsync =>
			nsyncs++;
			nowritemsg: string;
			if(writeerror != nil)
				nowritemsg = writeerror;
			if(c.readonly)
				nowritemsg = "writes not allowed";
			if(nowritemsg != nil) {
				c.respc<- = ref Vmsg.Rerror(0, tmsg.tid, nowritemsg);
			} else {
				takewrites();
				storec<- = ref Store.Sync(c, tmsg.tid);
			}

		Tping =>
			npings++;
			c.respc<- = ref Vmsg.Rping(0, tmsg.tid);

		* =>
			if(debug) say(sprint("main: bad tmsg, tag=%d", tagof vmsg));
		}
	}
}
1103 
# read one line (at most 128 bytes including the newline) one byte at a
# time, so nothing after the line is consumed.  returns the line without
# the newline, or nil with the error string set.
readline(fd: ref Sys->FD): array of byte
{
	buf := array[128] of byte;
	for(i := 0; i < len buf; i++) {
		n := sys->read(fd, buf[i:], 1);
		if(n == 0)
			sys->werrstr("eof reading version line");	# else errstr would be stale
		if(n != 1)
			return nil;
		if(buf[i] == byte '\n')
			return buf[:i];
	}
	sys->werrstr("version line too long");
	return nil;
}
1116 
# perform the venti handshake: exchange version lines (the client must
# speak venti-02), read the Thello and answer with an Rhello.
# returns nil or an error string.
handshake(fd: ref Sys->FD): string
{
	if(fprint(fd, "venti-02-ventisrv\n") < 0)
		return sprint("writing version: %r");

	d := readline(fd);
	if(d == nil)
		return sprint("bad version (%r)");
	if(!str->prefix("venti-02-", string d))
		return sprint("bad version (%s)", string d);

	(tvmsg, terr) := Vmsg.read(fd);
	if(terr != nil)
		return "reading thello: "+terr;
	# Vmsg.read can return no message and no error (see client)
	if(tvmsg == nil)
		return "reading thello: eof";

	if(tagof tvmsg != tagof Vmsg.Thello)
		return "first message not thello";

	# answer with the thello's tid, as for every other response
	rvmsg := ref Vmsg.Rhello(0, tvmsg.tid, nil, 0, 0);
	md := rvmsg.pack();
	if(md == nil || sys->write(fd, md, len md) != len md)
		return sprint("writing rhello: %r");
	return nil;
}
1141 
# Serve one client connection on fd.  After a successful handshake a
# Client is created and a cwriter process is spawned to send its
# responses; every message read from fd is then passed on reqc together
# with the Client.  Reading stops on a read error or eof, a message that is
# not a T-message, or a Tgoodbye; (nil, c) is then sent on reqc to signal
# that the connection is done.
client(fd: ref Sys->FD, readonly: int)
{
	conntime := big daytime->now();
	herr := handshake(fd);
	if(herr != nil) {
		if(debug) say("handshake failed: "+herr);
		return;
	}

	mypid := sys->pctl(0, nil);
	c := ref Client(mypid, 0, chan[256+1] of ref Vmsg, 0, conntime, readonly);
	pidc := chan of int;
	spawn cwriter(fd, pidc, c);
	c.wpid = <-pidc;

	for(;;) {
		(m, err) := Vmsg.read(fd);
		if(m != nil) {
			if(!m.istmsg)
				err = "message not tmsg";
			if(tagof m == tagof Vmsg.Tgoodbye)
				err = "closing down";
		}
		if(m == nil || err != nil) {
			if(debug) say("client: reading: "+err);
			break;
		}
		reqc<- = (m, c);
	}
	reqc<- = (nil, c);
}
1170 
# Response writer for client c.  Sends its own pid on pidc, then writes
# every message arriving on c.respc to fd, reporting (1, c) on wrotec after
# each successful write.  A nil message or a failed write ends the loop,
# after which (0, c) is sent on wrotec.
cwriter(fd: ref Sys->FD, pidc: chan of int, c: ref Client)
{
	pidc<- = sys->pctl(0, nil);
	for(;;) {
		r := <-c.respc;
		if(r == nil) {
			if(debug) say("client: have nil on respc, closing");
			break;
		}
		buf := r.pack();
		if(sys->write(fd, buf, len buf) != len buf) {
			if(debug) say(sprint("client: writing rmsg: %r"));
			break;
		}
		wrotec<- = (1, c);
	}
	if(debug) say("client: exiting");
	wrotec<- = (0, c);
}
1191 
# Compare partial score hs with the first len hs bytes of s, ordering by
# type first (hdt against dt).  Returns <0, 0 or >0.
halfcmp(hs: array of byte, s: array of byte, hdt, dt: int): int
{
	if(hdt == dt) {
		for(i := 0; i < len hs; i++) {
			diff := int hs[i] - int s[i];
			if(diff != 0)
				return diff;
		}
		return 0;
	}
	return hdt - dt;
}
1201 
# Append the packed header ih to the index file at offset indexsize and
# advance indexsize by the bytes written.  Returns nil or an error string.
indexstore(ih: ref Ihdr): string
{
	buf := array[Ihdrsize] of byte;
	ih.pack(buf);
	wrote := sys->pwrite(indexfd, buf, len buf, indexsize);
	if(wrote != len buf)
		return sprint("writing index at offset=%bd: %r", indexsize);
	if(debug) say(sprint("indexstore: stored at indexsize=%bd data offset=%bd", indexsize, ih.offset));
	indexsize += big wrote;
	return nil;
}
1213 
# Append a compressed group to the data file: an Fhdr describing blocks,
# then the compressed bytes d.  On success returns (offset written at, nil)
# and, under statsem, advances datasize and the block counters.  On
# failure returns (datasize, error) with datasize unchanged.
datastoreflate(blocks: array of ref Dhdr, d: array of byte): (big, string)
{
	if(debug) say(sprint("datastoreflate: len blocks=%d len d=%d", len blocks, len d));
	fh := ref Fhdr(blocks, len blocks*Fbhdrsize, len d);
	hdr := array[Fhdrsize+fh.hsize] of byte;
	fh.pack(hdr);
	total := len hdr + len d;
	if(datasize+big total > maxdatasize)
		return (datasize, sprint("data file full"));

	if(sys->pwrite(datafd, hdr, len hdr, datasize) != len hdr)
		return (datasize, sprint("writing flateblock header: %r"));
	if(sys->pwrite(datafd, d, len d, datasize+big len hdr) != len d)
		return (datasize, sprint("writing flateblock data: %r"));

	start := datasize;
	statsem.obtain();
	datasize += big total;
	nblocks += len blocks;
	nflateblocks += len blocks;
	statsem.release();
	if(debug) say(sprint("datastoreflate: stored at offset=%bd datasize=%bd len blocks=%d comprsize=%d", start, datasize, len blocks, len d));
	return (start, nil);
}
1240 
1241 
# Append one plain block (header dh, then data d) to the data file.  A
# header whose size disagrees with len d is a fatal error.  On success
# returns (offset written at, nil) and, under statsem, advances datasize
# and nblocks; on failure returns (datasize, error).
datastore(dh: ref Dhdr, d: array of byte): (big, string)
{
	if(len d != dh.size)
		fail(sprint("datastore: refusing to write block, header says %d bytes, data is %d bytes", dh.size, len d));
	total := Dhdrsize+dh.size;
	if(datasize+big total > maxdatasize)
		return (datasize, sprint("data file full"));

	hdr := array[Dhdrsize] of byte;
	dh.pack(hdr);
	if(sys->pwrite(datafd, hdr, len hdr, datasize) != len hdr)
		return (datasize, sprint("writing data header: %r"));
	if(sys->pwrite(datafd, d, len d, datasize+big len hdr) != len d)
		return (datasize, sprint("writing data: %r"));

	start := datasize;
	statsem.obtain();
	datasize += big total;
	nblocks++;
	statsem.release();
	if(debug) say(sprint("datastore: stored at offset=%bd datasize=%bd blocksize=%d", start, datasize, dh.size));
	return (start, nil);
}
1267 
# Parse the data-file record at offset: either one plain block
# (Dhdrmagic) or a compressed group (Fhdrmagic).  The data of every block
# is checked against the score in its header.  Returns (headers,
# compressed flag, offset of the next record, nil) or
# (nil, 0, big 0, error).
offsetread(offset: big): (array of ref Dhdr, int, big, string)
{
	d := array[Maxblocksize] of byte;
	n := preadn(datafd, d, len d, offset);
	if(n < 0)
		return (nil, 0, big 0, sprint("reading: %r"));
	if(n < 4)
		return (nil, 0, big 0, "short header");
	# only d[:n] came from the file (n is short near its end); bound the
	# checks by n, len d is always Maxblocksize
	case get32(d, 0) {
	Dhdrmagic =>
		if(n < Dhdrsize)
			return (nil, 0, big 0, "short dheader");

		(dhdr, err) := Dhdr.unpack(d);
		if(err != nil)
			return (nil, 0, big 0, "unpack dheader: "+err);
		if(Dhdrsize+dhdr.size > n)
			return (nil, 0, big 0, "dheader points outside data file");

		score := Score(sha1(d[Dhdrsize:Dhdrsize+dhdr.size]));
		if(!score.eq(dhdr.score))
			return (nil, 0, big 0, sprint("dheader score (%s) does not match actual score (%s) at offset=%bd",
				dhdr.score.text(), score.text(), offset));
		return (array[1] of {dhdr}, 0, offset+big (Dhdrsize+dhdr.size), nil);

	Fhdrmagic =>
		if(n < Fhdrsize)
			return (nil, 0, big 0, "short fheader");

		(fhdr, o, err) := Fhdr.unpack(d);
		if(err != nil)
			return (nil, 0, big 0, "unpack fheader: "+err);
		if(Fhdrsize+fhdr.hsize+fhdr.dsize > n)
			return (nil, 0, big 0, "fheader points outside data file");

		nb := len fhdr.blocks;
		# the last block is used to size the output; an empty group would
		# index blocks[-1]
		if(nb == 0)
			return (nil, 0, big 0, "fheader has no blocks");
		lb := fhdr.blocks[nb-1];
		uncsize := o[nb-1]+lb.size;
		dst := array[uncsize] of byte;
		s := Fhdrsize+fhdr.hsize;
		err = decompress(d[s:s+fhdr.dsize], dst);
		if(err != nil)
			return (nil, 0, big 0, "decompressing: "+err);

		for(i := 0; i < nb; i++) {
			dhdr := fhdr.blocks[i];
			off := o[i];
			score := Score(sha1(dst[off:off+dhdr.size]));
			if(!score.eq(dhdr.score))
				return (nil, 0, big 0, sprint("fheader score (%s, i=%d) does not match actual score (%s) at offset=%bd",
					dhdr.score.text(), i, score.text(), offset));
		}
		return (fhdr.blocks, 1, offset+big (Fhdrsize+fhdr.hsize+fhdr.dsize), nil);

	* =>
		return (nil, 0, big 0, "bad magic");
	}
}
1326 
# Read the plain (uncompressed) block at offset and check whether it is
# the block with the given score and dtype.  Returns:
#	(0, nil, nil)	the header names another block (a collision on the
#			partial score kept in memory)
#	(1, nil, nil)	hit, readdata == 0
#	(1, data, nil)	hit, data verified against the header score
#	(0, nil, err)	read, format or verification error
# For a Maxlumpsize request only 8KB of data is read with the header; the
# rest is read only if the block is larger.
dblockget(offset: big, score: Score, dtype, readdata, size: int): (int, array of byte, string)
{
	if(debug) say(sprint("blockread offset=%bd", offset));
	rsize := size;
	if(rsize == Venti->Maxlumpsize)
		rsize = 8*1024;
	buf := array[Dhdrsize+rsize] of byte;
	n := preadn(datafd, buf, len buf, offset);
	if(n >= 0 && n < Dhdrsize)
		sys->werrstr("short read");
	if(n < Dhdrsize)
		return (0, nil, sprint("reading block header at offset=%bd: %r", offset));

	(dh, err) := Dhdr.unpack(buf[:Dhdrsize]);
	if(err != nil)
		return (0, nil, err);

	# the header alone tells whether this is the wanted block; another
	# block is a miss, not an error, so check before touching its data
	if(!score.eq(dh.score) || dh.dtype != dtype)
		return (0, nil, nil);
	if(!readdata)
		return (1, nil, nil);
	if(dh.size > size)
		return (0, nil, sprint("data (%d bytes) larger than requested (%d bytes)", dh.size, size));

	# use what the first read returned (only n bytes are valid), then
	# fetch whatever part of the block is still missing
	d := buf[Dhdrsize:n];
	if(len d > dh.size)
		d = d[:dh.size];
	if(len d < dh.size) {
		newd := array[dh.size] of byte;
		newd[:] = d;
		want := dh.size-len d;
		m := preadn(datafd, newd[len d:], want, offset+big (Dhdrsize+len d));
		if(m >= 0 && m < want)
			sys->werrstr("short read");
		if(m != want)
			return (0, nil, sprint("reading block at offset=%bd: %r", offset));
		d = newd;
	}
	ds := Score(sha1(d));
	if(!dh.score.eq(ds))
		return (0, nil, sprint("mismatching score for block at offset=%bd: header=%s data=%s", offset, dh.score.text(), ds.text()));
	return (1, d, nil);
}
1369 
# Add every block of a decompressed group to flatequeue while holding its
# semaphore.  d is the decompressed data; block i occupies
# d[o[i]:o[i]+blocks[i].size].
flatequeueadd(blocks: array of ref Dhdr, o: array of int, d: array of byte)
{
	flatequeue.sem.obtain();
	for(i := 0; i < len blocks; i++) {
		b := blocks[i];
		start := o[i];
		data := d[start:start+b.size];
		flatequeue.insert(ref Block(b.score, b.dtype, data));
	}
	flatequeue.sem.release();
}
1380 
# Look for the block with score and dtype in the compressed (flate) group
# at offset.  Returns (1, data, nil) on a hit (data nil when readdata is
# 0), (0, nil, nil) when the group does not hold the block, or
# (0, nil, error).  On a hit with readdata the group is decompressed and
# every block is checked against its header score before the blocks are
# added to flatequeue, so that unverified data is never cached or served.
fblockget(offset: big, score: Score, dtype, readdata, size: int): (int, array of byte, string)
{
	buf := array[Maxblocksize] of byte;
	n := preadn(datafd, buf, len buf, offset);
	if(n < 0)
		return (0, nil, sprint("reading: %r"));
	# Fhdr.unpack indexes the fixed header part directly
	if(n < Fhdrsize)
		return (0, nil, sprint("short fheader at offset=%bd", offset));
	buf = buf[:n];
	(f, o, err) := Fhdr.unpack(buf);
	if(err != nil)
		return (0, nil, err);
	for(i := 0; i < len f.blocks; i++) {
		dh := f.blocks[i];
		if(dh.dtype != dtype || !dh.score.eq(score))
			continue;
		if(dh.size > size)
			return (0, nil, sprint("data (%d) larger than requested (%d)", dh.size, size));
		if(debug) say("flateblockget: hit");
		if(!readdata)
			return (1, nil, nil);

		nb := len f.blocks;
		lb := f.blocks[nb-1];
		dst := array[o[nb-1]+lb.size] of byte;
		s := Fhdrsize+f.hsize;
		if(len buf < s+f.dsize)
			return (0, nil, sprint("fheader points outside datafile at offset=%bd need=%d have=%d", offset, s+f.dsize, n));
		err = decompress(buf[s:s+f.dsize], dst);
		if(err != nil)
			return (0, nil, "decompressing: "+err);

		for(j := 0; j < nb; j++) {
			b := f.blocks[j];
			bs := Score(sha1(dst[o[j]:o[j]+b.size]));
			if(!bs.eq(b.score))
				return (0, nil, sprint("fheader score (%s, i=%d) does not match actual score (%s) at offset=%bd",
					b.score.text(), j, bs.text(), offset));
		}
		flatequeueadd(f.blocks, o, dst);
		off := o[i];
		return (1, dst[off:off+dh.size], nil);
	}
	if(debug) say("flateblockget: miss");
	return (0, nil, nil);
}
1416 
# Fetch/check a block at offset, dispatching on whether the record there
# is a compressed group (fblockget) or a plain block (dblockget).
blockget(compressed: int, offset: big, score: Score, dtype: int, readdata: int, size: int): (int, array of byte, string)
{
	if(!compressed)
		return dblockget(offset, score, dtype, readdata, size);
	return fblockget(offset, score, dtype, readdata, size);
}
1423 
# Try each (compressed, offset) candidate in order until one holds the
# block with score and dtype.  Returns (hit, data, nil) for the first hit;
# otherwise (0, nil, err) where err is the error from the last candidate
# that reported one (nil if none did).
datalookup(offsets: list of (int, big), score: Score, dtype: int, readdata: int, size: int): (int, array of byte, string)
{
	lasterr: string;
	while(offsets != nil) {
		(compr, off) := hd offsets;
		offsets = tl offsets;
		(hit, data, err) := blockget(compr, off, score, dtype, readdata, size);
		if(hit)
			return (hit, data, nil);
		if(err != nil)
			lasterr = err;
	}
	return (0, nil, lasterr);
}
1437 
# Copy the partial score of entry i of chain c into a.  The scorebytes
# score bytes sit just before the entry's trailing type byte; the first is
# masked with scorebytemask.
getscore(c: ref Chain, i: int, a: array of byte)
{
	start := (i+1)*membytes - (scorebytes+1);
	a[0] = c.d[start]&scorebytemask;
	for(k := 1; k < scorebytes; k++)
		a[k] = c.d[start+k];
}
1445 
# Debug helper: print each used entry of chain c with its type, address,
# compression flag and partial score.
dumpchain(c: ref Chain)
{
	sc := array[scorebytes] of byte;
	for(i := 0; i < c.used; i++) {
		t := int c.d[(i+1)*membytes-1];
		getscore(c, i, sc);
		(compr, addr) := c.getaddr(i);
		say(sprint(" i=%d dtype=%d addr=%bd compr=%d score=%s", i, t, addr, compr, scorestr(sc)));
	}
}
1456 
# Debug helper: print every chain in the bucket of score, numbering them
# from 0, followed by its entries.
dumphead(score: Score)
{
	n := 0;
	c := heads[head(score.a)];
	while(c != nil) {
		say(sprint("chain %d, c.used=%d", n, c.used));
		n++;
		dumpchain(c);
		c = c.next;
	}
}
1465 
# Compare entry j of chain c with key d (as built by mkmem: partial score
# then type byte), walking backwards from the last byte.  The first byte
# is compared under scorebytemask.  Returns <0, 0 or >0.
blockcmp(c: ref Chain, j: int, d: array of byte): int
{
	end := (j+1)*membytes-1;
	last := len d-1;
	for(k := 0; k < last; k++) {
		x := int c.d[end-k];
		y := int d[last-k];
		if(x != y)
			return x-y;
	}
	return int (c.d[end-last]&scorebytemask) - int (d[0]&scorebytemask);
}
1476 
# Binary search over the sorted used entries of c: index of the first
# entry not comparing less than d, or c.used if there is none.
find(c: ref Chain, d: array of byte): int
{
	lo := 0;
	hi := c.used;
	while(lo < hi) {
		mid := (lo+hi)/2;
		if(blockcmp(c, mid, d) >= 0)
			hi = mid;
		else
			lo = mid+1;
	}
	return lo;
}
1491 
# Linear-scan counterpart of find: index of the first used entry of c not
# comparing less than d, or c.used.
find0(c: ref Chain, d: array of byte): int
{
	i := 0;
	while(i < c.used && blockcmp(c, i, d) < 0)
		i++;
	return i;
}
1499 
# Decode the address held in the first addrbits bits of entry j,
# most significant bits first.  With Cflag set, the bit selected by
# comprmask (shifted down by addrbits-1, so presumably the top address
# bit) is the "compressed" flag: it is returned separately and cleared
# from the address.
Chain.getaddr(c: self ref Chain, j: int): (int, big)
{
	pos := j*membytes;
	addr := big 0;
	bits := addrbits;
	while(bits >= 8) {
		addr |= big c.d[pos] << (bits-8);
		pos++;
		bits -= 8;
	}
	if(bits > 0)
		addr |= big c.d[pos] >> (8-bits);
	compr := 0;
	if(Cflag && ((addr&comprmask)>>(addrbits-1)) != big 0) {
		compr = 1;
		addr &= ~comprmask;
	}
	return (compr, addr);
}
1513 
# Prepend to l the (compressed, address) of every used entry of c equal to
# key d (located by binary search), returning the extended list.
Chain.lookup(c: self ref Chain, d: array of byte, l: list of (int, big)): list of (int, big)
{
	i := find(c, d);
	while(i < c.used && blockcmp(c, i, d) == 0) {
		l = c.getaddr(i) :: l;
		i++;
	}
	return l;
}
1520 
# Like Chain.lookup, but locates the first candidate with the linear find0.
lookup0(c: ref Chain, d: array of byte, l: list of (int, big)): list of (int, big)
{
	i := find0(c, d);
	while(i < c.used && blockcmp(c, i, d) == 0) {
		l = c.getaddr(i) :: l;
		i++;
	}
	return l;
}
1527 
# Insert the in-memory entry for ih into c, keeping entries sorted by key.
# Assumes c has room for one more entry; the caller is responsible.
Chain.insert(c: self ref Chain, ih: ref Ihdr)
{
	e := array[membytes] of byte;
	mkentry(ih, e);
	pos := find(c, e[membytes-(scorebytes+1):]);
	# shift entries pos..used-1 one slot right to open a gap at pos
	c.d[(pos+1)*membytes:] = c.d[pos*membytes:c.used*membytes];
	c.d[pos*membytes:] = e;
	c.used++;
}
1537 
# Allocate an empty chain with room for chainblocks entries and no successor.
Chain.mk(): ref Chain
{
	entries := array[chainblocks*membytes] of byte;
	return ref Chain(entries, 0, nil);
}
1542 
# Return the SHA-1 digest of a.
sha1(a: array of byte): array of byte
{
	digest := array[keyring->SHA1dlen] of byte;
	keyring->sha1(a, len a, digest, nil);
	return digest;
}
1549 
# Format d as lowercase hex, two digits per byte.
scorestr(d: array of byte): string
{
	s := "";
	for(i := len d-1; i >= 0; i--)
		s = sprint("%02x", int d[i]) + s;
	return s;
}
1557 
1558 
# Return the data of the block with score s and type dtype among the first
# l.n slots of the ring, or nil.
Fifo.lookup(l: self ref Fifo, s: Score, dtype: int): array of byte
{
	i := 0;
	while(i < l.n) {
		b := l.a[i++];
		if(b.dtype == dtype && b.score.eq(s))
			return b.d;
	}
	return nil;
}
1568 
# Store b in the next ring slot, overwriting the oldest entry once the ring
# is full; l.n counts filled slots, capped at len l.a.
Fifo.insert(l: self ref Fifo, b: ref Block)
{
	l.a[l.f++] = b;
	if(l.f == len l.a)
		l.f = 0;
	if(l.n < len l.a)
		l.n++;
}
1576 
1577 
# Return the data of the queued block with score s and type dtype, or nil.
Queue.lookup(q: self ref Queue, s: Score, dtype: int): array of byte
{
	for(i := 0; i < q.n; i++) {
		b := q.a[i];
		if(b.dtype != dtype || !b.score.eq(s))
			continue;
		return b.d;
	}
	return nil;
}
1585 
# Append b to the queue, doubling the backing array when it is full.  An
# empty backing array grows to one slot, since doubling zero would leave no
# room and the store below would be out of range.
Queue.insert(q: self ref Queue, b: ref Block)
{
	if(q.n >= len q.a) {
		size := 2*len q.a;
		if(size == 0)
			size = 1;
		newa := array[size] of ref Block;
		if(len q.a > 0)
			newa[:] = q.a;
		q.a = newa;
	}
	q.a[q.n++] = b;
}
1595 
# Remove the entry that is the same Block reference as b, keeping the order
# of the others.  It is a fatal error (fail) if b is not queued.
Queue.remove(q: self ref Queue, b: ref Block)
{
	i := 0;
	while(i < q.n && q.a[i] != b)
		i++;
	if(i >= q.n) {
		fail("did not remove anything");
		return;
	}
	q.a[i:] = q.a[i+1:q.n];
	q.n--;
	q.a[q.n] = nil;
}
1607 
# Look for a block with score and dtype first in writequeue, then in
# flatequeue (recently decompressed blocks); returns its data or nil.
queuelookup(score: Score, dtype: int): array of byte
{
	d := writequeue.lookup(score, dtype);
	if(d != nil)
		return d;
	return flatequeue.lookup(score, dtype);
}
1615 
# Bucket index for a score: its first three bytes as a little-endian
# integer, modulo the number of heads.
head(d: array of byte): int
{
	v := int d[0] | (int d[1]<<8) | (int d[2]<<16);
	return v % len heads;
}
1620 
# Return the chain that a new entry for score d should go into: the last
# chain of d's bucket.  The bucket's first chain is created if missing, and
# a new chain is appended when the last one is full.
chain(d: array of byte): ref Chain
{
	h := head(d);
	if(heads[h] == nil)
		heads[h] = Chain.mk();
	last := heads[h];
	for(; last.next != nil; last = last.next)
		;
	if(last.used < chainblocks)
		return last;
	last.next = Chain.mk();
	return last.next;
}
1634 
# Add the index header ih to the in-memory table.
meminsert(ih: ref Ihdr)
{
	if(debug) say(sprint("insert: adding halfscore=%s type=%d offset=%bd compressed=%d", scorestr(ih.halfscore), ih.dtype, ih.offset, ih.compressed));
	c := chain(ih.halfscore);
	c.insert(ih);
}
1640 
# Build the in-memory entry for ih in d (membytes long).  The last
# scorebytes+1 bytes get the key from mkmem.  The address (ih.offset, with
# comprmask set for compressed records) goes most significant bits first
# into the first addrbits bits.  A trailing partial address byte is OR-ed
# in, presumably because it shares a byte with the key's masked first
# byte; confirm against the membytes layout.
mkentry(ih: ref Ihdr, d: array of byte)
{
	mkmem(ih.halfscore, ih.dtype, d[len d-(scorebytes+1):]);
	addr := ih.offset;
	if(ih.compressed)
		addr |= comprmask;
	i := 0;
	bits := addrbits;
	while(bits >= 8) {
		d[i] = byte (addr >> (bits-8));
		i++;
		bits -= 8;
	}
	if(bits > 0)
		d[i] |= byte (addr << (8-bits));
}
1653 
# Fill d (scorebytes+1 long) with the lookup key for score a and dtype:
# scorebytes bytes of a starting after its first headbytes bytes, the first
# masked with scorebytemask, followed by the type byte.
mkmem(a: array of byte, dtype: int, d: array of byte)
{
	part := a[headbytes:headbytes+scorebytes];
	d[:] = part;
	d[0] = d[0] & scorebytemask;
	d[len d-1] = byte dtype;
}
1660 
# Return the (compressed, address) candidates whose in-memory key matches
# score and dtype, across all chains of the score's bucket.  Candidates
# beyond the first are added to lookupcollisions.
memlookup(score: Score, dtype: int): list of (int, big)
{
	if(debug) say(sprint("lookup: looking for score=%s type=%d", score.text(), dtype));

	key := array[scorebytes+1] of byte;
	mkmem(score.a, dtype, key);
	found: list of (int, big);
	c := heads[head(score.a)];
	while(c != nil) {
		found = c.lookup(key, found);
		c = c.next;
	}
	n := len found;
	if(n > 1)
		lookupcollisions += n-1;
	return found;
}
1673 
# Variant of memlookup using the linear-scan lookup0; it does not update
# lookupcollisions.
memlookup0(score: Score, dtype: int): list of (int, big)
{
	if(debug) say(sprint("lookup: looking for score=%s type=%d", score.text(), dtype));

	key := array[scorebytes+1] of byte;
	mkmem(score.a, dtype, key);
	found: list of (int, big);
	c := heads[head(score.a)];
	while(c != nil) {
		found = lookup0(c, key, found);
		c = c.next;
	}
	return found;
}
1684 
# Read want bytes into d from fd at offset, retrying short reads.  Returns
# the byte count (less than want only at eof) or -1 on a read error.
preadn(fd: ref Sys->FD, d: array of byte, want: int, offset: big): int
{
	have := 0;
	for(;;) {
		if(have >= want)
			break;
		n := sys->pread(fd, d[have:], want-have, offset+big have);
		if(n < 0)
			return -1;
		if(n == 0)
			break;
		have += n;
	}
	return have;
}
1698 
# Big-endian unsigned 16-bit value at d[i].
get16(d: array of byte, i: int): int
{
	return int d[i]*256 + int d[i+1];
}
1703 
# Big-endian unsigned 32-bit value at d[i], as a big.
get32(d: array of byte, i: int): big
{
	hi := big get16(d, i);
	lo := big get16(d, i+2);
	return (hi<<16) | lo;
}
1708 
# Big-endian unsigned 48-bit value at d[i], as a big.
get48(d: array of byte, i: int): big
{
	hi := big get16(d, i);
	lo := get32(d, i+2);
	return (hi<<32) | lo;
}
1713 
# Store the low 16 bits of v big-endian at d[i].
put16(d: array of byte, i: int, v: int)
{
	d[i] = byte (v>>8);
	d[i+1] = byte v;
}
1719 
# Store the low 32 bits of v big-endian at d[i].
put32(d: array of byte, i: int, v: big)
{
	hi := int (v>>16);
	lo := int v;
	put16(d, i, hi);
	put16(d, i+2, lo);
}
1725 
# Store the low 48 bits of v big-endian at d[i].
put48(d: array of byte, i: int, v: big)
{
	hi := int (v>>32);
	put16(d, i, hi);
	put32(d, i+2, v);
}
1731 
# Decode an index header from d.  The layout is Indexscoresize bytes of
# partial score, one type byte, then a 48-bit offset whose Icomprmask bits
# mark a compressed record.  The returned halfscore is a slice of d, not a
# copy.
Ihdr.unpack(d: array of byte): ref Ihdr
{
	halfscore := d[0:Indexscoresize];
	dtype := int d[Indexscoresize];
	raw := get48(d, Indexscoresize+1);
	compressed := 0;
	if((raw&Icomprmask) == Icomprmask)
		compressed = 1;
	if(Indexscoresize+1+6 != Ihdrsize)
		fail("bad iheader.unpack");
	return ref Ihdr(halfscore, dtype, raw&~Icomprmask, compressed);
}
1749 
# Encode ih into d: partial score, type byte, then the 48-bit offset with
# Icomprmask set for compressed records.
Ihdr.pack(ih: self ref Ihdr, d: array of byte)
{
	n := len ih.halfscore;
	d[0:] = ih.halfscore;
	d[n] = byte ih.dtype;
	offset := ih.offset;
	if(ih.compressed)
		offset |= Icomprmask;
	put48(d, n+1, offset);
	if(n+1+6 != Ihdrsize)
		fail("bad iheader.pack");
}
1765 
# Decode a plain block header from the start of d.  The fields are magic,
# score, type byte, 16-bit size and 32-bit conntime.  Returns (nil, error)
# when d is shorter than Dhdrsize or lacks Dhdrmagic.  The score is copied
# so the header does not keep d (often a large read buffer) alive.
Dhdr.unpack(d: array of byte): (ref Dhdr, string)
{
	if(len d < Dhdrsize)
		return (nil, "short dhdr");
	o := 0;
	if(get32(d, o) != Dhdrmagic)
		return (nil, "bad dhdr magic");
	o += 4;
	score := array[Scoresize] of byte;
	score[:] = d[o:o+Scoresize];
	o += Scoresize;
	dtype := int d[o];
	o += 1;
	size := get16(d, o);
	o += 2;
	conntime := get32(d, o);
	o += 4;
	if(o != Dhdrsize)
		fail("bad dheader.unpack");
	return (ref Dhdr(Score(score), dtype, size, conntime), nil);
}
1784 
# Encode dh into d.  The fields are written big-endian in this order:
# magic, score, type byte, 16-bit size, 32-bit conntime.
Dhdr.pack(dh: self ref Dhdr, d: array of byte)
{
	put32(d, 0, Dhdrmagic);
	d[4:] = dh.score.a;
	o := 4+Scoresize;
	d[o++] = byte dh.dtype;
	put16(d, o, dh.size);
	put32(d, o+2, dh.conntime);
	if(o+2+4 != Dhdrsize)
		fail("bad dheader.pack");
}
1801 
1802 
# Decode a compressed-group header from the start of d.  The fixed part is
# magic, a one-byte block count and a 16-bit compressed size; one
# Fbhdrsize header per block follows.  Returns the header, the offset of
# each block within the decompressed data, and nil.  Returns
# (nil, nil, error) if d is too short or lacks Fhdrmagic.  Scores are
# copied so the returned headers (which fblockget caches via flatequeue)
# do not keep d alive.
Fhdr.unpack(d: array of byte): (ref Fhdr, array of int, string)
{
	if(len d < Fhdrsize)
		return (nil, nil, "short fhdr");
	o := 0;
	if(get32(d, o) != Fhdrmagic)
		return (nil, nil, "bad fhdr magic");
	o += 4;
	nb := int d[o++];
	dsize := get16(d, o);
	o += 2;
	hsize := nb*Fbhdrsize;
	# the per-block headers start after the fixed part, i.e. at the
	# current o; checking before dsize was read was 2 bytes too lenient
	if(o+hsize > len d)
		return (nil, nil, "header points outside buffer");

	offset := 0;
	b := array[nb] of ref Dhdr;
	offsets := array[nb] of int;
	for(i := 0; i < nb; i++) {
		score := array[Scoresize] of byte;
		score[:] = d[o:o+Scoresize];
		o += Scoresize;
		dtype := int d[o];
		o += 1;
		size := get16(d, o);
		o += 2;
		conntime := get32(d, o);
		o += 4;
		b[i] = ref Dhdr(Score(score), dtype, size, conntime);
		offsets[i] = offset;
		offset += size;
	}
	return (ref Fhdr(b, hsize, dsize), offsets, nil);
}
1834 
# Encode fh into d (Fhdrsize+len fh.blocks*Fbhdrsize bytes).  The layout
# is magic, a one-byte block count and a 16-bit compressed size, then for
# each block its score, type byte, 16-bit size and 32-bit conntime.  Counts
# and sizes that do not fit their fields would be silently truncated and
# corrupt the data file, so they are fatal errors instead.
Fhdr.pack(fh: self ref Fhdr, d: array of byte)
{
	nb := len fh.blocks;
	if(nb > 16rff)
		fail(sprint("bad fhdr.pack: %d blocks do not fit the block count", nb));
	if(fh.dsize < 0 || fh.dsize > 16rffff)
		fail(sprint("bad fhdr.pack: dsize %d does not fit 16 bits", fh.dsize));
	o := 0;
	put32(d, o, Fhdrmagic);
	o += 4;
	d[o] = byte nb;
	o += 1;
	put16(d, o, fh.dsize);
	o += 2;
	for(i := 0; i < nb; i++) {
		b := fh.blocks[i];
		if(b.size < 0 || b.size > 16rffff)
			fail(sprint("bad fhdr.pack: block size %d does not fit 16 bits", b.size));
		d[o:] = b.score.a;
		o += Scoresize;
		d[o] = byte b.dtype;
		o += 1;
		put16(d, o, b.size);
		o += 2;
		put32(d, o, b.conntime);
		o += 4;
	}
	if(o != Fhdrsize+nb*Fbhdrsize)
		fail("bad fhdr.pack");
}
1858 
# which figure heapinfo returns: the current heap size or its maximum
Current, Max: con iota;

# /dev/memory, opened on first use by heapinfo and then kept open
mfd: ref Sys->FD;
1861 
# Return a figure for the "heap" pool from /dev/memory: the current size
# for which == Current, the maximum for which == Max.  Each line is parsed
# as seven 12-character numeric fields followed by the pool name.
# /dev/memory is opened once (kept in mfd) and re-read from the start each
# call.  Every failure is fatal.
heapinfo(which: int): big
{
	if(mfd == nil) {
		mfd = sys->open("/dev/memory", sys->OREAD);
		if(mfd == nil)
			fail(sprint("open /dev/memory: %r"));
	}
	sys->seek(mfd, big 0, Sys->SEEKSTART);

	buf := array[400] of byte;
	n := sys->read(mfd, buf, len buf);
	if(n <= 0)
		fail(sprint("reading /dev/memory: %r"));

	(nil, lines) := sys->tokenize(string buf[0:n], "\n");
	for(; lines != nil; lines = tl lines) {
		line := hd lines;
		if(line[7*12:] != "heap")
			continue;
		case which {
		Current =>
			return big line[0:12];
		Max =>
			return big line[12:24];
		}
	}
	fail("missing heap line in /dev/memory");
	return big 0;
}
1884 
# Maximum size of the heap pool, read from /dev/memory.
heapmax(): big
{
	max := heapinfo(Max);
	return max;
}
1889 
# Current size of the heap pool, read from /dev/memory.
heapused(): big
{
	cur := heapinfo(Current);
	return cur;
}
1894 
# Parse a size with an optional unit suffix k, m, g, t or p (any case).
# Each successive unit is a further factor of 1024.  nil yields 0.
suffix(s: string): big
{
	if(s == nil)
		return big 0;
	s = str->tolower(s);
	units := "kmgtp";
	last := s[len s-1];
	mult := big 1;
	for(i := 0; i < len units; i++) {
		mult *= big 1024;
		if(last == units[i])
			return mult * big s[:len s-1];
	}
	return big s;
}
1909 
# Smallest n with 2^n >= v (0 for v <= 1).
log2(v: big): int
{
	n := 0;
	while((big 1<<n) < v)
		n++;
	return n;
}
1916 
# Print s on standard error, then raise the exception "fail:"+s.
fail(s: string)
{
	stderr := fildes(2);
	fprint(stderr, "%s\n", s);
	raise "fail:"+s;
}
1922 
# Print s and a newline on standard error.
say(s: string)
{
	stderr := fildes(2);
	fprint(stderr, "%s\n", s);
}