changelog shortlog tags branches changeset files revisions annotate raw help

Mercurial > hg > ventivac / appl/cmd/ventisrv.b

changeset 129: 636d77883598
parent: 8fd6132b12be
author: Mechiel Lukkien <mechiel@ueber.net>
date: Mon, 20 Aug 2007 14:24:44 +0200
permissions: -rw-r--r--
description: add option -I indexoffset to ventisrv. this allows it to start verifying the index file at the given offset. can be used to make it find and diagnose errors in the index/data file.
1 # ventisrv stores a part of each stored score in memory. at the
2 # command-line, the maximum data file size and mean block size (thus
3 # expected number of scores) is given. the scores are assumed to be
4 # distributed evenly, so the number of bits of each score to keep in memory
5 # in order to make the probability of a collision <0.001 is calculated.
6 # now all stored scores are read from the index file and partially stored
7 # in main memory.
8 
9 # a venti block read can result in 0 hits (data not present) or 1 or
10 # more hits. for each of the hits, the full score has to be read from
11 # disk until a match is found (in which case the data will be returned and
12 # further reading stopped). it is possible none of the in-memory "hits"
13 # are a hit in the data file.
14 
15 # a venti block write will perform the same steps as the read. if the
16 # data is not present, the header+data is written to the data file,
17 # another header is written to the index file.
18 
19 # at startup, the index file and data file are checked. if headers
20 # are missing from the index file, they are synced with headers from the
21 # data file. missing headers in the data file are errors and ventisrv
22 # will stop with a diagnostic message.
23 
24 # possible improvements:
25 # - speedup syncing index file from data file at startup?
26 # - queueing index writes may help when filesystem does synchronous writes
27 # - make startup faster by reading partial scores into memory more
28 # efficiently: insert non-sorted at startup, when all has been read,
29 # sort the lists. faster than inserting each block sorted.
30 
31 implement Ventisrv;
32 
33 include "sys.m";
34 include "draw.m";
35 include "arg.m";
36 include "daytime.m";
37 include "string.m";
38 include "keyring.m";
39 include "filter.m";
40 include "lock.m";
41 include "venti.m";
42 
43 sys: Sys;
44 daytime: Daytime;
45 str: String;
46 keyring: Keyring;
47 venti: Venti;
48 deflate, inflate: Filter;
49 lock: Lock;
50 
51 sprint, fprint, print, fildes: import sys;
52 Score, Scoresize, Vmsg: import venti;
53 Semaphore: import lock;
54 
55 
# Module interface: init is the only entry point. It receives the
# command-line arguments; the draw context is ignored (see init).
Ventisrv: module {
	init: fn(nil: ref Draw->Context, args: list of string);
};
59 
# Ihdr is one entry of the index file: a prefix of a block's score,
# its type, and where the block is in the data file.  On disk an
# entry is Ihdrsize bytes.
Ihdr: adt {
	halfscore: array of byte;	# first Indexscoresize bytes of the score
	dtype: int;	# venti block type
	offset: big;	# data file offset of the block, or of its deflated group
	compressed: int;	# nonzero if the block is part of a deflated group

	unpack: fn(d: array of byte): ref Ihdr;
	pack: fn(ih: self ref Ihdr, d: array of byte);
};

# Dhdr is the header stored with a block in the data file
# (Dhdrsize: magic+score+type+size+conntime).
Dhdr: adt {
	score: Score;
	dtype: int;
	size: int;	# length of the block's data
	conntime: big;	# connection time of the client that wrote the block

	unpack: fn(d: array of byte): (ref Dhdr, string);
	pack: fn(dh: self ref Dhdr, d: array of byte);
};

# Chain is one link in the in-memory list of partial scores hanging
# off an entry of heads.  Its memory is estimated in init as
# chainblocks entries of membytes bytes each.
# NOTE(review): methods are implemented outside this view.
Chain: adt {
	d: array of byte;
	used: int;
	next: cyclic ref Chain;

	mk: fn(): ref Chain;
	getaddr: fn(c: self ref Chain, i: int): (int, big);
	lookup: fn(c: self ref Chain, d: array of byte, l: list of (int, big)): list of (int, big);
	insert: fn(c: self ref Chain, ih: ref Ihdr);
};

# Client is the state main keeps for one connection.
Client: adt {
	rpid, wpid: int;	# pids of the client (reader) and cwriter processes; -1 once killed
	respc: chan of ref Vmsg;	# replies to be written by cwriter
	inuse: int;	# requests accepted from this client but not yet written back
	conntime: big;	# time the connection was accepted
	readonly: int;	# nonzero for connections announced with -r
};

# Lookup is work handed by main to a reader process.  addrs are the
# candidate locations returned by memlookup.
Lookup: adt {
	c: ref Client;
	tid: int;
	addrs: list of (int, big);
	pick {
	Read =>
		score: Score;
		dtype: int;
		size: int;	# maximum size the client asked for
	Write =>
		b: ref Block;
	}
};

# Store is work for the writer process.  A Sync with a nil c is an
# internal sync whose result is not sent to any client.
Store: adt {
	c: ref Client;
	pick {
	Sync =>
		tid: int;
	Write =>
		b: ref Block;
	}
};

# Block is a block accepted from a client.
Block: adt
{
	score: Score;
	dtype: int;
	d: array of byte;
};

# Fhdr is the header of a deflated group of blocks in the data file
# (Fhdrsize: magic+count+size, then Fbhdrsize per block).
# NOTE(review): hsize/dsize semantics are not visible here — confirm.
Fhdr: adt {
	blocks: array of ref Dhdr;
	hsize: int;
	dsize: int;

	unpack: fn(d: array of byte): (ref Fhdr, array of int, string);
	pack: fn(fh: self ref Fhdr, d: array of byte);
};

# Queue holds blocks that were acknowledged to clients but are not yet
# on disk (writequeue).  Accesses in this file are guarded by sem.
Queue: adt
{
	a: array of ref Block;
	n: int;

	lookup: fn(q: self ref Queue, s: Score, dtype: int): array of byte;
	insert: fn(q: self ref Queue, b: ref Block);
	remove: fn(q: self ref Queue, b: ref Block);
};

# Fifo is a fixed-size list of blocks with its own semaphore, used for
# flatequeue.  NOTE(review): its use is not visible in this part of
# the file.
Fifo: adt
{
	a: array of ref Block;
	n, f: int;
	sem: ref Semaphore;

	lookup: fn(l: self ref Fifo, s: Score, dtype: int): array of byte;
	insert: fn(l: self ref Fifo, b: ref Block);
};

# Flate drives one deflate filter process.
Flate: adt
{
	rqc: chan of ref Filter->Rq;	# requests from the filter
	lastfill: ref Filter->Rq.Fill;	# pending Fill request to hand input to
	d: array of byte;	# compressed output
	nd: int;	# bytes of output produced; > len d means it did not fit
	pid: int;	# filter process, killed when output overflowed

	new: fn(n: int): ref Flate;
	write: fn(f: self ref Flate, d: array of byte): string;
	finish: fn(f: self ref Flate): (array of byte, string);
};
171 
Eperm: con "permission denied";

# index file: one Ihdrsize entry per stored block
Indexscoresize: con 8;	# bytes of the score kept in an index entry
Icomprmask: con big 1<<(6*8-1);	# highest bit of the 6-byte address field
Ihdrsize: con Indexscoresize+1+6; # scoresize+typesize+addrsize
Iverify: con 128; # check the half scores of the last n entries in the index file against the data file
Ichunksize: con ((128*1024)/Ihdrsize)*Ihdrsize; # chunk in bytes of index entries to read at a time
Nqueuechunks: con 32; # queue n Ichunksize blocks when reading index at startup

# data file
Dhdrmagic: con big 16r2f9d81e5;
Fhdrmagic: con big 16r78c66a15;
Dhdrsize: con 4+Scoresize+1+2+4; # magic+score+type+size+conntime
Fhdrsize: con 4+1+2; # magic+count+size
Fbhdrsize: con Scoresize+1+2+4; # score+type+size+conntime
Maxreaders: con 32;	# limit on the number of reader processes (see lookupsend)
Maxblocksize: con Dhdrsize+Venti->Maxlumpsize;

typebits: con 8; # must be 8

# geometry of the in-memory index, computed in init
chainblocks, headbits, nheads: int;
scorebits, addrbits: int;
comprmask: big;
membytes, headbytes, scorebytes: int;
scorebytemask: byte;
maxdatasize: big;

# rough per-object heap overheads, only used for the memory estimate in init
Arraysize: con 4+4+4;
Refsize: con 4;

listenaddr: con "net!*!venti";
indexfile := "index";
datafile := "data";
statsdir := "/chan/";
statsfile := "ventisrvstats";
debug, Cflag, cflag, qflag, verbose: int;
verifyoffset := big -1;	# -I; negative: verify the last Iverify entries
raddrs, waddrs: list of string;

indexfd, datafd: ref Sys->FD;
indexsize, datasize: big;

zeroscore: Score;


heads: array of ref Chain;
reqc: chan of (ref Vmsg, ref Client);
wrotec: chan of (int, ref Client);
lookupc: chan of ref Lookup;
lookupdonec: chan of (int, ref Vmsg, ref Client);
syncdonec: chan of (ref Vmsg, ref Client);
storec: chan of ref Store;
writererrorc: chan of string;
sem: ref Semaphore;	# guards writequeue together with the in-memory index
statsem: ref Semaphore;	# guards counters shared with readers/writer

writequeue: ref Queue;
flatequeue: ref Fifo;

