chiark / gitweb /
94a7f76a904fd4f7de81477d34c6f8a2985bd85e
[chiark-tcl.git] / cdb / writeable.c
1 /**/
2
3 #include "chiark_tcl_cdb.h"
4
5 #define ftello ftell
6 #define fseeko fseek
7
8 /*---------- Forward declarations ----------*/
9
10 struct ht_forall_ctx;
11
12 /*---------- Useful routines ----------*/
13
14 static void maybe_close(int fd) {
15   if (fd>=0) close(fd);
16 }
17
18 /*==================== Subsystems and subtypes ====================*/
19
20 /*---------- Pathbuf ----------*/
21
22 typedef struct Pathbuf {
23   char *buf, *sfx;
24 } Pathbuf;
25
26 #define MAX_SUFFIX 5
27
28 static void pathbuf_init(Pathbuf *pb, const char *pathb) {
29   int l= strlen(pathb);
30   pb->buf= TALLOC(l + MAX_SUFFIX + 1);
31   memcpy(pb->buf, pathb, l);
32   pb->sfx= pb->buf + l;
33 }
34 static const char *pathbuf_sfx(Pathbuf *pb, const char *suffix) {
35   assert(strlen(suffix) <= MAX_SUFFIX);
36   strcpy(pb->sfx, suffix);
37   return pb->buf;
38 }
39 static void pathbuf_free(Pathbuf *pb) {
40   TFREE(pb->buf);
41   pb->buf= 0;
42 }
43
44 /*---------- Our hash table ----------*/
45
46 typedef struct HashTable {
47   Tcl_HashTable t;
48   Byte padding[128]; /* allow for expansion by Tcl, urgh */
49   Byte confound[16];
50 } HashTable;
51
52 typedef struct HashValue {
53   int len;
54   Byte data[1];
55 } HashValue;
56
57 static HashValue *htv_prep(int len) {
58   HashValue *hd;
59   hd= TALLOC((hd->data - (Byte*)hd) + len);
60   hd->len= len;
61   return hd;
62 }  
63 static Byte *htv_fillptr(HashValue *hd) {
64   return hd->data;
65 }
66
67 static void ht_setup(HashTable *ht) {
68   Tcl_InitHashTable(&ht->t, TCL_STRING_KEYS);
69 }
70 static void ht_update(HashTable *ht, const char *key, HashValue *val_eat) {
71   Tcl_HashEntry *he;
72   int new;
73
74   he= Tcl_CreateHashEntry(&ht->t, (char*)key, &new);
75   if (!new) TFREE(Tcl_GetHashValue(he));
76   Tcl_SetHashValue(he, val_eat);
77     /* eats the value since the data structure owns the memory */
78 }
79 static void ht_maybeupdate(HashTable *ht, const char *key,
80                            HashValue *val_eat) {
81   /* like ht_update except does not overwrite existing values */
82   Tcl_HashEntry *he;
83   int new;
84
85   he= Tcl_CreateHashEntry(&ht->t, (char*)key, &new);
86   if (!new) { TFREE(val_eat); return; }
87   Tcl_SetHashValue(he, val_eat);
88 }
89
90 static const HashValue *ht_lookup(HashTable *ht, const char *key) {
91   Tcl_HashEntry *he;
92   
93   he= Tcl_FindHashEntry(&ht->t, key);
94   if (!he) return 0;
95   
96   return Tcl_GetHashValue(he);
97 }
98
99 static int ht_forall(HashTable *ht,
100                      int (*fn)(const char *key, HashValue *val,
101                                struct ht_forall_ctx *ctx),
102                      struct ht_forall_ctx *ctx) {
103   /* Returns first positive value returned by any call to fn, or 0. */
104   Tcl_HashSearch sp;
105   Tcl_HashEntry *he;
106   const char *key;
107   HashValue *val;
108   int r;
109   
110   for (he= Tcl_FirstHashEntry(&ht->t, &sp);
111        he;
112        he= Tcl_NextHashEntry(&sp)) {
113     val= Tcl_GetHashValue(he);
114     if (!val->len) continue;
115
116     key= Tcl_GetHashKey(&ht->t, he);
117     
118     r= fn(key, val, ctx);
119     if (r) return r;
120   }
121   return 0;
122 }
123
124 static void ht_destroy(HashTable *ht) {
125   Tcl_HashSearch sp;
126   Tcl_HashEntry *he;
127   
128   for (he= Tcl_FirstHashEntry(&ht->t, &sp);
129        he;
130        he= Tcl_NextHashEntry(&sp)) {
131     /* ht_forall skips empty (deleted) entries so is no good for this */
132     TFREE(Tcl_GetHashValue(he));
133   }
134   Tcl_DeleteHashTable(&ht->t);
135 }
136
137 /*==================== Existential ====================*/
138
139 /*---------- Rw data structure ----------*/
140
141 typedef struct Rw {
142   int ix, autocompact;
143   int cdb_fd, lock_fd;
144   struct cdb cdb; /* valid iff cdb_fd >= 0 */
145   FILE *logfile;
146   HashTable logincore;
147   Pathbuf pbsome, pbother;
148   off_t mainsz;
149   ScriptToInvoke on_info, on_lexminval;
150 } Rw;
151
152 static int rw_close(Tcl_Interp *ip, Rw *rw) {
153   int rc, r;
154
155   rc= TCL_OK;
156   ht_destroy(&rw->logincore);
157   if (rw->cdb_fd >= 0) cdb_free(&rw->cdb);
158   maybe_close(rw->cdb_fd);
159   maybe_close(rw->lock_fd);
160
161   if (rw->logfile) {
162     r= fclose(rw->logfile);
163     if (r && ip) { rc= cht_posixerr(ip, errno, "probable data loss! failed to"
164                                     " fclose logfile during untidy close"); }
165   }
166
167   pathbuf_free(&rw->pbsome); pathbuf_free(&rw->pbother);
168   return rc;
169 }
170
171 static void destroy_cdbrw_idtabcb(Tcl_Interp *ip, void *rw_v) {
172   rw_close(0,rw_v);
173   TFREE(rw_v);
174 }
175 const IdDataSpec cdbtcl_rwdatabases= {
176   "cdb-rwdb", "cdb-openrwdatabases-table", destroy_cdbrw_idtabcb
177 };
178
179 /*---------- File handling ----------*/
180
181 static int acquire_lock(Tcl_Interp *ip, Pathbuf *pb, int *lockfd_r) {
182   /* *lockfd_r must be -1 on entry.  If may be set to >=0 even
183    * on error, and must be closed by the caller. */
184   mode_t um, lockmode;
185   struct flock fl;
186   int r;
187
188   um= umask(~(mode_t)0);
189   umask(um);
190
191   lockmode= 0666 & ~((um & 0444)>>1);
192   /* Remove r where umask would remove w;
193    * eg umask intending 0664 here gives 0660 */
194   
195   *lockfd_r= open(pathbuf_sfx(pb,".lock"), O_RDWR|O_CREAT, lockmode);
196   if (*lockfd_r < 0)
197     return cht_posixerr(ip, errno, "could not open/create lockfile");
198
199   fl.l_type= F_WRLCK;
200   fl.l_whence= SEEK_SET;
201   fl.l_start= 0;
202   fl.l_len= 0;
203   fl.l_pid= getpid();
204
205   r= fcntl(*lockfd_r, F_SETLK, &fl);
206   if (r == -1) {
207     if (errno == EACCES || errno == EAGAIN)
208       return cht_staticerr(ip, "lock held by another process", "CDB LOCKED");
209     else return cht_posixerr(ip, errno, "unexpected error from fcntl while"
210                              " acquiring lock");
211   }
212
213   return TCL_OK;
214 }
215
216 /*---------- Log reading and writing ----------*/
217
218 static int readlognum(FILE *f, int delim, int *num_r) {
219   int c;
220   char numbuf[20], *p, *ep;
221   unsigned long ul;
222
223   p= numbuf;
224   for (;;) {
225     c= getc(f);  if (c==EOF) return -2;
226     if (c == delim) break;
227     if (!isdigit((unsigned char)c)) return -2;
228     *p++= c;
229     if (p == numbuf+sizeof(numbuf)) return -2;
230   }
231   if (p == numbuf) return -2;
232   *p= 0;
233
234   errno=0; ul= strtoul(numbuf, &ep, 10);
235   if (*ep || errno || ul >= INT_MAX/2) return -2;
236   *num_r= ul;
237   return 0;
238 }
239
240 static int readstorelogrecord(FILE *f, HashTable *ht,
241                               int (*omitfn)(const HashValue*,
242                                             struct ht_forall_ctx *ctx),
243                               struct ht_forall_ctx *ctx,
244                               void (*updatefn)(HashTable*, const char*,
245                                                HashValue*)) {
246   /* returns:
247    *      0     for OK
248    *     -1     eof
249    *     -2     corrupt or error
250    *     -3     got newline indicating end
251    *     >0     value from omitfn
252    */
253   int keylen, vallen;
254   char *key;
255   HashValue *val;
256   int c, rc, r;
257
258   c= getc(f);
259   if (c==EOF) { return feof(f) ? -1 : -2; }
260   if (c=='\n') return -3;
261   if (c!='+') return -2;
262
263   rc= readlognum(f, ',', &keylen);  if (rc) return rc;
264   rc= readlognum(f, ':', &vallen);  if (rc) return rc;
265
266   key= TALLOC(keylen+1);
267   val= htv_prep(vallen);
268
269   r= fread(key, 1,keylen, f);
270   if (r!=keylen) goto x2_free_keyval;
271   if (memchr(key,0,keylen)) goto x2_free_keyval;
272   key[keylen]= 0;
273
274   c= getc(f);  if (c!='-') goto x2_free_keyval;
275   c= getc(f);  if (c!='>') goto x2_free_keyval;
276   
277   r= fread(htv_fillptr(val), 1,vallen, f);
278   if (r!=vallen) goto x2_free_keyval;
279
280   c= getc(f);  if (c!='\n') goto x2_free_keyval;
281
282   rc= omitfn ? omitfn(val, ctx) : TCL_OK;
283   if (rc) { assert(rc>0); TFREE(val); }
284   else updatefn(ht, key, val);
285   
286   TFREE(key);
287   return rc;
288
289  x2_free_keyval:
290   TFREE(val);
291   TFREE(key);
292   return -2;
293 }
294
295 static int writerecord(FILE *f, const char *key, const HashValue *val) {
296   int r;
297
298   r= fprintf(f, "+%d,%d:%s->", strlen(key), val->len, key);
299   if (r<0) return -1;
300   
301   r= fwrite(val->data, 1, val->len, f);
302   if (r != val->len) return -1;
303
304   r= putc('\n', f);
305   if (r==EOF) return -1;
306
307   return 0;
308 }
309
310 /*---------- Creating ----------*/
311
312 int cht_do_cdbwr_create_empty(ClientData cd, Tcl_Interp *ip,
313                               const char *pathb) {
314   static const char *const toremoves[]= { ".cdb", ".log", ".tmp", 0 };
315
316   Pathbuf pb, pbmain;
317   int lock_fd=-1, rc, r;
318   FILE *f= 0;
319   const char *const *toremove;
320   struct stat stab;
321
322   pathbuf_init(&pb, pathb);
323   pathbuf_init(&pbmain, pathb);
324
325   rc= acquire_lock(ip, &pb, &lock_fd);  if (rc) goto x_rc;
326
327   r= lstat(pathbuf_sfx(&pbmain, ".main"), &stab);
328   if (!r) { rc= cht_staticerr(ip, "database already exists during creation",
329                               "CDB ALREADY-EXISTS");  goto x_rc; }
330   if (errno != ENOENT) PE("check for existing database .main during creation");
331
332   for (toremove=toremoves; *toremove; toremove++) {
333     r= remove(pathbuf_sfx(&pb, *toremove));
334     if (r && errno != ENOENT)
335       PE("delete possible spurious file during creation");
336   }
337   
338   f= fopen(pathbuf_sfx(&pb, ".tmp"), "w");
339   if (!f) PE("create new database .tmp");  
340   r= putc('\n', f);  if (r==EOF) PE("write sentinel to new database .tmp");
341   r= fclose(f);  f=0;  if (r) PE("close new database .tmp during creation");
342
343   r= rename(pb.buf, pbmain.buf);
344   if (r) PE("install new database .tmp as .main (finalising creation)");
345   
346   rc= TCL_OK;
347
348  x_rc:
349   if (f) fclose(f);
350   maybe_close(lock_fd);
351   pathbuf_free(&pb);
352   pathbuf_free(&pbmain);
353   return rc;
354 }
355
356 /*---------- Info callbacks ----------*/
357
358 static int infocbv3(Tcl_Interp *ip, Rw *rw, const char *arg1,
359                     const char *arg2fmt, const char *arg3, va_list al) {
360   Tcl_Obj *aa[3];
361   int na;
362   char buf[200];
363   vsnprintf(buf, sizeof(buf), arg2fmt, al);
364
365   na= 0;
366   aa[na++]= cht_ret_string(ip, arg1);
367   aa[na++]= cht_ret_string(ip, buf);
368   if (arg3) aa[na++]= cht_ret_string(ip, arg3);
369   
370   return cht_scriptinv_invoke_fg(&rw->on_info, na, aa);
371 }
372   
373 static int infocb3(Tcl_Interp *ip, Rw *rw, const char *arg1,
374                    const char *arg2fmt, const char *arg3, ...) {
375   int rc;
376   va_list al;
377   va_start(al, arg3);
378   rc= infocbv3(ip,rw,arg1,arg2fmt,arg3,al);
379   va_end(al);
380   return rc;
381 }
382   
383 static int infocb(Tcl_Interp *ip, Rw *rw, const char *arg1,
384                   const char *arg2fmt, ...) {
385   int rc;
386   va_list al;
387   va_start(al, arg2fmt);
388   rc= infocbv3(ip,rw,arg1,arg2fmt,0,al);
389   va_end(al);
390   return rc;
391 }
392   
393 /*---------- Opening ----------*/
394
395 static int cdbinit(Tcl_Interp *ip, Rw *rw) {
396   /* On entry, cdb_fd >=0 but cdb is _undefined_/
397    * On exit, either cdb_fd<0 or cdb is initialised */
398   int r, rc;
399   
400   r= cdb_init(&rw->cdb, rw->cdb_fd);
401   if (r) {
402     rc= cht_posixerr(ip, errno, "failed to initialise cdb reader");
403     close(rw->cdb_fd);  rw->cdb_fd= -1;  return rc;
404   }
405   return TCL_OK;
406 }
407
408 int cht_do_cdbwr_open(ClientData cd, Tcl_Interp *ip, const char *pathb,
409                       Tcl_Obj *on_info, Tcl_Obj *on_lexminval,
410                       void **result) {
411   const Cdbwr_SubCommand *subcmd= cd;
412   int r, rc, mainfd=-1;
413   Rw *rw;
414   struct stat stab;
415   off_t logrecstart, logjunkpos;
416
417   rw= TALLOC(sizeof(*rw));
418   rw->ix= -1;
419   ht_setup(&rw->logincore);
420   cht_scriptinv_init(&rw->on_info);
421   cht_scriptinv_init(&rw->on_lexminval);
422   rw->cdb_fd= rw->lock_fd= -1;  rw->logfile= 0;
423   pathbuf_init(&rw->pbsome, pathb);
424   pathbuf_init(&rw->pbother, pathb);
425   rw->autocompact= 1;
426
427   rc= cht_scriptinv_set(&rw->on_info, ip, on_info, 0);
428   if (rc) goto x_rc;
429
430   rc= cht_scriptinv_set(&rw->on_lexminval, ip, on_lexminval, 0);
431   if (rc) goto x_rc;
432
433   mainfd= open(pathbuf_sfx(&rw->pbsome,".main"), O_RDONLY);
434   if (mainfd<0) PE("open existing database file .main");
435   rc= acquire_lock(ip, &rw->pbsome, &rw->lock_fd);  if (rc) goto x_rc;
436
437   r= fstat(mainfd, &stab);  if (r) PE("fstat .main");
438   rw->mainsz= stab.st_size;
439
440   rw->cdb_fd= open(pathbuf_sfx(&rw->pbsome,".cdb"), O_RDONLY);
441   if (rw->cdb_fd >=0) {
442     rc= cdbinit(ip, rw);  if (rc) goto x_rc;
443   } else if (errno == ENOENT) {
444     if (rw->mainsz > 1) {
445       rc= cht_staticerr(ip, ".cdb does not exist but .main is >1byte -"
446                         " .cdb must have been accidentally deleted!",
447                         "CDB CDBMISSING");
448       goto x_rc;
449     }
450     /* fine */
451   } else {
452     PE("open .cdb");
453   }
454
455   rw->logfile= fopen(pathbuf_sfx(&rw->pbsome,".log"), "r+");
456   if (!rw->logfile) {
457     if (errno != ENOENT) PE("failed to open .log during open");
458     rw->logfile= fopen(rw->pbsome.buf, "w");
459     if (!rw->logfile) PE("create .log during (clean) open");
460   } else { /* rw->logfile */
461     r= fstat(fileno(rw->logfile), &stab);
462     if (r==-1) PE("fstat .log during open");
463     rc= infocb(ip, rw, "open-dirty-start", "log=%luby",
464                (unsigned long)stab.st_size);
465     if (rc) goto x_rc;
466
467     for (;;) {
468       logrecstart= ftello(rw->logfile);
469       if (logrecstart < 0) PE("ftello .log during (dirty) open");
470       r= readstorelogrecord(rw->logfile, &rw->logincore, 0,0, ht_update);
471       if (ferror(rw->logfile)) {
472         rc= cht_posixerr(ip, errno, "error reading .log during (dirty) open");
473         goto x_rc;
474       }
475       if (r==-1) {
476         break;
477       } else if (r==-2 || r==-3) {
478         char buf[100];
479         logjunkpos= ftello(rw->logfile);
480         if(logjunkpos<0) PE("ftello .log during report of junk in dirty open");
481
482         snprintf(buf,sizeof(buf), "CDB SYNTAX LOG %lu %lu",
483                  (unsigned long)logjunkpos, (unsigned long)logrecstart);
484
485         if (!(subcmd->flags & RWSCF_OKJUNK)) {
486           Tcl_SetObjErrorCode(ip, Tcl_NewStringObj(buf,-1));
487           snprintf(buf,sizeof(buf),"%lu",(unsigned long)logjunkpos);
488           Tcl_ResetResult(ip);
489           Tcl_AppendResult(ip, "syntax error (junk) in .log during"
490                            " (dirty) open, at file position ", buf, (char*)0);
491           rc= TCL_ERROR;
492           goto x_rc;
493         }
494         rc= infocb3(ip, rw, "open-dirty-junk", "errorfpos=%luby", buf,
495                     (unsigned long)logjunkpos);
496         if (rc) goto x_rc;
497
498         r= fseeko(rw->logfile, logrecstart, SEEK_SET);
499         if (r) PE("failed to fseeko .log before junk during dirty open");
500
501         r= ftruncate(fileno(rw->logfile), logrecstart);
502         if (r) PE("ftruncate .log to chop junk during dirty open");
503       } else {
504         assert(!r);
505       }
506     }
507   }
508   /* now log is positioned for appending and everything is read */
509
510   *result= rw;
511   maybe_close(mainfd);
512   return TCL_OK;
513
514  x_rc:
515   rw_close(0,rw);
516   TFREE(rw);
517   maybe_close(mainfd);
518   return rc;
519 }
520
521 int cht_do_cdbwr_open_okjunk(ClientData cd, Tcl_Interp *ip, const char *pathb,
522                       Tcl_Obj *on_info, Tcl_Obj *on_lexminval,
523                       void **result) {
524   return cht_do_cdbwr_open(cd,ip,pathb,on_info,on_lexminval,result);
525 }
526
527 /*==================== COMPACTION ====================*/
528
529 struct ht_forall_ctx {
530   struct cdb_make cdbm;
531   FILE *mainfile;
532   long *reccount;
533   int lexminvall;
534   const char *lexminval; /* may be invalid if lexminvall <= 0 */
535 };
536
537 /*---------- helper functions ----------*/
538
539 static int expiredp(const HashValue *val, struct ht_forall_ctx *a) {
540   int r, l;
541   if (!val->len || a->lexminvall<=0) return 0;
542   l= val->len < a->lexminvall ? val->len : a->lexminvall;
543   r= memcmp(val->data, a->lexminval, l);
544   if (r>0) return 0;
545   if (r<0) return 1;
546   return val->len < a->lexminvall;
547 }
548
549 static int delete_ifexpired(const char *key, HashValue *val,
550                             struct ht_forall_ctx *a) {
551   if (!expiredp(val, a)) return 0;
552   val->len= 0;
553   /* we don't actually need to realloc it to free the memory because
554    * this will shortly all be deleted as part of the compaction */
555   return 0;
556 }
557
558 static int addto_cdb(const char *key, HashValue *val,
559                      struct ht_forall_ctx *a) {
560   return cdb_make_add(&a->cdbm, key, strlen(key), val->data, val->len);
561 }
562
563 static int addto_main(const char *key, HashValue *val,
564                       struct ht_forall_ctx *a) {
565   (*a->reccount)++;
566   return writerecord(a->mainfile, key, val);
567 }
568
569 /*---------- compact main entrypoint ----------*/
570
571 static int compact_core(Tcl_Interp *ip, Rw *rw, unsigned long logsz,
572                         long *reccount_r) {
573   /* creates new .cdb and .main
574    * closes logfile
575    * leaves .log with old data
576    * leaves cdb fd open onto old db
577    * leaves logincore full of crap
578    */
579   int r, rc;
580   int cdbfd, cdbmaking;
581   off_t errpos, newmainsz;
582   char buf[100];
583   Tcl_Obj *res;
584   struct ht_forall_ctx a;
585
586   a.mainfile= 0;
587   cdbfd= -1;
588   cdbmaking= 0;
589   *reccount_r= 0;
590   a.reccount= reccount_r;
591
592   r= fclose(rw->logfile);
593   if (r) { rc= cht_posixerr(ip, errno, "probable data loss!  failed to fclose"
594                             " logfile during compact");  goto x_rc; }
595   rw->logfile= 0;
596   
597   rc= infocb(ip, rw, "compact-start", "log=%luby main=%luby",
598              logsz, (unsigned long)rw->mainsz);
599   if (rc) goto x_rc;
600
601   if (cht_scriptinv_interp(&rw->on_lexminval)) {
602     rc= cht_scriptinv_invoke_fg(&rw->on_lexminval, 0,0);
603     if (rc) goto x_rc;
604
605     res= Tcl_GetObjResult(ip);  assert(res);
606     a.lexminval= Tcl_GetStringFromObj(res, &a.lexminvall);
607     assert(a.lexminval);
608
609     /* we rely not calling Tcl_Eval during the actual compaction;
610      * if we did Tcl_Eval then the interp result would be trashed.
611      */
612     rc= ht_forall(&rw->logincore, delete_ifexpired, &a);
613
614   } else {
615     a.lexminvall= 0;
616   }
617
618   /* merge unsuperseded records from main into hash table */
619
620   a.mainfile= fopen(pathbuf_sfx(&rw->pbsome,".main"), "r");
621   if (!a.mainfile) PE("failed to open .main for reading during compact");
622
623   for (;;) {
624     r= readstorelogrecord(a.mainfile, &rw->logincore,
625                           expiredp, &a,
626                           ht_maybeupdate);
627     if (ferror(a.mainfile)) { rc= cht_posixerr(ip, errno, "error reading"
628                          " .main during compact"); goto x_rc; }
629     if (r==-3) {
630       break;
631     } else if (r==-1 || r==-2) {
632       errpos= ftello(a.mainfile);
633       if (errpos<0) PE("ftello .main during report of syntax error");
634       snprintf(buf,sizeof(buf), "CDB %s MAIN %lu",
635                r==-1 ? "TRUNCATED" : "SYNTAX", (unsigned long)errpos);
636       Tcl_SetObjErrorCode(ip, Tcl_NewStringObj(buf,-1));
637       snprintf(buf,sizeof(buf), "%lu", (unsigned long)errpos);
638       Tcl_ResetResult(ip);
639       Tcl_AppendResult(ip,
640                        r==-1 ? "unexpected eof (truncated file)"
641                        " in .main during compact, at file position "
642                        : "syntax error"
643                        " in .main during compact, at file position ",
644                        buf, (char*)0);
645       rc= TCL_ERROR;
646       goto x_rc;
647     } else {
648       assert(!rc);
649     }
650   }
651   fclose(a.mainfile);
652   a.mainfile= 0;
653
654   /* create new cdb */
655
656   cdbfd= open(pathbuf_sfx(&rw->pbsome,".tmp"), O_WRONLY|O_CREAT|O_TRUNC, 0666);
657   if (cdbfd<0) PE("create .tmp for new cdb during compact");
658
659   r= cdb_make_start(&a.cdbm, cdbfd);
660   if (r) PE("cdb_make_start during compact");
661   cdbmaking= 1;
662
663   r= ht_forall(&rw->logincore, addto_cdb, &a);
664   if (r) PE("cdb_make_add during compact");
665
666   r= cdb_make_finish(&a.cdbm);
667   if(r) PE("cdb_make_finish during compact");
668   cdbmaking= 0;
669
670   r= fdatasync(cdbfd);  if (r) PE("fdatasync new cdb during compact");
671   r= close(cdbfd);  if (r) PE("close new cdb during compact");
672   cdbfd= -1;
673
674   r= rename(rw->pbsome.buf, pathbuf_sfx(&rw->pbother,".cdb"));
675   if (r) PE("install new .cdb during compact");
676
677   /* create new main */
678
679   a.mainfile= fopen(pathbuf_sfx(&rw->pbsome,".tmp"), "w");
680   if (!a.mainfile) PE("create .tmp for new main during compact");
681
682   r= ht_forall(&rw->logincore, addto_main, &a);
683   if (r) { rc= cht_posixerr(ip, r, "error writing to new .main"
684                             " during compact");  goto x_rc; }
685
686   r= putc('\n', a.mainfile);
687   if (r==EOF) PE("write trailing \n to main during compact");
688   
689   r= fflush(a.mainfile);  if (r) PE("fflush new main during compact");
690   r= fdatasync(fileno(a.mainfile));
691   if (r) PE("fdatasync new main during compact");
692
693   newmainsz= ftello(a.mainfile);
694   if (newmainsz<0) PE("ftello new main during compact");
695   
696   r= fclose(a.mainfile);  if (r) PE("fclose new main during compact");
697   a.mainfile= 0;
698
699   r= rename(rw->pbsome.buf, pathbuf_sfx(&rw->pbother,".main"));
700   if (r) PE("install new .main during compact");
701
702   rw->mainsz= newmainsz;
703
704   /* done! */
705   
706   rc= infocb(ip, rw, "compact-end", "main=%luby nrecs=%ld",
707              (unsigned long)rw->mainsz, *a.reccount);
708   if (rc) goto x_rc;
709
710   return rc;
711
712 x_rc:
713   if (a.mainfile) fclose(a.mainfile);
714   if (cdbmaking) cdb_make_finish(&a.cdbm);
715   maybe_close(cdbfd);
716   remove(pathbuf_sfx(&rw->pbsome,".tmp")); /* for tidyness */
717   return rc;
718 }
719
720 /*---------- Closing ----------*/
721
722 static int compact_forclose(Tcl_Interp *ip, Rw *rw, long *reccount_r) {
723   off_t logsz;
724   int r, rc;
725
726   logsz= ftello(rw->logfile);
727   if (logsz < 0) PE("ftello logfile (during tidy close)");
728
729   rc= compact_core(ip, rw, logsz, reccount_r);  if (rc) goto x_rc;
730
731   r= remove(pathbuf_sfx(&rw->pbsome,".log"));
732   if (r) PE("remove .log (during tidy close)");
733
734   return TCL_OK;
735
736 x_rc: return rc;
737 }
738   
739 int cht_do_cdbwr_close(ClientData cd, Tcl_Interp *ip, void *rw_v) {
740   Rw *rw= rw_v;
741   int rc, rc_close;
742   long reccount= -1;
743   off_t logsz;
744
745   if (rw->autocompact) rc= compact_forclose(ip, rw, &reccount);
746   else rc= TCL_OK;
747
748   if (!rc) {
749     if (rw->logfile) {
750       logsz= ftello(rw->logfile);
751       if (logsz < 0)
752         rc= cht_posixerr(ip, errno, "ftell logfile during close info");
753       else
754         rc= infocb(ip, rw, "close", "main=%luby log=%luby",
755                    rw->mainsz, logsz);
756     } else if (reccount>=0) {
757       rc= infocb(ip, rw, "close", "main=%luby nrecs=%ld",
758                  rw->mainsz, reccount);
759     } else {
760       rc= infocb(ip, rw, "close", "main=%luby", rw->mainsz);
761     }
762   }
763   rc_close= rw_close(ip,rw);
764   if (rc_close) rc= rc_close;
765   
766   cht_tabledataid_disposing(ip, rw_v, &cdbtcl_rwdatabases);
767   TFREE(rw);
768   return rc;
769 }
770
771 /*---------- Other compaction-related entrypoints ----------*/
772
773 static int compact_keepopen(Tcl_Interp *ip, Rw *rw, int force) {
774   off_t logsz;
775   long reccount;
776   int rc, r;
777
778   logsz= ftello(rw->logfile);
779   if (logsz < 0) return cht_posixerr(ip, errno, "ftell .log"
780                                        " during compact check or force");
781
782   if (!force && logsz < rw->mainsz / 3 + 1000) return TCL_OK;
783   /* Test case:                    ^^^ testing best value for this
784    *   main=9690434by nrecs=122803  read all in one go
785    *  no autocompact, :    6.96user 0.68system 0:08.93elapsed
786    *  auto, mulitplier 2:  7.10user 0.79system 0:09.54elapsed
787    *  auto, unity:         7.80user 0.98system 0:11.84elapsed
788    *  auto, divisor  2:    8.23user 1.05system 0:13.30elapsed
789    *  auto, divisor  3:    8.55user 1.12system 0:12.88elapsed
790    *  auto, divisor  5:    9.95user 1.43system 0:15.72elapsed
791    */
792
793   rc= compact_core(ip, rw, logsz, &reccount);  if (rc) goto x_rc;
794
795   maybe_close(rw->cdb_fd);
796   rw->cdb_fd= -1;
797   ht_destroy(&rw->logincore);
798   ht_setup(&rw->logincore);
799
800   rw->cdb_fd= open(pathbuf_sfx(&rw->pbsome,".cdb"), O_RDONLY);
801   if (rw->cdb_fd < 0) PE("reopen .cdb after compact");
802
803   rc= cdbinit(ip, rw);  if (rc) goto x_rc;
804
805   rw->logfile= fopen(pathbuf_sfx(&rw->pbsome,".log"), "w");
806   if (!rw->logfile) PE("reopen .log after compact");
807
808   r= fsync(fileno(rw->logfile));  if (r) PE("fsync .log after compact reopen");
809
810   return TCL_OK;
811
812 x_rc:
813   /* doom! all updates fail after this (because rw->logfile is 0), and
814    * we may be using a lot more RAM than would be ideal.  Program will
815    * have to reopen if it really wants sanity. */
816   return rc;
817 }
818
819 int cht_do_cdbwr_compact_force(ClientData cd, Tcl_Interp *ip, void *rw_v) {
820   return compact_keepopen(ip, rw_v, 1);
821 }
822 int cht_do_cdbwr_compact_check(ClientData cd, Tcl_Interp *ip, void *rw_v) {
823   return compact_keepopen(ip, rw_v, 0);
824 }
825
826 int cht_do_cdbwr_compact_explicit(ClientData cd, Tcl_Interp *ip, void *rw_v) {
827   Rw *rw= rw_v;
828   rw->autocompact= 0;
829   return TCL_OK;
830 }
831 int cht_do_cdbwr_compact_auto(ClientData cd, Tcl_Interp *ip, void *rw_v) {
832   Rw *rw= rw_v;
833   rw->autocompact= 1;
834   return TCL_OK;
835 }
836
837 /*---------- Updateing ----------*/
838
839 static int update(Tcl_Interp *ip, Rw *rw, const char *key,
840                   const Byte *data, int dlen) {
841   HashValue *val;
842   int rc, r;
843
844   if (!rw->logfile) return cht_staticerr
845     (ip, "previous compact failed; cdbwr must be closed and reopened "
846      "before any further updates", "CDB BROKEN");
847   
848   val= htv_prep(dlen);  assert(val);
849   memcpy(htv_fillptr(val), data, dlen);
850
851   r= writerecord(rw->logfile, key, val);
852   if (!r) r= fflush(rw->logfile);
853   if (r) PE("write update to logfile");
854
855   ht_update(&rw->logincore, key, val);
856
857   if (!rw->autocompact) return TCL_OK;
858   return compact_keepopen(ip, rw, 0);
859
860  x_rc:
861   TFREE(val);
862   return rc;
863 }  
864
865 int cht_do_cdbwr_update(ClientData cd, Tcl_Interp *ip,
866                         void *rw_v, const char *key, Tcl_Obj *value) {
867   int dlen;
868   const char *data;
869   data= Tcl_GetStringFromObj(value, &dlen);  assert(data);
870   return update(ip, rw_v, key, data, dlen);
871 }
872
873 int cht_do_cdbwr_update_hb(ClientData cd, Tcl_Interp *ip,
874                            void *rw_v, const char *key, HBytes_Value value) {
875   return update(ip, rw_v, key, cht_hb_data(&value), cht_hb_len(&value));
876 }
877
878 int cht_do_cdbwr_delete(ClientData cd, Tcl_Interp *ip, void *rw_v,
879                         const char *key) {
880   return update(ip, rw_v, key, 0, 0);
881 }
882
883 /*---------- Lookups ----------*/
884
885 static int lookup_rw(Tcl_Interp *ip, void *rw_v, const char *key,
886                     const Byte **data_r, int *len_r /* -1 => notfound */) {
887   Rw *rw= rw_v;
888   const HashValue *val;
889
890   val= ht_lookup(&rw->logincore, key);
891   if (val) {
892     if (val->len) { *data_r= val->data; *len_r= val->len; return TCL_OK; }
893     else { *data_r= 0; *len_r= -1; return TCL_OK; }
894   }
895
896   return cht_cdb_lookup_cdb(ip, &rw->cdb, key, strlen(key), data_r, len_r);
897
898
899 int cht_do_cdbwr_lookup(ClientData cd, Tcl_Interp *ip, void *rw_v,
900                         const char *key, Tcl_Obj *def,
901                         Tcl_Obj **result) {
902   const Byte *data;
903   int dlen, r;
904   
905   r= lookup_rw(ip, rw_v, key, &data, &dlen);  if (r) return r;
906   return cht_cdb_donesomelookup(ip, rw_v, def, result, data, dlen,
907                                 cht_cdb_storeanswer_string);
908 }
909   
910 int cht_do_cdbwr_lookup_hb(ClientData cd, Tcl_Interp *ip, void *rw_v,
911                            const char *key, Tcl_Obj *def,
912                            Tcl_Obj **result) {
913   const Byte *data;
914   int dlen, r;
915   
916   r= lookup_rw(ip, rw_v, key, &data, &dlen);  if (r) return r;
917   return cht_cdb_donesomelookup(ip, rw_v, def, result, data, dlen,
918                                 cht_cdb_storeanswer_hb);
919 }
920
921 int cht_do_cdbtoplevel_cdb_wr(ClientData cd, Tcl_Interp *ip,
922                               const Cdbwr_SubCommand* subcmd,
923                               int objc, Tcl_Obj *const *objv) {
924   return subcmd->func((void*)subcmd,ip,objc,objv);
925 }