initheap, configheap: big;
nreads, nwrites, nwritesdup, nsyncs, npings: int;
nblocks, nflateblocks: int;
lookupcollisions: int;
233 
234 
# init loads the modules, parses options, sizes the in-memory index from
# maxdatasize and meanblocksize, and (unless -q) opens the files,
# starts listening and runs main.
init(nil: ref Draw->Context, args: list of string)
{
	sys = load Sys Sys->PATH;
	arg := load Arg Arg->PATH;
	daytime = load Daytime Daytime->PATH;
	str = load String String->PATH;
	keyring = load Keyring Keyring->PATH;
	lock = load Lock Lock->PATH;
	lock->init();
	deflate = load Filter Filter->DEFLATEPATH;
	inflate = load Filter Filter->INFLATEPATH;
	deflate->init();
	inflate->init();
	venti = load Venti Venti->PATH;
	venti->init();

	arg->init(args);
	arg->setusage(arg->progname()+ " [-DcCqv] [-I indexoffset] [-i index] [-d data] [-s statsfile] [-r addr] [-w addr] maxdatasize meanblocksize");
	while((c := arg->opt()) != 0)
		case c {
		'D' =>	debug++;
		'I' =>	verifyoffset = big arg->earg();
		'c' =>	cflag++;	# writing deflated groups implies -C
			Cflag++;
		'C' =>	Cflag++;
		'd' =>	datafile = arg->earg();
		'i' =>	indexfile = arg->earg();
		'q' =>	qflag++;	# only print the computed parameters
			verbose++;
		'v' =>	verbose++;
		's' =>
			(statsdir, statsfile) = str->splitstrr(arg->earg(), "/");
			if(statsfile == nil) {
				fprint(fildes(2), "bad stats file\n");
				arg->usage();
			}
		'r' =>	raddrs = arg->earg()::raddrs;
		'w' =>	waddrs = arg->earg()::waddrs;
		* =>
			fprint(fildes(2), "bad option: -%c\n", c);
			arg->usage();
		}

	args = arg->argv();
	verbose += debug;
	if(len args != 2)
		arg->usage();

	if(raddrs == nil && waddrs == nil)
		waddrs = listenaddr::nil;

	initheap = heapused();

	maxdatasize = suffix(hd args);
	meanblocksize := int suffix(hd tl args);
	maxblocks := maxdatasize/big (meanblocksize+Dhdrsize);

	addrbits = log2(maxdatasize);
	if(Cflag)
		addrbits += 1;	# extra bit marks compressed entries
	if(addrbits > 48)
		fail("maxdatasize too large");
	comprmask = (big 1<<(addrbits-1));
	headbits = log2(maxblocks/big 1000);
	minscorebits := log2(maxblocks)+log2(big 1000);
	totalbits := ((addrbits+typebits+minscorebits-headbits+8-1)/8)*8;
	scorebits = totalbits-addrbits-typebits;

	chainblocks = 256;
	membytes = (typebits+scorebits+addrbits+8-1)/8;
	headbytes = (headbits+8-1)/8;
	scorebytes = (scorebits+8-1)/8;
	scorebytemask = byte ((1<<(scorebits % 8)) - 1);

	zeroscore = Score.zero();
	nheads = 1<<headbits;
	heads = array[nheads] of ref Chain;
	reqc = chan[256] of (ref Vmsg, ref Client);
	wrotec = chan of (int, ref Client);
	lookupc = chan of ref Lookup;
	lookupdonec = chan[1] of (int, ref Vmsg, ref Client);
	syncdonec = chan[1] of (ref Vmsg, ref Client);
	storec = chan[32] of ref Store;
	# seterror sends on writererrorc at most once; buffering that one
	# message keeps the writer from blocking on it while main is itself
	# blocked sending on a full storec (which would deadlock both).
	writererrorc = chan[1] of string;
	sem = Semaphore.new();
	statsem = Semaphore.new();

	writequeue = ref Queue(array[2] of ref Block, 0);
	flatequeue = ref Fifo(array[64] of ref Block, 0, 0, Semaphore.new());

	chainsize := Arraysize+4+Refsize + chainblocks*membytes; # this probably takes more overhead, internal memory allocation at least
	blocksperhead := int (maxblocks/big nheads);
	chainsperhead := (blocksperhead+chainblocks-1)/chainblocks;
	totalchains := chainsperhead*nheads;
	headsize := Arraysize+nheads*Refsize;
	maxwasted := nheads*chainsize;
	maxmemusage := big headsize+big chainsize*big totalchains;
	max := heapmax();
	if(maxmemusage > max)
		say(sprint("maximum memory usage (%bd bytes) larger than available memory (%bd bytes)", maxmemusage, max));
	if(debug) {
		say(sprint("typebits=%d scorebits=%d addrbits=%d headbits=%d",
			typebits, scorebits, addrbits, headbits));
		say(sprint("nheads=%d membytes=%d headbytes=%d scorebytes=%d scorebytemask=%d",
			nheads, membytes, headbytes, scorebytes, int scorebytemask));
		say(sprint("headsize=%d chainsize=%d blocksperhead=%d totalchains=%d maxwasted=%d",
			headsize, chainsize, blocksperhead, totalchains, maxwasted));
	}
	if(verbose)
		say(sprint("maxmemusage=%bd maxblocks=%bd", maxmemusage, maxblocks));
	if(qflag)
		return;

	conns: list of (Sys->Connection, int);
	conns = announce(raddrs, 1, conns);
	conns = announce(waddrs, 0, conns);
	config();
	listen(conns);
	main();
}
354 
# announce announces every address in addrs and prepends each
# resulting connection, tagged with readonly, to conns.
# Any failure is fatal.
announce(addrs: list of string, readonly: int, conns: list of (Sys->Connection, int)): list of (Sys->Connection, int)
{
	while(addrs != nil) {
		addr := hd addrs;
		addrs = tl addrs;
		(rc, announced) := sys->announce(addr);
		if(rc < 0)
			fail(sprint("announce %s: %r", addr));
		conns = (announced, readonly) :: conns;
	}
	return conns;
}
365 
# listen starts one listener process per announced connection.
listen(conns: list of (Sys->Connection, int))
{
	for(; conns != nil; conns = tl conns) {
		(aconn, ro) := hd conns;
		spawn listener(aconn, ro);
		if(debug)
			say("listener spawned");
	}
}
374 
# listener accepts calls on aconn forever, spawning a client process
# for each connection whose data file can be opened.  A listen failure
# is fatal.
listener(aconn: Sys->Connection, readonly: int)
{
	for(;;) {
		if(debug)
			say("listener: listening");
		(rc, nconn) := sys->listen(aconn);
		if(rc < 0)
			fail(sprint("listen: %r"));
		if(debug)
			say("listener: have client");
		dfd := sys->open(nconn.dir+"/data", sys->ORDWR);
		if(dfd != nil) {
			spawn client(dfd, readonly);
			continue;
		}
		if(verbose)
			say(sprint("opening connection data file: %r"));
	}
}
390 
# filesize returns the length of the file open on fd; an fstat failure
# is fatal.
filesize(fd: ref Sys->FD): big
{
	(rc, dir) := sys->fstat(fd);
	if(rc != 0)
		fail(sprint("fstat on index or data file: %r"));
	length := dir.length;
	return length;
}
398 
# config opens the index and data files and checks the tail of the index
# against the data file.  It then appends index entries for blocks that
# are in the data file but missing from the index.  Finally it loads
# every index entry into memory (indexunpack/meminsert).
#
# Verification starts at verifyoffset (-I) or, by default, at the last
# Iverify index entries.  The entries of one deflated group are written
# one after another and share the group's data offset, and indexverify
# must start at the first of them.  So when the starting entry is
# compressed, we walk back to the first entry of its group.
config()
{
	indexfd = sys->open(indexfile, sys->ORDWR);
	if(indexfd == nil)
		fail(sprint("opening %s: %r", indexfile));
	datafd = sys->open(datafile, sys->ORDWR);
	if(datafd == nil)
		fail(sprint("opening %s: %r", datafile));

	indexsize = filesize(indexfd);
	datasize = filesize(datafd);
	if(indexsize % big Ihdrsize != big 0)
		fail(sprint("index file not multiple of iheadersize (indexsize=%bd iheadersize=%d)", indexsize, Ihdrsize));

	io := verifyoffset;
	if(io < big 0) {
		io = indexsize-big (Iverify*Ihdrsize);
		if(io < big 0)
			io = big 0;
	} else if(io > indexsize)
		fail("index file offset to verify at lies outside index file");
	else if(io % big Ihdrsize != big 0)
		fail(sprint("index file offset to verify at (%bd) not multiple of iheadersize (%d)", io, Ihdrsize));
	else if(verbose)
		say(sprint("starting index file verification at offset=%bd", io));

	# data file offset from which blocks are (re)added to the index
	doffset := big 0;
	if(indexsize > big 0) {
		if(debug) say(sprint("config: verifying last entries in index file at offset=%bd", io));
		ih := getihdr(io);
		if(ih == nil)
			fail(sprint("premature eof on index file at offset=%bd", io));
		if(ih.compressed) {
			while(io > big 0) {
				pih := getihdr(io-big Ihdrsize);
				if(pih == nil || pih.offset != ih.offset)
					break;
				io -= big Ihdrsize;
			}
		}
		# if the very first group verified turns out to be incomplete,
		# resyncing must start at that group, not at data offset 0
		doffset = ih.offset;
		while(io < indexsize) {
			(ndoffset, nio) := indexverify(io);
			if(nio == big -1) {
				dir := sys->nulldir;
				dir.length = io;
				if(sys->fwstat(indexfd, dir) != 0)
					fail(sprint("truncating index file to remove trailing entries for compressed blocks: %r"));
				indexsize = io;
				break;
			}
			(doffset, io) = (ndoffset, nio);
		}
	}

	if(debug) say("config: syncing index file to data file");
	nadd := ncompr := 0;
	while(doffset < datasize) {
		(dhdrs, compr, noffset, err) := offsetread(doffset);
		if(err != nil)
			fail(sprint("syncing index from data at offset=%bd: %s", doffset, err));
		for(i := 0; i < len dhdrs; i++) {
			dhdr := dhdrs[i];
			err = indexstore(ref Ihdr(dhdr.score.a[:Indexscoresize], dhdr.dtype, doffset, compr));
			if(err != nil)
				fail("syncing index from data: "+err);
			nadd++;
		}
		if(compr)
			ncompr += len dhdrs;
		doffset = noffset;
	}
	if(verbose) say(sprint("config: added %d entries to index file of which %d compressed", nadd, ncompr));

	if(debug) say("config: reading entries into memory");
	indexsize = filesize(indexfd);
	t0 := sys->millisec();

	# a separate process unpacks chunks while the next ones are read
	blockc := chan[Nqueuechunks] of (array of byte, big);
	donec := chan of int;
	spawn indexunpack(blockc, donec);
	for(o := big 0; o < indexsize; o += big Ichunksize) {
		d := array[Ichunksize] of byte;
		n := preadn(indexfd, d, len d, o);
		if(n < 0 || n % Ihdrsize != 0) {
			if(n > 0)
				sys->werrstr("bytes read not multiple of iheadersize");
			fail(sprint("reading index entries chunk at offset=%bd: %r", o));
		}
		if(n > 0)
			blockc<- = (d[:n], o);
		if(n < Ichunksize)
			break;
	}
	blockc<- = (nil, big 0);
	<-donec;
	t1 := sys->millisec();
	nblocks = int (indexsize/big Ihdrsize);

	if(verbose) say(sprint("config: done, loaded %d entries (%bd bytes) of which %d compressed in memory in %0.2f s",
		nblocks, indexsize, nflateblocks, real (t1-t0)/1000.0));
}
497 
# checkheaders fails unless the index entry ih agrees with the data
# header dh on score prefix and type (as decided by halfcmp).
checkheaders(ih: ref Ihdr, dh: ref Dhdr)
{
	if(halfcmp(ih.halfscore, dh.score.a, ih.dtype, dh.dtype) != 0)
		fail(sprint("partial score or type index file does not match block in data file at offset=%bd, index: score=%s type=%d compressed=%d, data: score=%s type=%d",
			ih.offset, scorestr(ih.halfscore), ih.dtype, ih.compressed, dh.score.text(), dh.dtype));
}
505 
# getihdr reads and unpacks the index entry at index file offset o.
# It returns nil at end of file.  It fails on read errors and short
# reads.  It also fails on entries whose data offset is at or past the
# end of the data file: no block header can start there.
getihdr(o: big): ref Ihdr
{
	d := array[Ihdrsize] of byte;
	n := preadn(indexfd, d, len d, o);
	if(n < 0)
		fail(sprint("reading at offset=%bd: %r", o));
	if(n == 0)
		return nil;
	if(n != len d)
		fail(sprint("short read on index file at offset=%bd", o));
	ih := Ihdr.unpack(d);
	if(ih.offset >= datasize)
		fail(sprint("index entry at offset=%bd points past end of data file (offset=%bd)", o, ih.offset));
	return ih;
}
521 
# indexverify checks the index entries starting at offset o against the
# data block (or deflated group) the first entry points to.  It
# returns the data offset following that block and the index offset
# following its entries.  If the index ends before every block of the
# group has an entry, it returns (-1, -1).  Mismatches are fatal.
indexverify(o: big): (big, big)
{
	first := getihdr(o);
	(dhs, nil, nextoff, err) := offsetread(first.offset);
	if(err != nil)
		fail("reading block pointed to by one of last entries in index file: "+err);
	checkheaders(first, dhs[0]);
	for(i := 1; i < len dhs; i++) {
		o += big Ihdrsize;
		ih := getihdr(o);
		if(ih == nil)
			return (big -1, big -1);
		checkheaders(ih, dhs[i]);
	}
	return (nextoff, o+big Ihdrsize);
}
539 
# indexunpack runs as a separate process during config.  It receives
# chunks of index entries on blockc, each paired with the index file
# offset of the chunk's first entry, and inserts every entry into
# memory.  Compressed entries are counted in nflateblocks and are fatal
# without -C.  A nil chunk ends the stream; completion is reported on
# donec.
indexunpack(blockc: chan of (array of byte, big), donec: chan of int)
{
	for(;;) {
		(d, o) := <-blockc;
		if(d == nil)
			break;

		for(i := 0; i < len d; i += Ihdrsize) {
			ih := Ihdr.unpack(d[i:i+Ihdrsize]);
			meminsert(ih);
			if(ih.compressed) {
				# report the offset of the entry itself, not of its chunk
				if(!Cflag)
					fail(sprint("compressed block at index file offset=%bd but -C not specified", o+big i));
				nflateblocks++;
			}
		}
	}
	donec<- = 0;
}
559 
# killc kills both processes serving client c and marks them gone by
# setting their pids to -1.
killc(c: ref Client)
{
	for(pids := c.rpid :: c.wpid :: nil; pids != nil; pids = tl pids)
		killpid(hd pids);
	c.wpid = -1;
	c.rpid = -1;
}
566 
# killpid writes "kill" to the ctl file of process pid.  Errors, e.g.
# for a process that no longer exists, are ignored.
killpid(pid: int)
{
	ctl := sys->open(sprint("/prog/%d/ctl", pid), sys->OWRITE);
	if(ctl == nil)
		return;
	sys->fprint(ctl, "kill\n");
}
573 
# decompress inflates src into dst, which must end up exactly filled.
# It returns nil on success, or an error string.
decompress(src, dst: array of byte): string
{
	if(debug)
		say(sprint("decompress len src=%d len dst=%d", len src, len dst));
	rqc := inflate->start("");
	<-rqc;	# the Start message; its pid is not needed

	finished := 0;
	while(!finished) {
		pick rq := <-rqc {
		Fill =>
			take := len rq.buf;
			if(take > len src)
				take = len src;
			rq.buf[:] = src[:take];
			src = src[take:];
			rq.reply <-= take;
		Result =>
			if(len dst < len rq.buf) {
				rq.reply <-= -1;
				return "inflated data too large";
			}
			dst[:] = rq.buf;
			dst = dst[len rq.buf:];
			rq.reply <-= 0;
		Finished =>
			if(len rq.buf != 0)
				return "leftover data at end of inflate";
			finished = 1;
		Error =>
			return rq.e;
		}
	}
	if(len dst == 0)
		return nil;
	return "inflated data smaller than expected";
}
610 
611 
# flateadd appends deflate output d to f.d.  f.nd always grows by len d.
# Once the output no longer fits, nothing more is copied, so
# f.nd > len f.d records the overflow.
flateadd(f: ref Flate, d: array of byte)
{
	end := f.nd + len d;
	if(end <= len f.d)
		f.d[f.nd:] = d;
	f.nd = end;
}
618 
# Flate.new starts a deflate filter with an output buffer of n bytes.
# It records the filter's pid and keeps its first Fill request for the
# first write.
Flate.new(n: int): ref Flate
{
	rqc := deflate->start("");
	f := ref Flate(rqc, nil, array[n] of byte, 0, 0);
	pick rq := <-rqc {
	Start =>
		f.pid = rq.pid;
	* =>
		fail("bad first message from deflate");
	}
	pick rq := <-rqc {
	Fill =>
		f.lastfill = rq;
	* =>
		fail("bad second message from deflate");
	}
	return f;
}
632 
# Flate.write passes all of d to the deflate filter.  Compressed output
# that arrives meanwhile (Result) is acknowledged and appended to f.d
# with flateadd.  On return f.lastfill again holds the filter's pending
# Fill request.
# Once the output has outgrown f.d, finish discards the whole group, so
# no more input is fed.  An empty d is ignored because replying 0 to a
# Fill request signals end of input.
# Returns nil, or the filter's error string.
Flate.write(f: self ref Flate, d: array of byte): string
{
	while(len d > 0 && f.nd <= len f.d) {
		n := len d;
		if(n > len f.lastfill.buf)
			n = len f.lastfill.buf;
		f.lastfill.buf[:] = d[:n];
		f.lastfill.reply<- = n;
		f.lastfill = nil;
		d = d[n:];

		# wait for the next Fill request, handling output meanwhile;
		# the filter blocks until each Result is replied to
		while(f.lastfill == nil) {
			pick r := <-f.rqc {
			Fill =>
				f.lastfill = r;
			Result =>
				flateadd(f, r.buf);
				r.reply<- = 0;
			Error =>
				return r.e;
			* =>
				fail("bad rq from deflate");
			}
		}
	}
	return nil;
}
659 
# Flate.finish ends the deflate stream and returns the compressed data.
# It returns (nil, nil) when the output did not fit in f.d; in that
# case a filter still running is killed.  It returns (nil, err) on a
# filter error.
Flate.finish(f: self ref Flate): (array of byte, string)
{
	if(f.nd > len f.d) {
		killpid(f.pid);
		return (nil, nil);
	}
	f.lastfill.reply <-= 0;	# end of input
	for(finished := 0; !finished;) {
		pick rq := <-f.rqc {
		Result =>
			flateadd(f, rq.buf);
			rq.reply <-= 0;
		Finished =>
			if(len rq.buf != 0)
				fail("deflate left data uncompressed");
			finished = 1;
		Error =>
			return (nil, rq.e);
		* =>
			fail("bad response from deflate");
		}
	}
	if(f.nd <= len f.d)
		return (f.d[:f.nd], nil);
	return (nil, nil);
}
687 
# reader runs in one of up to Maxreaders processes (see lookupsend).
# It takes work from lookupc, consults the data file with datalookup,
# and reports completion on lookupdonec as (iswrite, reply or nil,
# client).
#   Read:  reply with the block's data, or an Rerror.
#   Write: if the block is already stored, drop it from writequeue;
#          otherwise hand it to the writer.
# Every lookup taken from lookupc must be answered on lookupdonec
# exactly once.  main's counters (nbusyreaders, nreaderwrites) and
# takewrites depend on that.
reader()
{
	for(;;) {
		pick w := <-lookupc {
		Read =>
			if(debug) say("reader: have read message");

			(nil, d, err) := datalookup(w.addrs, w.score, w.dtype, 1, w.size);
			if(len d > w.size)
				err = "data larger than requested";
			if(err == nil && d == nil)
				err = "no such score/type";
			rmsg: ref Vmsg;
			if(err == nil) {
				rmsg = ref Vmsg.Rread(0, w.tid, d);
			} else {
				rmsg = ref Vmsg.Rerror(0, w.tid, err);
				if(verbose) say("reader: error reading data: "+err);
			}
			if(debug) say("reader: read handled");
			lookupdonec<- = (0, rmsg, w.c);

		Write =>
			if(debug) say("reader: have write message");

			(hit, nil, err) := datalookup(w.addrs, w.b.score, w.b.dtype, 0, len w.b.d);
			if(err != nil) {
				# main has already sent the client its Rwrite.
				# Dropping the block, or leaving it in writequeue
				# forever, would lose it; storing it at worst
				# writes a duplicate.
				if(verbose) say("reader: datalookup failed, storing block: "+err);
				hit = 0;
			}
			if(hit) {
				if(debug) say("reader: have hit, already in datafile");
				sem.obtain();
				writequeue.remove(w.b);
				sem.release();
				statsem.obtain();
				nwritesdup++;
				statsem.release();
			} else {
				if(debug) say("reader: no hit, need to write block");
				storec<- = ref Store.Write(w.c, w.b);
			}
			if(debug) say("reader: write handled");
			lookupdonec<- = (1, nil, w.c);
		}
	}
}
736 
# state of the deflate group being built by writer (with -c)
flate: ref Flate;	# compressor for the current group
Maxfblocks: con 256;	# most blocks in one group
rawsize: int;	# uncompressed bytes in the current group
nfblocks: int;	# blocks in the current group
fblocks: array of (ref Block, big);	# the group's blocks, each with the writing client's conntime


# flatefinish completes the current deflate group and stores it.
# If the deflated data did not fit (finish returned nil), or it would
# exceed Maxblocksize with its headers, every block is stored as a
# normal block.  Otherwise the group is stored with datastoreflate,
# with one compressed index entry per block, all carrying the group's
# data offset.
# Afterwards the blocks are inserted into memory and removed from
# writequeue.  The group state is then reset and a new Flate started.
# On error the error string is returned and the group state is left
# as is.
flatefinish(): string
{
	if(debug) say(sprint("flatefinish: rawsize=%d nfblocks=%d", rawsize, nfblocks));

	(d, err) := flate.finish();
	if(err != nil)
		return "flatefinish: "+err;

	if(d == nil || Fhdrsize+nfblocks*Fbhdrsize+len d > Maxblocksize) {
		if(debug) say("flatefinish: data too large, writing as normal blocks");

		l := array[nfblocks] of (ref Ihdr, ref Block);
		for(i := 0; i < nfblocks; i++) {
			(b, conntime) := fblocks[i];
			dh := ref Dhdr(b.score, b.dtype, len b.d, conntime);
			offset: big;
			(offset, err) = datastore(dh, b.d);
			ih := ref Ihdr(b.score.a[:Indexscoresize], b.dtype, offset, 0);
			if(err == nil)
				err = indexstore(ih);
			if(err != nil)
				return err;
			l[i] = (ih, b);
		}
		# memory index and writequeue are updated together under sem,
		# the same semaphore main holds while looking in both
		sem.obtain();
		for(i = 0; i < nfblocks; i++) {
			(ih, b) := l[i];
			meminsert(ih);
			writequeue.remove(b);
		}
		sem.release();
	} else {
		if(debug) say("flatefinish: writing deflated block");

		dhs := array[nfblocks] of ref Dhdr;
		for(i := 0; i < nfblocks; i++) {
			(b, conntime) := fblocks[i];
			dhs[i] = ref Dhdr(b.score, b.dtype, len b.d, conntime);
		}
		offset: big;
		(offset, err) = datastoreflate(dhs, d);
		if(err != nil)
			return err;
		# one entry per block, all pointing at the group's offset
		ihs := array[nfblocks] of ref Ihdr;
		for(i = 0; i < nfblocks; i++) {
			(b, nil) := fblocks[i];
			ihs[i] = ref Ihdr(b.score.a[:Indexscoresize], b.dtype, offset, 1);
			err = indexstore(ihs[i]);
			if(err != nil)
				return err;
		}
		sem.obtain();
		for(i = 0; i < nfblocks; i++) {
			(b, nil) := fblocks[i];
			meminsert(ihs[i]);
			writequeue.remove(b);
		}
		sem.release();
		if(debug) say("flatefinish: flated block done");
	}
	# reset the group; clearing fblocks drops the references to the data
	for(i := 0; i < nfblocks; i++)
		fblocks[i] = (nil, big 0);
	nfblocks = 0;
	rawsize = 0;
	flate = Flate.new(Maxblocksize);
	if(debug) say("flatefinish: done");
	return nil;
}
812 
lasterr: string;	# first write error; once set, the writer stops writing

# seterror records the writer's first error and reports it to main on
# writererrorc.  Later errors are only logged, and only when verbose.
seterror(s: string)
{
	first := lasterr == nil;
	if(first || verbose)
		say("write error: "+s);
	if(!first)
		return;
	lasterr = s;
	writererrorc <-= s;
}
824 
# writer is the process that serves storec:
#   Write: without -c, append the block to the data file and an entry
#          to the index file right away.  With -c, add it to the
#          current deflate group, first flushing the group with
#          flatefinish when it is full or might not fit.
#   Sync:  flush a pending deflate group, then sync the data file
#          (fwstat with sys->nulldir).  Reply on syncdonec, unless the
#          sync is internal (st.c == nil).
# After the first error (see seterror) writes are ignored and syncs are
# answered with that error.
writer()
{
	flate = Flate.new(Maxblocksize);
	fblocks = array[Maxfblocks] of (ref Block, big);

	for(;;) {
		pick st := <-storec {
		Write =>
			if(lasterr != nil)
				continue;
			b := st.b;
			if(cflag) {
				# estimated group size with b added; the 9/10 factor
				# presumably assumes some compression — flush first
				# if the group might not fit in one data block
				max := Fhdrsize+(nfblocks+1)*Fbhdrsize+(rawsize+len b.d)*9/10;
				if(nfblocks == Maxfblocks || max > Maxblocksize) {
					err := flatefinish();
					if(err != nil) {
						seterror(err);
						continue;
					}
				}
				err := flate.write(b.d);
				if(err != nil) {
					seterror(err);
					continue;
				}
				# b stays in writequeue until flatefinish stores it
				fblocks[nfblocks++] = (b, st.c.conntime);
				rawsize += len b.d;

			} else {
				dh := ref Dhdr(b.score, b.dtype, len b.d, st.c.conntime);
				(offset, err) := datastore(dh, b.d);
				ih := ref Ihdr(b.score.a[:Indexscoresize], b.dtype, offset, 0);
				if(err == nil)
					err = indexstore(ih);
				if(err != nil) {
					seterror(err);
					continue;
				}
				sem.obtain();
				meminsert(ih);
				writequeue.remove(b);
				sem.release();
				if(debug) say("writer: data now on file");
			}
		Sync =>
			if(lasterr != nil) {
				if(st.c != nil)
					syncdonec<- = (ref Vmsg.Rerror(0, st.tid, lasterr), st.c);
				continue;
			}
			rmsg: ref Vmsg;
			if(cflag && nfblocks > 0) {
				err := flatefinish();
				if(err != nil) {
					err = "compress/write: "+err;
					seterror(err);
					rmsg = ref Vmsg.Rerror(0, st.tid, err);
					if(st.c != nil)
						syncdonec<- = (rmsg, st.c);
					continue;
				}
			}
			if(sys->fwstat(datafd, sys->nulldir) == 0) {
				rmsg = ref Vmsg.Rsync(0, st.tid);
			} else {
				rmsg = ref Vmsg.Rerror(0, st.tid, sprint("syncing: %r"));
				seterror(sprint("syncing: %r"));
			}
			if(st.c != nil)
				syncdonec<- = (rmsg, st.c);
			if(debug) say("writer: sync is done");
		}
	}
}
899 
# reader process bookkeeping, only touched by main
nreaders, nbusyreaders, nreaderwrites: int;

# lookupsend hands w to a reader.  It spawns another reader while all
# are busy and fewer than Maxreaders exist.  If all are still busy, it
# first waits for (and handles) one completion.
lookupsend(w: ref Lookup)
{
	if(nbusyreaders >= nreaders) {
		if(nreaders < Maxreaders) {
			spawn reader();
			nreaders++;
		}
	}
	if(nbusyreaders >= nreaders) {
		(iswrite, rm, rc) := <-lookupdonec;
		lookupdone(iswrite, rm, rc);
	}
	lookupc <-= w;
	nbusyreaders++;
	pick lw := w {
	Write =>
		nreaderwrites++;
	}
	if(debug)
		say("main: message sent to reader");
}
918 
# lookupdone accounts for a finished reader lookup.  It forwards the
# reply, if any, to client c unless the client has been killed.
lookupdone(iswrite: int, rmsg: ref Vmsg, c: ref Client)
{
	nbusyreaders--;
	if(iswrite)
		nreaderwrites--;
	if(rmsg != nil && c.rpid != -1)
		c.respc <-= rmsg;
	if(debug)
		say("main: rmsg from reader handled");
}
928 
# takewrites waits until no write lookups are outstanding in readers,
# so that every such block has been handed to the writer (or dropped as
# a duplicate) before main queues a sync.
takewrites()
{
	for(; nreaderwrites > 0;) {
		(w, rm, rc) := <-lookupdonec;
		lookupdone(w, rm, rc);
	}
}
936 
# main is the central dispatcher.  It serves the statistics file and
# handles:
#   reqc          requests from client processes (nil: client is gone)
#   wrotec        reports from cwriter processes
#   lookupdonec   completions from readers
#   syncdonec     completions from the writer
#   writererrorc  the writer's first error; later writes and syncs are
#                 refused
# Reads are answered from the write queue or handed to a reader.
# Writes are acknowledged at once, then handed to a reader (when the
# score may already be stored) or straight to the writer.
main()
{
	writeerror: string;

	fio := sys->file2chan(statsdir, statsfile);
	if(fio == nil) {
		say(sprint("file2chan: %r; not serving statistics"));
		# channels nobody sends on, so the alt below needs no special case
		fio = ref sys->FileIO(chan of (int, int, int, sys->Rread), chan of (int, array of byte, int, sys->Rwrite));
	} else if(debug)
		say(sprint("file2chan: serving %s%s", statsdir, statsfile));

	spawn writer();

	configheap = heapused();
	if(verbose)
		say(sprint("heap used after startup=%bd", configheap-initheap));

	for(;;) alt {
	(offset, nil, nil, rc) := <- fio.read =>
		if(rc == nil)
			continue;

		statsem.obtain();
		lnblocks := nblocks;
		lnflateblocks := nflateblocks;
		ldatasize := datasize;
		lnwritesdup := nwritesdup;
		statsem.release();

		h := heapused();
		mean := 0;
		if(lnblocks > 0)
			mean = int (ldatasize/big lnblocks);
		buf := array of byte sprint(
			"%14d reads\n%14d writes\n%14d syncs\n%14d pings\n"+
			"%14d dupwrites\n%14d lookupcollisions\n%14d blocks\n%14d flateblocks\n%14d meanblocksize\n"+
			"%14d readerprocs\n%14d busyreaderprocs\n%14d readerwrites\n"+
			"%14bd totalheap\n%14bd newheap\n%14bd datasize\n%s\n",
			nreads, nwrites, nsyncs, npings,
			lnwritesdup, lookupcollisions, lnblocks, lnflateblocks, mean,
			nreaders, nbusyreaders, nreaderwrites,
			h, h-configheap, ldatasize, writeerror);

		if(offset > len buf)
			offset = len buf;
		rc <-= (buf[offset:], nil);

	(nil, nil, nil, wc) := <- fio.write =>
		if(wc == nil)
			continue;
		if(debug) say("main: file2chan write");
		wc <-= (0, Eperm);

	(iswrite, rmsg, c) := <-lookupdonec =>
		lookupdone(iswrite, rmsg, c);

	err := <-writererrorc =>
		if(debug) say("main: writer error: "+err);
		writeerror = err;

	(rmsg, c) := <-syncdonec =>
		if(c.rpid != -1)
			c.respc<- = rmsg;
		if(debug) say("main: sync rmsg from writer handled");

	(ok, c) := <-wrotec =>
		if(debug) say("main: client wrote vmsg");
		if(ok == 0) {
			if(writeerror == nil && !c.readonly) {
				takewrites();
				storec<- = ref Store.Sync(nil, 0);
			}
			killc(c);
			if(debug) say("main: client killed after write error");
		} else
			c.inuse--;

	(vmsg, c) := <-reqc =>
		# a nil vmsg means the client's connection is gone; check it
		# before using vmsg (even for the debug message)
		if(vmsg == nil) {
			if(writeerror == nil && !c.readonly) {
				takewrites();
				storec<- = ref Store.Sync(nil, 0);
			}
			killc(c);
			if(debug) say("main: client killed after read error");
			continue;
		}
		if(debug) say("main: client read message: "+vmsg.text());
		if(c.inuse >= 256+1) {
			killc(c);
			if(debug) say("main: client killed after too many tids in use");
			continue;
		}
		c.inuse++;

		pick tmsg := vmsg {
		Tread =>
			nreads++;
			if(tmsg.score.eq(zeroscore)) {
				c.respc<- = ref Vmsg.Rread(0, tmsg.tid, array[0] of byte);
				continue;
			}

			sem.obtain();
			d := queuelookup(tmsg.score, tmsg.etype);
			if(d != nil) {
				sem.release();
				c.respc<- = ref Vmsg.Rread(0, tmsg.tid, d);
				continue;
			}
			addrs := memlookup(tmsg.score, tmsg.etype);
			sem.release();
			if(len addrs == 0) {
				c.respc<- = ref Vmsg.Rerror(0, tmsg.tid, "no such score/type");
				continue;
			}

			lookupsend(ref Lookup.Read(c, tmsg.tid, addrs, tmsg.score, tmsg.etype, tmsg.n));

		Twrite =>
			nwrites++;
			nowritemsg: string;
			if(writeerror != nil)
				nowritemsg = writeerror;
			if(c.readonly)
				nowritemsg = "writes not allowed";
			if(nowritemsg != nil) {
				c.respc<- = ref Vmsg.Rerror(0, tmsg.tid, nowritemsg);
				continue;
			}

			score := Score(sha1(tmsg.data));
			c.respc<- = ref Vmsg.Rwrite(0, tmsg.tid, score);

			if(score.eq(zeroscore))
				continue;

			sem.obtain();
			d := queuelookup(score, tmsg.etype);
			if(d != nil) {
				sem.release();
				continue;
			}
			addrs := memlookup(score, tmsg.etype);
			writequeue.insert(b := ref Block(score, tmsg.etype, tmsg.data));
			sem.release();

			if(len addrs > 0)
				lookupsend(ref Lookup.Write(c, tmsg.tid, addrs, b));
			else
				storec<- = ref Store.Write(c, b);

		Tsync =>
			nsyncs++;
			nowritemsg: string;
			if(writeerror != nil)
				nowritemsg = writeerror;
			if(c.readonly)
				nowritemsg = "writes not allowed";
			if(nowritemsg != nil) {
				c.respc<- = ref Vmsg.Rerror(0, tmsg.tid, nowritemsg);
			} else {
				takewrites();
				storec<- = ref Store.Sync(c, tmsg.tid);
			}

		Tping =>
			npings++;
			c.respc<- = ref Vmsg.Rping(0, tmsg.tid);

		* =>
			if(debug) say(sprint("main: bad tmsg, tag=%d", tagof vmsg));
			# no reply is sent, so no wrotec report will balance the
			# increment above
			c.inuse--;
		}
	}
}
1112 
# readline reads a newline-terminated line of at most 127 bytes from fd,
# one byte at a time.  It returns the line without the newline, or nil
# on read error, eof, or an over-long line.
readline(fd: ref Sys->FD): array of byte
{
	line := array[128] of byte;
	n := 0;
	while(n < len line) {
		if(sys->read(fd, line[n:], 1) != 1)
			return nil;
		if(line[n] == byte '\n')
			return line[:n];
		n++;
	}
	sys->werrstr("version line too long");
	return nil;
}
1125 
# handshake performs the server side of the venti connection setup:
# exchange version lines, then read a Thello and answer with an Rhello.
# It returns nil on success, or an error string.
handshake(fd: ref Sys->FD): string
{
	if(fprint(fd, "venti-02-ventisrv\n") < 0)
		return sprint("writing version: %r");

	line := readline(fd);
	if(line == nil)
		return sprint("bad version (%r)");
	version := string line;
	if(!str->prefix("venti-02-", version))
		return sprint("bad version (%s)", version);

	(hello, err) := Vmsg.read(fd);
	if(err != nil)
		return "reading thello: "+err;
	if(tagof hello != tagof Vmsg.Thello)
		return "first message not thello";

	reply := ref Vmsg.Rhello(0, 0, nil, 0, 0);
	packed := reply.pack();
	if(packed == nil || sys->write(fd, packed, len packed) != len packed)
		return sprint("writing rhello: %r");
	return nil;
}
1150 
# client serves one connection.  after a successful handshake it starts a
# cwriter process for the replies, then passes every t-message read from
# fd to the main loop on reqc.  a non-t-message, a tgoodbye or a read
# error ends the loop, after which (nil, c) is sent on reqc.
client(fd: ref Sys->FD, readonly: int)
{
	conntime := big daytime->now();
	herr := handshake(fd);
	if(herr != nil) {
		if(debug) say("handshake failed: "+herr);
		return;
	}

	pidc := chan of int;
	c := ref Client(sys->pctl(0, nil), 0, chan[256+1] of ref Vmsg, 0, conntime, readonly);
	spawn cwriter(fd, pidc, c);
	c.wpid = <-pidc;

	for(;;) {
		(m, err) := Vmsg.read(fd);
		if(m != nil) {
			if(!m.istmsg)
				err = "message not tmsg";
			if(tagof m == tagof Vmsg.Tgoodbye)
				err = "closing down";
		}
		if(m == nil || err != nil) {
			if(debug) say("client: reading: "+err);
			break;
		}
		reqc <-= (m, c);
	}
	reqc <-= (nil, c);
}
1179 
# cwriter writes the r-messages queued on c.respc to fd.  it first sends
# its pid on pidc.  after every successful write it sends (1, c) on
# wrotec.  it stops on a nil message or a failed write and then sends
# (0, c) on wrotec.
cwriter(fd: ref Sys->FD, pidc: chan of int, c: ref Client)
{
	pidc <-= sys->pctl(0, nil);
	done := 0;
	while(!done) {
		m := <-c.respc;
		if(m == nil) {
			if(debug) say("client: have nil on respc, closing");
			done = 1;
			continue;
		}
		buf := m.pack();
		if(sys->write(fd, buf, len buf) != len buf) {
			if(debug) say(sprint("client: writing rmsg: %r"));
			done = 1;
			continue;
		}
		wrotec <-= (1, c);
	}
	if(debug) say("client: exiting");
	wrotec <-= (0, c);
}
1200 
# halfcmp orders partial score hs of type hdt against score s of type dt.
# the types are compared first; when equal, the first len hs bytes of the
# two scores are compared.  the result is <0, 0 or >0.
halfcmp(hs: array of byte, s: array of byte, hdt, dt: int): int
{
	if(hdt == dt) {
		i := 0;
		while(i < len hs && hs[i] == s[i])
			i++;
		if(i < len hs)
			return int hs[i] - int s[i];
		return 0;
	}
	return hdt - dt;
}
1210 
# indexstore writes the packed index header ih at offset indexsize of the
# index file and advances indexsize by the bytes written.
# returns nil, or an error when the write is short or fails.
indexstore(ih: ref Ihdr): string
{
	buf := array[Ihdrsize] of byte;
	ih.pack(buf);
	written := sys->pwrite(indexfd, buf, len buf, indexsize);
	if(written != len buf)
		return sprint("writing index at offset=%bd: %r", indexsize);
	if(debug)
		say(sprint("indexstore: stored at indexsize=%bd data offset=%bd", indexsize, ih.offset));
	indexsize += big written;
	return nil;
}
1222 
# datastoreflate writes a compressed entry at offset datasize of the data
# file.  the entry is an fheader listing blocks, followed by the
# compressed bytes d.  it returns the offset written at and nil.  it
# returns (datasize, error) when the entry would exceed maxdatasize or a
# write fails.  datasize and the block counters are updated while
# holding statsem.
datastoreflate(blocks: array of ref Dhdr, d: array of byte): (big, string)
{
	if(debug) say(sprint("datastoreflate: len blocks=%d len d=%d", len blocks, len d));
	nb := len blocks;
	fh := ref Fhdr(blocks, nb*Fbhdrsize, len d);
	hdr := array[Fhdrsize+fh.hsize] of byte;
	fh.pack(hdr);

	total := len hdr + len d;
	if(datasize+big total > maxdatasize)
		return (datasize, sprint("data file full"));
	if(sys->pwrite(datafd, hdr, len hdr, datasize) != len hdr)
		return (datasize, sprint("writing flateblock header: %r"));
	if(sys->pwrite(datafd, d, len d, datasize+big len hdr) != len d)
		return (datasize, sprint("writing flateblock data: %r"));

	start := datasize;
	statsem.obtain();
	datasize += big total;
	nblocks += nb;
	nflateblocks += nb;
	statsem.release();
	if(debug) say(sprint("datastoreflate: stored at offset=%bd datasize=%bd len blocks=%d comprsize=%d", start, datasize, nb, len d));
	return (start, nil);
}
1249 
1250 
# datastore writes an uncompressed entry at offset datasize of the data
# file: the dheader dh followed by the data d.  it fails hard if dh.size
# disagrees with len d.  it returns the offset written at and nil, or
# (datasize, error) when the file would exceed maxdatasize or a write
# fails.  datasize and nblocks are updated while holding statsem.
datastore(dh: ref Dhdr, d: array of byte): (big, string)
{
	if(len d != dh.size)
		fail(sprint("datastore: refusing to write block, header says %d bytes, data is %d bytes", dh.size, len d));
	total := Dhdrsize+dh.size;
	if(datasize+big total > maxdatasize)
		return (datasize, sprint("data file full"));

	hdr := array[Dhdrsize] of byte;
	dh.pack(hdr);
	if(sys->pwrite(datafd, hdr, len hdr, datasize) != len hdr)
		return (datasize, sprint("writing data header: %r"));
	if(sys->pwrite(datafd, d, len d, datasize+big len hdr) != len d)
		return (datasize, sprint("writing data: %r"));

	start := datasize;
	statsem.obtain();
	datasize += big (len hdr+len d);
	nblocks++;
	statsem.release();
	if(debug) say(sprint("datastore: stored at offset=%bd datasize=%bd blocksize=%d", start, datasize, dh.size));
	return (start, nil);
}
1276 
# offsetread reads and verifies the data-file entry at offset.  the entry
# is either one uncompressed block (dheader + data) or a compressed group
# (fheader + compressed data).  every block's data is hashed and compared
# with the score in its header.
# returns (headers of the blocks, 1 if compressed else 0, offset of the
# next entry, nil), or (nil, 0, 0, error).
offsetread(offset: big): (array of ref Dhdr, int, big, string)
{
	d := array[Maxblocksize] of byte;
	n := preadn(datafd, d, len d, offset);
	if(n < 0)
		return (nil, 0, big 0, sprint("reading: %r"));
	if(n < 4)
		return (nil, 0, big 0, "short header");
	# only n bytes were read (fewer than len d near the end of the file).
	# all bounds checks below must be against those bytes, not against the
	# zero-filled rest of the buffer, or a truncated entry is misreported.
	d = d[:n];

	case get32(d, 0) {
	Dhdrmagic =>
		if(len d < Dhdrsize)
			return (nil, 0, big 0, "short dheader");

		(dhdr, err) := Dhdr.unpack(d);
		if(err != nil)
			return (nil, 0, big 0, "unpack dheader: "+err);
		if(Dhdrsize+dhdr.size > len d)
			return (nil, 0, big 0, "dheader points outside data file");

		score := Score(sha1(d[Dhdrsize:Dhdrsize+dhdr.size]));
		if(!score.eq(dhdr.score))
			return (nil, 0, big 0, sprint("dheader score (%s) does not match actual score (%s) at offset=%bd",
				dhdr.score.text(), score.text(), offset));
		return (array[1] of {dhdr}, 0, offset+big (Dhdrsize+dhdr.size), nil);

	Fhdrmagic =>
		if(len d < Fhdrsize)
			return (nil, 0, big 0, "short fheader");

		(fhdr, o, err) := Fhdr.unpack(d);
		if(err != nil)
			return (nil, 0, big 0, "unpack fheader: "+err);
		nb := len fhdr.blocks;
		# the last block is used to size the output buffer; an empty group
		# would index blocks[-1]
		if(nb == 0)
			return (nil, 0, big 0, "fheader has no blocks");
		if(Fhdrsize+fhdr.hsize+fhdr.dsize > len d)
			return (nil, 0, big 0, "fheader points outside data file");

		lb := fhdr.blocks[nb-1];
		uncsize := o[nb-1]+lb.size;
		dst := array[uncsize] of byte;
		s := Fhdrsize+fhdr.hsize;
		err = decompress(d[s:s+fhdr.dsize], dst);
		if(err != nil)
			return (nil, 0, big 0, "decompressing: "+err);

		for(i := 0; i < nb; i++) {
			dhdr := fhdr.blocks[i];
			off := o[i];
			score := Score(sha1(dst[off:off+dhdr.size]));
			if(!score.eq(dhdr.score))
				return (nil, 0, big 0, sprint("fheader score (%s, i=%d) does not match actual score (%s) at offset=%bd",
					dhdr.score.text(), i, score.text(), offset));
		}
		return (fhdr.blocks, 1, offset+big (Fhdrsize+fhdr.hsize+fhdr.dsize), nil);

	* =>
		return (nil, 0, big 0, "bad magic");
	}
}
1335 
# dblockget examines the uncompressed data-file entry at offset.
# returns:
#	(0, nil, nil)	the entry holds a different score or type (a miss)
#	(1, nil, nil)	a match, when readdata is 0
#	(1, data, nil)	a match with its data, verified against the score
#	(0, nil, err)	read errors, corrupt data, or a block larger than size
dblockget(offset: big, score: Score, dtype, readdata, size: int): (int, array of byte, string)
{
	if(debug) say(sprint("blockread offset=%bd", offset));
	# a maximum-size request starts with an 8KB read; a larger block is
	# completed by a second read below
	rsize := size;
	if(rsize == Venti->Maxlumpsize)
		rsize = 8*1024;
	buf := array[Dhdrsize+rsize] of byte;
	n := preadn(datafd, buf, len buf, offset);
	if(n < Dhdrsize)
		return (0, nil, sprint("reading block header at offset=%bd: %r", offset));

	(dh, err) := Dhdr.unpack(buf[:Dhdrsize]);
	if(err != nil)
		return (0, nil, err);

	# a different block (a partial-score collision) is a miss, not an
	# error: its size and data are irrelevant to this request
	if(!score.eq(dh.score) || dh.dtype != dtype)
		return (0, nil, nil);
	if(!readdata)
		return (1, nil, nil);
	if(dh.size > size)
		return (0, nil, sprint("data (%d bytes) larger than requested (%d bytes)", dh.size, size));

	# only the first n bytes of buf were read; near the end of the data
	# file the rest is zero fill, not block data
	d := buf[Dhdrsize:n];
	if(len d > dh.size)
		d = d[:dh.size];
	if(len d < dh.size) {
		# read the remainder of the block that follows the bytes we have
		have := len d;
		want := dh.size-have;
		newd := array[dh.size] of byte;
		if(have > 0)
			newd[0:] = d;
		n = preadn(datafd, newd[have:], want, offset+big (Dhdrsize+have));
		if(n >= 0 && n < want)
			sys->werrstr("short read");
		if(n != want)
			return (0, nil, sprint("reading block at offset=%bd: %r", offset));
		d = newd;
	}
	ds := Score(sha1(d));
	if(!dh.score.eq(ds))
		return (0, nil, sprint("mismatching score for block at offset=%bd: header=%s data=%s", offset, dh.score.text(), ds.text()));
	return (1, d, nil);
}
1378 
# flatequeueadd inserts every block of a decompressed group into
# flatequeue, holding flatequeue.sem.  block i's data is the slice of d
# starting at o[i] with length blocks[i].size.
flatequeueadd(blocks: array of ref Dhdr, o: array of int, d: array of byte)
{
	flatequeue.sem.obtain();
	for(i := 0; i < len blocks; i++) {
		start := o[i];
		end := start+blocks[i].size;
		flatequeue.insert(ref Block(blocks[i].score, blocks[i].dtype, d[start:end]));
	}
	flatequeue.sem.release();
}
1389 
# fblockget searches the compressed group at offset for score/dtype.
# on a hit with readdata set, the whole group is decompressed, all of its
# blocks are added to flatequeue, and the requested block is returned.
# returns (0, nil, nil) on a miss, (1, data-or-nil, nil) on a hit, or
# (0, nil, err) on read/format errors or a block larger than size.
fblockget(offset: big, score: Score, dtype, readdata, size: int): (int, array of byte, string)
{
	buf := array[Maxblocksize] of byte;
	n := preadn(datafd, buf, len buf, offset);
	if(n < 0)
		return (0, nil, sprint("reading: %r"));
	# Fhdr.unpack reads the fixed header fields unconditionally; a short
	# read (e.g. at end of file) would otherwise raise an exception
	if(n < Fhdrsize)
		return (0, nil, sprint("short fheader at offset=%bd: have=%d", offset, n));
	buf = buf[:n];
	(f, o, err) := Fhdr.unpack(buf);
	if(err != nil)
		return (0, nil, err);
	for(i := 0; i < len f.blocks; i++) {
		dh := f.blocks[i];
		if(dh.dtype == dtype && dh.score.eq(score)) {
			if(dh.size > size)
				return (0, nil, sprint("data (%d) larger than requested (%d)", dh.size, size));
			if(debug) say("flateblockget: hit");
			if(!readdata)
				return (1, nil, nil);
			nb := len f.blocks;
			lb := f.blocks[nb-1];
			dst := array[o[nb-1]+lb.size] of byte;
			s := Fhdrsize+f.hsize;
			if(len buf < s+f.dsize)
				return (0, nil, sprint("fheader points outside datafile at offset=%bd need=%d have=%d", offset, s+f.dsize, n));
			err = decompress(buf[s:s+f.dsize], dst);
			if(err != nil)
				return (0, nil, "decompressing: "+err);
			flatequeueadd(f.blocks, o, dst);
			off := o[i];
			return (1, dst[off:off+dh.size], nil);
		}
	}
	if(debug) say("flateblockget: miss");
	return (0, nil, nil);
}
1425 
# blockget dispatches to dblockget or fblockget depending on whether the
# entry at offset is compressed.
blockget(compressed: int, offset: big, score: Score, dtype: int, readdata: int, size: int): (int, array of byte, string)
{
	if(!compressed)
		return dblockget(offset, score, dtype, readdata, size);
	return fblockget(offset, score, dtype, readdata, size);
}
1432 
# datalookup tries each (compressed, offset) candidate in turn and returns
# the first hit.  without a hit it returns (0, nil, e), where e is the
# last error seen among the candidates (nil if none failed).
datalookup(offsets: list of (int, big), score: Score, dtype: int, readdata: int, size: int): (int, array of byte, string)
{
	lasterr: string;
	while(offsets != nil) {
		(compr, off) := hd offsets;
		offsets = tl offsets;
		(hit, data, err) := blockget(compr, off, score, dtype, readdata, size);
		if(hit)
			return (hit, data, nil);
		if(err != nil)
			lasterr = err;
	}
	return (0, nil, lasterr);
}
1446 
# getscore copies the scorebytes partial-score bytes of entry i of chain c
# into a.  the first byte is masked with scorebytemask, because it may
# share bits with the packed address.
getscore(c: ref Chain, i: int, a: array of byte)
{
	start := (i+1)*membytes - (scorebytes+1);
	a[0:] = c.d[start:start+scorebytes];
	a[0] &= scorebytemask;
}
1454 
# dumpchain prints every used entry of chain c (type, address,
# compressed flag, partial score) for debugging.
dumpchain(c: ref Chain)
{
	s := array[scorebytes] of byte;
	for(i := 0; i < c.used; i++) {
		getscore(c, i, s);
		(compr, addr) := c.getaddr(i);
		dtype := int c.d[(i+1)*membytes-1];
		say(sprint(" i=%d dtype=%d addr=%bd compr=%d score=%s", i, dtype, addr, compr, scorestr(s)));
	}
}
1465 
# dumphead prints all chains hanging off the hash head for score.
dumphead(score: Score)
{
	n := 0;
	c := heads[head(score.a)];
	while(c != nil) {
		say(sprint("chain %d, c.used=%d", n, c.used));
		n++;
		dumpchain(c);
		c = c.next;
	}
}
1474 
# blockcmp compares key d with the last len d bytes of entry j of chain c.
# comparison runs from the last byte (the type) backwards.  the first
# byte is compared under scorebytemask.  the result is <0, 0 or >0.
blockcmp(c: ref Chain, j: int, d: array of byte): int
{
	base := (j+1)*membytes - len d;
	for(i := len d-1; i > 0; i--) {
		if(c.d[base+i] != d[i])
			return int c.d[base+i] - int d[i];
	}
	return int (c.d[base]&scorebytemask) - int (d[0]&scorebytemask);
}
1485 
# find returns, by binary search, the index of the first entry of c that
# does not compare less than key d (c.used if there is none).
find(c: ref Chain, d: array of byte): int
{
	lo := 0;
	hi := c.used;
	while(lo < hi) {
		mid := (lo+hi)/2;
		if(blockcmp(c, mid, d) >= 0)
			hi = mid;
		else
			lo = mid+1;
	}
	return lo;
}
1500 
# find0 is the linear-scan counterpart of find: the index of the first
# entry that does not compare less than d, or c.used.
find0(c: ref Chain, d: array of byte): int
{
	i := 0;
	while(i < c.used && blockcmp(c, i, d) < 0)
		i++;
	return i;
}
1508 
# getaddr unpacks the addrbits-bit big-endian address stored at the
# start of entry j.  when Cflag is set, the top address bit
# (comprmask) is the compressed flag: it is returned separately and
# cleared from the address.  returns (compressed, address).
Chain.getaddr(c: self ref Chain, j: int): (int, big)
{
	p := j*membytes;
	addr := big 0;
	bits := addrbits;
	while(bits >= 8) {
		bits -= 8;
		addr |= big c.d[p] << bits;
		p++;
	}
	# remaining high bits of a byte shared with the partial score
	if(bits > 0)
		addr |= big c.d[p] >> (8-bits);
	compressed := 0;
	if(Cflag && int ((addr&comprmask)>>(addrbits-1)))
		compressed = 1;
	if(compressed)
		addr &= ~comprmask;
	return (compressed, addr);
}
1522 
# lookup prepends to l the addresses of all entries of c equal to key d,
# which are adjacent because the chain is kept sorted.
Chain.lookup(c: self ref Chain, d: array of byte, l: list of (int, big)): list of (int, big)
{
	i := find(c, d);
	while(i < c.used && blockcmp(c, i, d) == 0) {
		l = c.getaddr(i) :: l;
		i++;
	}
	return l;
}
1529 
# lookup0 is Chain.lookup using the linear find0 instead of binary search.
lookup0(c: ref Chain, d: array of byte, l: list of (int, big)): list of (int, big)
{
	i := find0(c, d);
	while(i < c.used && blockcmp(c, i, d) == 0) {
		l = c.getaddr(i) :: l;
		i++;
	}
	return l;
}
1536 
# insert adds the entry for ih to c at its sorted position.  there is no
# capacity check here; chain() only hands out chains with free room.
Chain.insert(c: self ref Chain, ih: ref Ihdr)
{
	entry := array[membytes] of byte;
	mkentry(ih, entry);
	pos := find(c, entry[membytes-(scorebytes+1):]) * membytes;
	# move entries from pos onwards up one slot, then store the new entry
	c.d[pos+membytes:] = c.d[pos:c.used*membytes];
	c.d[pos:] = entry;
	c.used++;
}
1546 
# mk returns a new, empty chain with room for chainblocks entries.
Chain.mk(): ref Chain
{
	storage := array[chainblocks*membytes] of byte;
	return ref Chain(storage, 0, nil);
}
1551 
# sha1 returns the SHA-1 digest of a.
sha1(a: array of byte): array of byte
{
	digest := array[keyring->SHA1dlen] of byte;
	keyring->sha1(a, len a, digest, nil);
	return digest;
}
1558 
# scorestr returns d as lowercase hexadecimal, two digits per byte.
scorestr(d: array of byte): string
{
	digits := "0123456789abcdef";
	s := "";
	for(i := 0; i < len d; i++) {
		v := int d[i];
		s[len s] = digits[v>>4];
		s[len s] = digits[v&16rF];
	}
	return s;
}
1566 
1567 
# lookup returns the data of the first of the l.n stored blocks matching
# score s and type dtype, or nil.
Fifo.lookup(l: self ref Fifo, s: Score, dtype: int): array of byte
{
	i := 0;
	while(i < l.n) {
		b := l.a[i++];
		if(b.dtype == dtype && b.score.eq(s))
			return b.d;
	}
	return nil;
}
1577 
# insert stores b in the ring slot l.f, overwriting the oldest entry once
# the ring is full, and advances l.f circularly.
Fifo.insert(l: self ref Fifo, b: ref Block)
{
	size := len l.a;
	l.a[l.f] = b;
	l.f = (l.f + 1) % size;
	if(l.n < size)
		l.n++;
}
1585 
1586 
# lookup returns the data of the first queued block matching score s and
# type dtype, or nil.
Queue.lookup(q: self ref Queue, s: Score, dtype: int): array of byte
{
	for(i := 0; i < q.n; i++) {
		b := q.a[i];
		if(b.dtype == dtype && b.score.eq(s))
			return b.d;
	}
	return nil;
}
1594 
# insert appends b to the queue, doubling the backing array when full.
Queue.insert(q: self ref Queue, b: ref Block)
{
	if(q.n >= len q.a) {
		# doubling an empty (or nil) array would leave it empty, and the
		# store below would then be out of bounds
		size := 2*len q.a;
		if(size == 0)
			size = 16;
		newa := array[size] of ref Block;
		if(len q.a > 0)
			newa[0:] = q.a;
		q.a = newa;
	}
	q.a[q.n++] = b;
}
1604 
# remove deletes block b (compared by reference) from the queue, keeping
# the order of the remaining blocks.  it fails hard if b is not queued.
Queue.remove(q: self ref Queue, b: ref Block)
{
	i := 0;
	while(i < q.n && q.a[i] != b)
		i++;
	if(i == q.n)
		fail("did not remove anything");
	q.a[i:] = q.a[i+1:q.n];
	q.n--;
	q.a[q.n] = nil;
}
1616 
# queuelookup returns the data for score/dtype from writequeue, falling
# back to flatequeue, or nil when neither holds it.
queuelookup(score: Score, dtype: int): array of byte
{
	if((d := writequeue.lookup(score, dtype)) != nil)
		return d;
	return flatequeue.lookup(score, dtype);
}
1624 
# head maps the first three bytes of d (little-endian) to an index into heads.
head(d: array of byte): int
{
	v := int d[0] | (int d[1]<<8) | (int d[2]<<16);
	return v % len heads;
}
1629 
# chain returns the last chain for d's hash head, creating the head's
# first chain, or appending a new chain when the last one is full.
chain(d: array of byte): ref Chain
{
	h := head(d);
	if(heads[h] == nil)
		heads[h] = Chain.mk();
	c := heads[h];
	for(; c.next != nil; c = c.next)
		;
	if(c.used < chainblocks)
		return c;
	c.next = Chain.mk();
	return c.next;
}
1643 
# meminsert records index header ih in the in-memory chains.
meminsert(ih: ref Ihdr)
{
	if(debug)
		say(sprint("insert: adding halfscore=%s type=%d offset=%bd compressed=%d", scorestr(ih.halfscore), ih.dtype, ih.offset, ih.compressed));
	c := chain(ih.halfscore);
	c.insert(ih);
}
1649 
# mkentry builds the in-memory entry for ih in d.  the tail of d holds
# the partial score and type (see mkmem).  the head holds the
# addrbits-bit big-endian offset, with comprmask set for compressed
# entries.  leftover address bits are OR'ed into the high bits of the
# following byte.
mkentry(ih: ref Ihdr, d: array of byte)
{
	mkmem(ih.halfscore, ih.dtype, d[len d-(scorebytes+1):]);
	addr := ih.offset;
	if(ih.compressed)
		addr |= comprmask;
	i := 0;
	bits := addrbits;
	while(bits >= 8) {
		bits -= 8;
		d[i++] = byte (addr>>bits);
	}
	if(bits > 0)
		d[i] |= byte (addr<<(8-bits));
}
1662 
# mkmem fills d with the in-memory key for score a and type dtype.
# d[0:scorebytes] gets a[headbytes:headbytes+scorebytes], with the first
# byte masked by scorebytemask.  the last byte of d gets the type.
mkmem(a: array of byte, dtype: int, d: array of byte)
{
	for(i := 0; i < scorebytes; i++)
		d[i] = a[headbytes+i];
	d[0] &= scorebytemask;
	d[len d-1] = byte dtype;
}
1669 
# memlookup returns the (compressed, offset) candidates whose in-memory
# key matches score/dtype, across all chains of the score's hash head.
# extra candidates beyond the first are counted in lookupcollisions.
memlookup(score: Score, dtype: int): list of (int, big)
{
	if(debug) say(sprint("lookup: looking for score=%s type=%d", score.text(), dtype));

	key := array[scorebytes+1] of byte;
	mkmem(score.a, dtype, key);
	hits: list of (int, big);
	c := heads[head(score.a)];
	while(c != nil) {
		hits = c.lookup(key, hits);
		c = c.next;
	}
	n := len hits;
	if(n > 1)
		lookupcollisions += n-1;
	return hits;
}
1682 
# memlookup0 is memlookup using the linear lookup0 and without updating
# lookupcollisions.
memlookup0(score: Score, dtype: int): list of (int, big)
{
	if(debug) say(sprint("lookup: looking for score=%s type=%d", score.text(), dtype));

	key := array[scorebytes+1] of byte;
	mkmem(score.a, dtype, key);
	hits: list of (int, big);
	c := heads[head(score.a)];
	while(c != nil) {
		hits = lookup0(c, key, hits);
		c = c.next;
	}
	return hits;
}
1693 
# preadn reads up to want bytes at offset into d, retrying short reads.
# returns the number of bytes read, which is less than want only at end
# of file, or -1 on error.
preadn(fd: ref Sys->FD, d: array of byte, want: int, offset: big): int
{
	got := 0;
	for(;;) {
		if(got >= want)
			break;
		n := sys->pread(fd, d[got:], want-got, offset+big got);
		if(n < 0)
			return -1;
		if(n == 0)
			break;
		got += n;
	}
	return got;
}
1707 
# get16 returns the big-endian 16-bit value at d[i].
get16(d: array of byte, i: int): int
{
	hi := int d[i];
	lo := int d[i+1];
	return (hi<<8) | lo;
}
1712 
# get32 returns the big-endian 32-bit value at d[i] as a big.
get32(d: array of byte, i: int): big
{
	hi := big get16(d, i);
	lo := big get16(d, i+2);
	return (hi<<16) | lo;
}
1717 
# get48 returns the big-endian 48-bit value at d[i].
get48(d: array of byte, i: int): big
{
	hi := big get16(d, i);
	lo := get32(d, i+2);
	return (hi<<32) | lo;
}
1722 
# put16 stores the low 16 bits of v big-endian at d[i].
put16(d: array of byte, i: int, v: int)
{
	lo := byte v;
	hi := byte (v>>8);
	d[i] = hi;
	d[i+1] = lo;
}
1728 
# put32 stores the low 32 bits of v big-endian at d[i].
put32(d: array of byte, i: int, v: big)
{
	hi := int (v>>16);
	lo := int v;
	put16(d, i, hi);
	put16(d, i+2, lo);
}
1734 
# put48 stores the low 48 bits of v big-endian at d[i].
put48(d: array of byte, i: int, v: big)
{
	hi := int (v>>32);
	put16(d, i, hi);
	put32(d, i+2, v);
}
1740 
# unpack decodes an index header from d.  the layout is partial score
# (Indexscoresize bytes), type (1 byte), then a 48-bit big-endian offset
# whose Icomprmask bits flag a compressed entry.  halfscore is a slice
# of d, not a copy.
Ihdr.unpack(d: array of byte): ref Ihdr
{
	halfscore := d[0:Indexscoresize];
	o := Indexscoresize;
	dtype := int d[o++];
	raw := get48(d, o);
	o += 6;
	if(o != Ihdrsize)
		fail("bad iheader.unpack");
	compressed := (raw&Icomprmask) == Icomprmask;
	return ref Ihdr(halfscore, dtype, raw & ~Icomprmask, compressed);
}
1758 
# pack encodes ih into d in the layout read by Ihdr.unpack, setting the
# Icomprmask bits of the offset for compressed entries.
Ihdr.pack(ih: self ref Ihdr, d: array of byte)
{
	n := len ih.halfscore;
	d[0:] = ih.halfscore;
	d[n] = byte ih.dtype;
	v := ih.offset;
	if(ih.compressed)
		v |= Icomprmask;
	put48(d, n+1, v);
	if(n+1+6 != Ihdrsize)
		fail("bad iheader.pack");
}
1774 
# unpack decodes a data-block header from d.  the layout is magic (4),
# score (Scoresize), type (1), size (2) and conntime (4), all
# big-endian.  the score is a slice of d, not a copy.
# returns (header, nil) or (nil, error).
Dhdr.unpack(d: array of byte): (ref Dhdr, string)
{
	# every field is read unconditionally; report a short buffer instead
	# of raising an array bounds exception
	if(len d < Dhdrsize)
		return (nil, "short dhdr");
	o := 0;
	if(get32(d, o) != Dhdrmagic)
		return (nil, "bad dhdr magic");
	o += 4;
	score := d[o:o+Scoresize];
	o += Scoresize;
	dtype := int d[o];
	o += 1;
	size := get16(d, o);
	o += 2;
	conntime := get32(d, o);
	o += 4;
	if(o != Dhdrsize)
		fail("bad dheader.unpack");
	return (ref Dhdr(Score(score), dtype, size, conntime), nil);
}
1793 
# pack encodes dh into d in the layout read by Dhdr.unpack.
Dhdr.pack(dh: self ref Dhdr, d: array of byte)
{
	put32(d, 0, Dhdrmagic);
	d[4:] = dh.score.a;
	o := 4+Scoresize;
	d[o++] = byte dh.dtype;
	put16(d, o, dh.size);
	o += 2;
	put32(d, o, dh.conntime);
	o += 4;
	if(o != Dhdrsize)
		fail("bad dheader.pack");
}
1810 
1811 
# unpack decodes a compressed-group header from d.  the layout is magic
# (4), block count (1), compressed data size (2), then per block: score
# (Scoresize), type (1), size (2) and conntime (4).
# returns (header, offset of each block in the uncompressed data, nil),
# or (nil, nil, error).
Fhdr.unpack(d: array of byte): (ref Fhdr, array of int, string)
{
	# magic, count and dsize are read unconditionally
	if(len d < Fhdrsize)
		return (nil, nil, "short fhdr");
	o := 0;
	if(get32(d, o) != Fhdrmagic)
		return (nil, nil, "bad fhdr magic");
	o += 4;
	nb := int d[o++];
	dsize := get16(d, o);
	o += 2;
	# o is now Fhdrsize: the per-block headers follow and must fit in d
	hsize := nb*Fbhdrsize;
	if(o+hsize > len d)
		return (nil, nil, "header points outside buffer");

	offset := 0;
	b := array[nb] of ref Dhdr;
	offsets := array[nb] of int;
	for(i := 0; i < nb; i++) {
		score := d[o:o+Scoresize];
		o += Scoresize;
		dtype := int d[o];
		o += 1;
		size := get16(d, o);
		o += 2;
		conntime := get32(d, o);
		o += 4;
		b[i] = ref Dhdr(Score(score), dtype, size, conntime);
		offsets[i] = offset;
		offset += size;
	}
	return (ref Fhdr(b, hsize, dsize), offsets, nil);
}
1843 
# pack encodes fh into d in the layout read by Fhdr.unpack.
Fhdr.pack(fh: self ref Fhdr, d: array of byte)
{
	nb := len fh.blocks;
	put32(d, 0, Fhdrmagic);
	d[4] = byte nb;
	put16(d, 5, fh.dsize);
	o := 7;
	for(i := 0; i < nb; i++) {
		bh := fh.blocks[i];
		d[o:] = bh.score.a;
		d[o+Scoresize] = byte bh.dtype;
		put16(d, o+Scoresize+1, bh.size);
		put32(d, o+Scoresize+3, bh.conntime);
		o += Scoresize+7;
	}
	if(o != Fhdrsize+nb*Fbhdrsize)
		fail("bad fhdr.pack");
}
1867 
# /dev/memory, opened on first use by heapinfo and then kept open
mfd: ref Sys->FD;

# selectors for heapinfo
Current: con 0;
Max: con 1;
1870 
# heapinfo returns a figure for the "heap" pool from /dev/memory.  each
# line has seven 12-character numeric columns followed by the pool name.
# Current selects the first column and Max the second.  any error, or a
# missing heap line, is fatal.
heapinfo(which: int): big
{
	if(mfd == nil)
		mfd = sys->open("/dev/memory", sys->OREAD);
	if(mfd == nil)
		fail(sprint("open /dev/memory: %r"));
	sys->seek(mfd, big 0, Sys->SEEKSTART);

	buf := array[400] of byte;
	n := sys->read(mfd, buf, len buf);
	if(n <= 0)
		fail(sprint("reading /dev/memory: %r"));

	(nil, l) := sys->tokenize(string buf[0:n], "\n");
	for(; l != nil; l = tl l) {
		s := hd l;
		# a line too short to hold the seven columns cannot be the heap
		# line; slicing it would raise an exception
		if(len s < 7*12 || s[7*12:] != "heap")
			continue;
		case which {
		Current =>
			return big s[0:12];
		Max =>
			return big s[12:24];
		}
	}
	fail("missing heap line in /dev/memory");
	return big 0;
}
1893 
# heapmax returns heapinfo's Max figure for the heap pool.
heapmax(): big
{
	limit := heapinfo(Max);
	return limit;
}
1898 
# heapused returns heapinfo's Current figure for the heap pool.
heapused(): big
{
	used := heapinfo(Current);
	return used;
}
1903 
# suffix parses a size with an optional, case-insensitive k/m/g/t/p
# suffix, meaning a multiple of 1024, 1024^2 and so on.  an empty string
# gives 0.
suffix(s: string): big
{
	if(s == nil)
		return big 0;
	s = str->tolower(s);
	units := "kmgtp";
	last := s[len s-1];
	mult := big 1;
	for(i := 0; i < len units; i++) {
		mult *= big 1024;
		if(last == units[i])
			return mult * big s[:len s-1];
	}
	return big s;
}
1918 
# log2 returns the smallest bits with 2^bits >= v, i.e. the ceiling of
# log2(v).  it returns 0 for v <= 1.
log2(v: big): int
{
	bits := 0;
	while((big 1<<bits) < v)
		bits++;
	return bits;
}
1925 
# fail prints s on standard error and raises "fail:"+s.
fail(s: string)
{
	stderr := fildes(2);
	fprint(stderr, "%s\n", s);
	raise "fail:"+s;
}
1931 
# say prints s, followed by a newline, on standard error.
say(s: string)
{
	stderr := fildes(2);
	fprint(stderr, "%s\n", s);
}