chiark / gitweb /
lookup wip; see cvs diff from 1.12 before checkin
[chiark-tcl.git] / cdb / writeable.c
1 /**/
2
3 #include "chiark_tcl_cdb.h"
4
5 #define ftello ftell
6 #define fseeko fseek
7
8 /*---------- Forward declarations ----------*/
9
10 struct ht_forall_ctx;
11
12 /*---------- Useful routines ----------*/
13
14 static void maybe_close(int fd) {
15   if (fd>=0) close(fd);
16 }
17
18 #define PE(m) do{                                               \
19     rc= cht_posixerr(ip, errno, "failed to " m); goto x_rc;     \
20   }while(0)
21
22 /*==================== Subsystems and subtypes ====================*/
23
24 /*---------- Pathbuf ----------*/
25
26 typedef struct Pathbuf {
27   char *buf, *sfx;
28 } Pathbuf;
29
30 #define MAX_SUFFIX 4
31
32 static void pathbuf_init(Pathbuf *pb, const char *pathb) {
33   int l= strlen(pathb);
34   pb->buf= TALLOC(l + 4);
35   memcpy(pb->buf, pathb, l);
36   pb->sfx= pb->buf + l;
37   *pb->sfx++= '.';
38 }
39 static const char *pathbuf_sfx(Pathbuf *pb, const char *suffix) {
40   assert(strlen(suffix) <= MAX_SUFFIX);
41   strcpy(pb->sfx, suffix);
42   return pb->buf;
43 }
44 static void pathbuf_free(Pathbuf *pb) {
45   TFREE(pb->buf);
46   pb->buf= 0;
47 }
48
49 /*---------- Our hash table ----------*/
50
51 typedef struct HashTable {
52   Tcl_HashTable t;
53   Byte padding[128]; /* allow for expansion by Tcl, urgh */
54   Byte confound[16];
55 } HashTable;
56
57 typedef struct HashValue {
58   int len;
59   Byte data[1];
60 } HashValue;
61
62 static HashValue *htv_prep(int len) {
63   HashValue *hd;
64   hd= TALLOC((hd->data - (Byte*)hd) + len);
65   hd->len= len;
66   return hd;
67 }  
68 static Byte *htv_fillptr(HashValue *hd) {
69   return hd->data;
70 }
71
72 static void ht_setup(HashTable *ht) {
73   Tcl_InitHashTable(&ht->t, TCL_STRING_KEYS);
74 }
75 static void ht_update(HashTable *ht, const char *key, HashValue *val_eat) {
76   Tcl_HashEntry *he;
77   int new;
78
79   he= Tcl_CreateHashEntry(&ht->t, (char*)key, &new);
80   if (!new) TFREE(Tcl_GetHashValue(he));
81   Tcl_SetHashValue(he, val_eat);
82     /* eats the value since the data structure owns the memory */
83 }
84 static void ht_maybeupdate(HashTable *ht, const char *key,
85                            HashValue *val_eat) {
86   /* like ht_update except does not overwrite existing values */
87   Tcl_HashEntry *he;
88   int new;
89
90   he= Tcl_CreateHashEntry(&ht->t, (char*)key, &new);
91   if (!new) { TFREE(val_eat); return; }
92   Tcl_SetHashValue(he, val_eat);
93 }
94
95 static const HashValue *ht_lookup(HashTable *ht, const char *key) {
96   Tcl_HashEntry *he;
97   const HashValue *htv;
98   
99   he= Tcl_FindHashEntry(ht, key);
100   if (!he) return 0;
101   
102   return Tcl_GetHashValue(he);
103 }
104
105 static int ht_forall(HashTable *ht,
106                      int (*fn)(const char *key, HashValue *val,
107                                struct ht_forall_ctx *ctx),
108                      struct ht_forall_ctx *ctx) {
109   /* Returns first positive value returned by any call to fn, or 0. */
110   Tcl_HashSearch sp;
111   Tcl_HashEntry *he;
112   const char *key;
113   HashValue *val;
114   int r;
115   
116   for (he= Tcl_FirstHashEntry(&ht->t, &sp);
117        he;
118        he= Tcl_NextHashEntry(&sp)) {
119     val= Tcl_GetHashValue(he);
120     if (!val->len) continue;
121
122     key= Tcl_GetHashKey(&ht->t, he);
123     
124     r= fn(key, val, ctx);
125     if (r) return r;
126   }
127   return 0;
128 }
129
130 static void ht_destroy(HashTable *ht) {
131   Tcl_HashSearch sp;
132   Tcl_HashEntry *he;
133   
134   for (he= Tcl_FirstHashEntry(&ht->t, &sp);
135        he;
136        he= Tcl_NextHashEntry(&sp)) {
137     /* ht_forall skips empty (deleted) entries so is no good for this */
138     TFREE(Tcl_GetHashValue(he));
139   }
140   Tcl_DeleteHashTable(&ht->t);
141 }
142
143 /*==================== Existential ====================*/
144
145 /*---------- Rw data structure ----------*/
146
147 typedef struct Rw {
148   int ix, autocompact;
149   int cdb_fd, lock_fd;
150   struct cdb cdb; /* valid iff cdb_fd >= 0 */
151   FILE *logfile;
152   HashTable logincore;
153   Pathbuf pbsome, pbother;
154   off_t mainsz;
155   ScriptToInvoke on_info, on_lexminval;
156 } Rw;
157
158 static int rw_close(Tcl_Interp *ip, Rw *rw) {
159   int rc, r;
160
161   rc= TCL_OK;
162   ht_destroy(&rw->logincore);
163   if (rw->cdb_fd >= 0) cdb_free(&rw->cdb);
164   maybe_close(rw->cdb_fd);
165   maybe_close(rw->lock_fd);
166
167   if (rw->logfile) {
168     r= fclose(rw->logfile);
169     if (r && ip) { rc= cht_posixerr(ip, errno, "probable data loss! failed to"
170                                     " fclose logfile during untidy close"); }
171   }
172
173   pathbuf_free(&rw->pbsome); pathbuf_free(&rw->pbother);
174   TFREE(rw);
175   return rc;
176 }
177
178 static void destroy_cdbrw_idtabcb(Tcl_Interp *ip, void *rw) { rw_close(0,rw); }
179 const IdDataSpec cdbtcl_rwdatabases= {
180   "cdb-rwdb", "cdb-openrwdatabases-table", destroy_cdbrw_idtabcb
181 };
182
183 /*---------- File handling ----------*/
184
185 static int acquire_lock(Tcl_Interp *ip, Pathbuf *pb, int *lockfd_r) {
186   /* *lockfd_r must be -1 on entry.  If may be set to >=0 even
187    * on error, and must be closed by the caller. */
188   mode_t um, lockmode;
189   struct flock fl;
190   int r;
191
192   um= umask(~(mode_t)0);
193   umask(um);
194
195   lockmode= 0666 & ~((um & 0444)>>1);
196   /* Remove r where umask would remove w;
197    * eg umask intending 0664 here gives 0660 */
198   
199   *lockfd_r= open(pathbuf_sfx(pb,".lock"), O_RDONLY|O_CREAT, lockmode);
200   if (*lockfd_r < 0)
201     return cht_posixerr(ip, errno, "could not open/create lockfile");
202
203   fl.l_type= F_WRLCK;
204   fl.l_whence= SEEK_SET;
205   fl.l_start= 0;
206   fl.l_len= 0;
207   fl.l_pid= getpid();
208
209   r= fcntl(*lockfd_r, F_SETLK, &fl);
210   if (r == -1) {
211     if (errno == EACCES || errno == EAGAIN)
212       return cht_staticerr(ip, "lock held by another process", "CDB LOCKED");
213     else return cht_posixerr(ip, errno, "unexpected error from fcntl while"
214                              " acquiring lock");
215   }
216
217   return TCL_OK;
218 }
219
220 /*---------- Log reading and writing ----------*/
221
222 static int readlognum(FILE *f, int delim, int *num_r) {
223   int c;
224   char numbuf[20], *p, *ep;
225   unsigned long ul;
226
227   p= numbuf;
228   for (;;) {
229     c= getc(f);  if (c==EOF) return -2;
230     if (c == delim) break;
231     if (!isdigit((unsigned char)c)) return -2;
232     *p++= c;
233     if (p == numbuf+sizeof(numbuf)) return -2;
234   }
235   if (p == numbuf) return -2;
236   *p= 0;
237
238   errno=0; ul= strtoul(numbuf, &ep, 10);
239   if (*ep || errno || ul >= INT_MAX/2) return -2;
240   *num_r= ul;
241   return 0;
242 }
243
244 static int readstorelogrecord(FILE *f, HashTable *ht,
245                               int (*omitfn)(const HashValue*,
246                                             struct ht_forall_ctx *ctx),
247                               struct ht_forall_ctx *ctx,
248                               void (*updatefn)(HashTable*, const char*,
249                                                HashValue*)) {
250   /* returns:
251    *      0     for OK
252    *     -1     eof
253    *     -2     corrupt or error
254    *     -3     got newline indicating end
255    *     >0     value from omitfn
256    */
257   int keylen, vallen;
258   char *key;
259   HashValue *val;
260   int c, rc, r;
261
262   c= getc(f);
263   if (c==EOF) { if (feof(f)) return -1; return -2; }
264   if (c=='\n') return -3;
265   if (c!='+') return -2;
266
267   rc= readlognum(f, ',', &keylen);  if (rc) return rc;
268   rc= readlognum(f, ':', &vallen);  if (rc) return rc;
269
270   key= TALLOC(keylen+1);
271   val= htv_prep(vallen);
272
273   r= fread(key, 1,keylen, f);
274   if (r!=keylen) goto x2_free_keyval;
275   if (memchr(key,0,keylen)) goto x2_free_keyval;
276   key[keylen]= 0;
277
278   c= getc(f);  if (c!='-') goto x2_free_keyval;
279   c= getc(f);  if (c!='>') goto x2_free_keyval;
280   
281   r= fread(htv_fillptr(val), 1,vallen, f);
282   if (r!=vallen) goto x2_free_keyval;
283
284   rc= omitfn ? omitfn(val, ctx) : TCL_OK;
285   if (rc) { assert(rc>0); TFREE(val); }
286   else updatefn(ht, key, val);
287   
288   TFREE(key);
289   return rc;
290
291  x2_free_keyval:
292   TFREE(val);
293   TFREE(key);
294   return -2;
295 }
296
297 static int writerecord(FILE *f, const char *key, const HashValue *val) {
298   int r;
299
300   r= fprintf(f, "+%d,%d:%s->", strlen(key), val->len, key);
301   if (r<0) return -1;
302   
303   r= fwrite(val->data, 1, val->len, f);
304   if (r != val->len) return -1;
305
306   return 0;
307 }
308
309 /*---------- Creating ----------*/
310
311 int cht_do_cdbwr_create_empty(ClientData cd, Tcl_Interp *ip,
312                               const char *pathb) {
313   static const char *const toremoves[]= {
314     ".main", ".cdb", ".log", ".tmp", 0
315   };
316
317   Pathbuf pb;
318   int lock_fd=-1, fd=-1, rc, r;
319   const char *const *toremove;
320
321   pathbuf_init(&pb, pathb);
322   rc= acquire_lock(ip, &pb, &lock_fd);  if (rc) goto x_rc;
323   
324   fd= open(pathbuf_sfx(&pb, ".main"), O_RDWR|O_CREAT|O_EXCL, 0666);
325   if (fd <= 0) PE("create new database file");
326
327   for (toremove=toremoves; *toremove; toremove++) {
328     r= remove(*toremove);
329     if (r && errno != ENOENT)
330       PE("delete possible spurious file during creation");
331   }
332   
333   rc= TCL_OK;
334
335  x_rc:
336   maybe_close(fd);
337   maybe_close(lock_fd);
338   pathbuf_free(&pb);
339   return rc;
340 }
341
342 /*---------- Info callbacks ----------*/
343
344 static int infocbv3(Tcl_Interp *ip, Rw *rw, const char *arg1,
345                     const char *arg2fmt, const char *arg3, va_list al) {
346   Tcl_Obj *aa[3];
347   int na;
348   char buf[200];
349   vsnprintf(buf, sizeof(buf), arg2fmt, al);
350
351   na= 0;
352   aa[na++]= cht_ret_string(ip, arg1);
353   aa[na++]= cht_ret_string(ip, buf);
354   if (arg3) aa[na++]= cht_ret_string(ip, arg3);
355   
356   return cht_scriptinv_invoke_fg(&rw->on_info, na, aa);
357 }
358   
359 static int infocb3(Tcl_Interp *ip, Rw *rw, const char *arg1,
360                    const char *arg2fmt, const char *arg3, ...) {
361   int rc;
362   va_list al;
363   va_start(al, arg3);
364   rc= infocbv3(ip,rw,arg1,arg2fmt,arg3,al);
365   va_end(al);
366   return rc;
367 }
368   
369 static int infocb(Tcl_Interp *ip, Rw *rw, const char *arg1,
370                   const char *arg2fmt, ...) {
371   int rc;
372   va_list al;
373   va_start(al, arg2fmt);
374   rc= infocbv3(ip,rw,arg1,arg2fmt,0,al);
375   va_end(al);
376   return rc;
377 }
378   
379 /*---------- Opening ----------*/
380
381 static int cdbinit(Tcl_Interp *ip, Rw *rw) {
382   /* On entry, cdb_fd >=0 but cdb is _undefined_/
383    * On exit, either cdb_fd<0 or cdb is initialised */
384   int r, rc;
385   
386   r= cdb_init(&rw->cdb, rw->cdb_fd);
387   if (r) {
388     rc= cht_posixerr(ip, errno, "failed to initialise cdb reader");
389     close(rw->cdb_fd);  rw->cdb_fd= -1;  return rc;
390   }
391   return TCL_OK;
392 }
393
394 int cht_do_cdbwr_open(ClientData cd, Tcl_Interp *ip, const char *pathb,
395                       Tcl_Obj *on_info, Tcl_Obj *on_lexminval,
396                       void **result) {
397   const Cdbwr_SubCommand *subcmd= cd;
398   int r, rc, mainfd=-1;
399   Rw *rw;
400   struct stat stab;
401   off_t logrecstart, logjunkpos;
402
403   rw= TALLOC(sizeof(*rw));
404   ht_setup(&rw->logincore);
405   cht_scriptinv_init(&rw->on_info);
406   cht_scriptinv_init(&rw->on_lexminval);
407   rw->cdb_fd= rw->lock_fd= -1;  rw->logfile= 0;
408   pathbuf_init(&rw->pbsome, pathb);
409   pathbuf_init(&rw->pbother, pathb);
410   rw->autocompact= 1;
411
412   if (on_lexminval) {
413     rc= cht_scriptinv_set(&rw->on_lexminval, ip, on_lexminval, 0);
414     if (rc) goto x_rc;
415   } else {
416     rw->on_lexminval.llength= 0;
417   }
418
419   mainfd= open(pathbuf_sfx(&rw->pbsome,".main"), O_RDONLY);
420   if (mainfd<0) PE("open exist3ing database file .main");
421   rc= acquire_lock(ip, &rw->pbsome, &rw->lock_fd);  if (rc) goto x_rc;
422
423   r= fstat(mainfd, &stab);  if (r) PE("fstat .main");
424   rw->mainsz= stab.st_size;
425
426   rw->cdb_fd= open(pathbuf_sfx(&rw->pbsome,".cdb"), O_RDONLY);
427   if (rw->cdb_fd >=0) {
428     rc= cdbinit(ip, rw);  if (rc) goto x_rc;
429   } else if (errno == ENOENT) {
430     if (rw->mainsz) {
431       rc= cht_staticerr(ip, ".cdb does not exist but .main is nonempty -"
432                         " .cdb must have been accidentally deleted!",
433                         "CDB CDBMISSING");
434       goto x_rc;
435     }
436     /* fine */
437   } else {
438     PE("open .cdb");
439   }
440
441   rw->logfile= fopen(pathbuf_sfx(&rw->pbsome,".log"), "r+");
442   if (!rw->logfile) {
443     if (errno != ENOENT) PE("failed to open .log during open");
444     rw->logfile= fopen(rw->pbsome.buf, "w");
445     if (!rw->logfile) PE("create .log during (clean) open");
446   } else { /* rw->logfile */
447     r= fstat(fileno(rw->logfile), &stab);
448     if (r==-1) PE("fstat .log during open");
449     rc= infocb(ip, rw, "open-dirty-start", "log=%luby",
450                (unsigned long)stab.st_size);
451     if (rc) goto x_rc;
452
453     for (;;) {
454       logrecstart= ftello(rw->logfile);
455       if (logrecstart < 0) PE("ftello .log during (dirty) open");
456       r= readstorelogrecord(rw->logfile, &rw->logincore, 0,0, ht_update);
457       if (ferror(rw->logfile)) {
458         rc= cht_posixerr(ip, errno, "error reading .log during (dirty) open");
459         goto x_rc;
460       }
461       if (r==-1) {
462         break;
463       } else if (r==-2 || r==-3) {
464         char buf[100];
465         logjunkpos= ftello(rw->logfile);
466         if(logjunkpos<0) PE("ftello .log during report of junk in dirty open");
467
468         snprintf(buf,sizeof(buf), "CDB SYNTAX LOG %lu %lu",
469                  (unsigned long)logjunkpos, (unsigned long)logrecstart);
470
471         if (!(subcmd->flags & RWSCF_OKJUNK)) {
472           Tcl_SetObjErrorCode(ip, Tcl_NewStringObj(buf,-1));
473           snprintf(buf,sizeof(buf),"%lu",(unsigned long)logjunkpos);
474           Tcl_ResetResult(ip);
475           Tcl_AppendResult(ip, "syntax error (junk) in .log during"
476                            " (dirty) open, at file position ", buf, (char*)0);
477           rc= TCL_ERROR;
478           goto x_rc;
479         }
480         rc= infocb3(ip, rw, "open-dirty-junk", "errorfpos=%luby", buf,
481                     (unsigned long)logjunkpos);
482         if (rc) goto x_rc;
483
484         r= fseeko(rw->logfile, logrecstart, SEEK_SET);
485         if (r) PE("failed to fseeko .log before junk during dirty open");
486
487         r= ftruncate(fileno(rw->logfile), logrecstart);
488         if (r) PE("ftruncate .log to chop junk during dirty open");
489       } else {
490         assert(!r);
491       }
492     }
493   }
494   /* now log is positioned for appending and everything is read */
495
496   *result= rw;
497   maybe_close(mainfd);
498   return TCL_OK;
499
500  x_rc:
501   rw_close(0,rw);
502   maybe_close(mainfd);
503   return rc;
504 }
505
506 int cht_do_cdbwr_open_okjunk(ClientData cd, Tcl_Interp *ip, const char *pathb,
507                       Tcl_Obj *on_info, Tcl_Obj *on_lexminval,
508                       void **result) {
509   return cht_do_cdbwr_open(cd,ip,pathb,on_info,on_lexminval,result);
510 }
511
512 /*==================== COMPACTION ====================*/
513
514 struct ht_forall_ctx {
515   struct cdb_make cdbm;
516   FILE *mainfile;
517   int lexminvall;
518   long *reccount;
519   const char *lexminval;
520 };
521
522 /*---------- helper functions ----------*/
523
524 static int expiredp(const HashValue *val, struct ht_forall_ctx *a) {
525   int r, l;
526   if (!val->len) return 0;
527   l= val->len < a->lexminvall ? val->len : a->lexminvall;
528   r= memcmp(val->data, a->lexminval, l);
529   if (r>0) return 0;
530   if (r<0) return 1;
531   return val->len < a->lexminvall;
532 }
533
534 static int delete_ifexpired(const char *key, HashValue *val,
535                             struct ht_forall_ctx *a) {
536   if (!expiredp(val, a)) return 0;
537   val->len= 0;
538   /* we don't actually need to realloc it to free the memory because
539    * this will shortly all be deleted as part of the compaction */
540   return 0;
541 }
542
543 static int addto_cdb(const char *key, HashValue *val,
544                      struct ht_forall_ctx *a) {
545   return cdb_make_add(&a->cdbm, key, strlen(key), val->data, val->len);
546 }
547
548 static int addto_main(const char *key, HashValue *val,
549                       struct ht_forall_ctx *a) {
550   (*a->reccount)++;
551   return writerecord(a->mainfile, key, val);
552 }
553
554 /*---------- compact main entrypoint ----------*/
555
556 static int compact_core(Tcl_Interp *ip, Rw *rw, unsigned long logsz,
557                         long *reccount_r) {
558   /* creates new .cdb and .main
559    * closes logfile
560    * leaves .log with old data
561    * leaves cdb fd open onto old db
562    * leaves logincore full of crap
563    */
564   int r, rc;
565   int cdbfd, cdbmaking;
566   off_t errpos, newmainsz;
567   char buf[100];
568   Tcl_Obj *res;
569   struct ht_forall_ctx a;
570
571   a.mainfile= 0;
572   cdbfd= -1;
573   cdbmaking= 0;
574   *reccount_r= 0;
575   a.reccount= reccount_r;
576
577   r= fclose(rw->logfile);
578   if (r) { rc= cht_posixerr(ip, errno, "probable data loss!  failed to fclose"
579                             " logfile during compact");  goto x_rc; }
580   rw->logfile= 0;
581   
582   rc= infocb(ip, rw, "compact-start", "log=%luby main=%luby",
583              logsz, (unsigned long)rw->mainsz);
584   if (rc) goto x_rc;
585
586   if (rw->on_lexminval.llength) {
587     rc= cht_scriptinv_invoke_fg(&rw->on_lexminval, 0,0);
588     if (rc) goto x_rc;
589
590     res= Tcl_GetObjResult(ip);  assert(res);
591     a.lexminval= Tcl_GetStringFromObj(res, &a.lexminvall);
592     assert(a.lexminval);
593
594     /* we rely not calling Tcl_Eval during the actual compaction;
595      * if we did Tcl_Eval then the interp result would be trashed.
596      */
597     rc= ht_forall(&rw->logincore, delete_ifexpired, &a);
598
599   } else {
600     a.lexminval= "";
601   }
602
603   /* merge unsuperseded records from main into hash table */
604
605   a.mainfile= fopen(pathbuf_sfx(&rw->pbsome,".main"), "r");
606   if (!a.mainfile) PE("failed to open .main for reading during compact");
607
608   for (;;) {
609     r= readstorelogrecord(a.mainfile, &rw->logincore,
610                           expiredp, &a,
611                           ht_maybeupdate);
612     if (ferror(a.mainfile)) { rc= cht_posixerr(ip, errno, "error reading"
613                          " .main during compact"); goto x_rc;
614     }
615     if (r==-3) {
616       break;
617     } else if (r==-1 || r==-2) {
618       errpos= ftello(a.mainfile);
619       if (errpos<0) PE("ftello .main during report of syntax error");
620       snprintf(buf,sizeof(buf), "CDB SYNTAX MAIN %lu", (unsigned long)errpos);
621       Tcl_SetObjErrorCode(ip, Tcl_NewStringObj(buf,-1));
622       snprintf(buf,sizeof(buf), "%lu", (unsigned long)errpos);
623       Tcl_ResetResult(ip);
624       Tcl_AppendResult(ip, "syntax error in .main during"
625                        " compact, at file position ", buf, (char*)0);
626       rc= TCL_ERROR;
627       goto x_rc;
628     } else {
629       assert(!rc);
630     }
631   }
632   fclose(a.mainfile);
633   a.mainfile= 0;
634
635   /* create new cdb */
636
637   cdbfd= open(pathbuf_sfx(&rw->pbsome,".tmp"), O_WRONLY|O_CREAT|O_TRUNC, 0666);
638   if (cdbfd<0) PE("create .tmp for new cdb during compact");
639
640   r= cdb_make_start(&a.cdbm, cdbfd);
641   if (r) PE("cdb_make_start during compact");
642   cdbmaking= 1;
643
644   r= ht_forall(&rw->logincore, addto_cdb, &a);
645   if (r) PE("cdb_make_add during compact");
646
647   r= cdb_make_finish(&a.cdbm);
648   if(r) PE("cdb_make_finish during compact");
649   cdbmaking= 0;
650
651   r= fdatasync(cdbfd);  if (r) PE("fdatasync new cdb during compact");
652   r= close(cdbfd);  if (r) PE("close new cdb during compact");
653   cdbfd= -1;
654
655   r= rename(rw->pbsome.buf, pathbuf_sfx(&rw->pbother,".cdb"));
656   if (r) PE("install new .cdb during compact");
657
658   /* create new main */
659
660   a.mainfile= fopen(pathbuf_sfx(&rw->pbsome,".tmp"), "w");
661   if (!a.mainfile) PE("create .tmp for new main during compact");
662
663   r= ht_forall(&rw->logincore, addto_main, &a);
664   if (r) { rc= cht_posixerr(ip, r, "error writing to new .main"
665                             " during compact");  goto x_rc; }
666   
667   r= fflush(a.mainfile);  if (r) PE("fflush new main during compact");
668   r= fdatasync(fileno(a.mainfile));
669   if (r) PE("fdatasync new main during compact");
670
671   newmainsz= ftello(a.mainfile);
672   if (newmainsz<0) PE("ftello new main during compact");
673   
674   r= fclose(a.mainfile);  if (r) PE("fclose new main during compact");
675   a.mainfile= 0;
676
677   r= rename(rw->pbsome.buf, pathbuf_sfx(&rw->pbother,".main"));
678   if (r) PE("install new .main during compact");
679
680   rw->mainsz= newmainsz;
681
682   /* done! */
683   
684   rc= infocb(ip, rw, "compact-end", "main=%luby nrecs=%l",
685              (unsigned long)rw->mainsz, *a.reccount);
686   if (rc) goto x_rc;
687
688   return rc;
689
690 x_rc:
691   if (a.mainfile) fclose(a.mainfile);
692   if (cdbmaking) cdb_make_finish(&a.cdbm);
693   maybe_close(cdbfd);
694   remove(pathbuf_sfx(&rw->pbsome,".tmp")); /* for tidyness */
695   return rc;
696 }
697
698 /*---------- Closing ----------*/
699
700 static int compact_forclose(Tcl_Interp *ip, Rw *rw, long *reccount_r) {
701   off_t logsz;
702   int r, rc;
703
704   logsz= ftello(rw->logfile);
705   if (logsz < 0) PE("ftello logfile (during tidy close)");
706
707   rc= compact_core(ip, rw, logsz, reccount_r);  if (rc) goto x_rc;
708
709   r= remove(pathbuf_sfx(&rw->pbsome,".log"));
710   if (r) PE("remove .log (during tidy close)");
711
712   return TCL_OK;
713
714 x_rc: return rc;
715 }
716   
717 int cht_do_cdbwr_close(ClientData cd, Tcl_Interp *ip, void *rw_v) {
718   Rw *rw= rw_v;
719   int rc, rc_close;
720   long reccount= -1;
721   off_t logsz;
722
723   if (rw->autocompact) rc= compact_forclose(ip, rw, &reccount);
724   else rc= TCL_OK;
725
726   if (!rc) {
727     if (!rw->logfile) {
728       logsz= ftello(rw->logfile);
729       if (logsz < 0)
730         rc= cht_posixerr(ip, errno, "ftell logfile during close info");
731       else
732         rc= infocb(ip, rw, "close", "main=%luby log=%luby",
733                    rw->mainsz, logsz);
734     } else if (reccount>=0) {
735       rc= infocb(ip, rw, "close", "main=%luby nrecs=%l", rw->mainsz, reccount);
736     } else {
737       rc= infocb(ip, rw, "close", "main=%luby", rw->mainsz);
738     }
739   }
740   rc_close= rw_close(ip,rw);
741   if (rc_close) rc= rc_close;
742   
743   cht_tabledataid_disposing(ip, rw_v, &cdbtcl_rwdatabases);
744   return rc;
745 }
746
747 /*---------- Other compaction-related entrypoints ----------*/
748
749 static int compact_keepopen(Tcl_Interp *ip, Rw *rw, int force) {
750   off_t logsz;
751   long reccount;
752   int rc, r;
753
754   logsz= ftello(rw->logfile);
755   if (logsz < 0) return cht_posixerr(ip, errno, "ftell .log"
756                                        " during compact check or force");
757
758   if (!force && logsz < rw->mainsz / 10 + 1000) return TCL_OK;
759
760   rc= compact_core(ip, rw, logsz, &reccount);  if (rc) goto x_rc;
761
762   maybe_close(rw->cdb_fd);
763   rw->cdb_fd= -1;
764   ht_destroy(&rw->logincore);
765   ht_setup(&rw->logincore);
766
767   rw->cdb_fd= open(pathbuf_sfx(&rw->pbsome,".cdb"), O_RDONLY);
768   if (rw->cdb_fd < 0) PE("reopen .cdb after compact");
769
770   rc= cdbinit(ip, rw);  if (rc) goto x_rc;
771
772   rw->logfile= fopen(pathbuf_sfx(&rw->pbsome,".log"), "w");
773   if (!rw->logfile) PE("reopen .log after compact");
774
775   r= fsync(fileno(rw->logfile));  if (r) PE("fsync .log after compact reopen");
776
777   return TCL_OK;
778
779 x_rc:
780   /* doom! all updates fail after this (because rw->logfile is 0), and
781    * we may be using a lot more RAM than would be ideal.  Program will
782    * have to reopen if it really wants sanity. */
783   return rc;
784 }
785
786 int cht_do_cdbwr_compact_force(ClientData cd, Tcl_Interp *ip, void *rw_v) {
787   return compact_keepopen(ip, rw_v, 1);
788 }
789 int cht_do_cdbwr_compact_check(ClientData cd, Tcl_Interp *ip, void *rw_v) {
790   return compact_keepopen(ip, rw_v, 0);
791 }
792
793 int cht_do_cdbwr_compact_explicit(ClientData cd, Tcl_Interp *ip, void *rw_v) {
794   Rw *rw= rw_v;
795   rw->autocompact= 0;
796   return TCL_OK;
797 }
798 int cht_do_cdbwr_compact_auto(ClientData cd, Tcl_Interp *ip, void *rw_v) {
799   Rw *rw= rw_v;
800   rw->autocompact= 1;
801   return TCL_OK;
802 }
803
804 /*---------- Updateing ----------*/
805
806 static int update(Tcl_Interp *ip, Rw *rw, const char *key,
807                   const Byte *data, int dlen) {
808   HashValue *val;
809   int rc, r;
810
811   if (!rw->logfile) return cht_staticerr
812     (ip, "previous compact failed; cdbwr must be closed and reopened "
813      "before any further updates", "CDB BROKEN");
814   
815   val= htv_prep(dlen);  assert(val);
816   memcpy(htv_fillptr(val), data, dlen);
817
818   r= writerecord(rw->logfile, key, val);
819   if (!r) r= fflush(rw->logfile);
820   if (r) PE("write update to logfile");
821
822   ht_update(&rw->logincore, key, val);
823   return compact_keepopen(ip, rw, 0);
824
825  x_rc:
826   TFREE(val);
827   return rc;
828 }  
829
830 int cht_do_cdbwr_update(ClientData cd, Tcl_Interp *ip,
831                         void *rw_v, const char *key, Tcl_Obj *value) {
832   int dlen;
833   const char *data;
834   data= Tcl_GetStringFromObj(value, &dlen);  assert(data);
835   return update(ip, rw_v, key, data, dlen);
836 }
837
838 int cht_do_cdbwr_update_hb(ClientData cd, Tcl_Interp *ip,
839                            void *rw_v, const char *key, HBytes_Value value) {
840   return update(ip, rw_v, key, cht_hb_data(&value), cht_hb_len(&value));
841 }
842
843 int cht_do_cdbwr_delete(ClientData cd, Tcl_Interp *ip, void *rw_v,
844                         const char *key) {
845   return update(ip, rw_v, key, 0, 0);
846 }
847
848 /*---------- Lookups ----------*/
849
850 static int lookup(Tcl_Interp *ip, Rw *rw, const char *key, ) {
851   const HashValue val;
852
853   val= ht_lookup(&rw->logincore, key);
854   if (!val) {
855     cdb_ &rw->cdb
856
857     return notfound;
858   
859
860 int cht_do_cdbwr_lookup(ClientData cd, Tcl_Interp *ip, void *db,
861                         const char *key, Tcl_Obj *def, Tcl_Obj **result) {
862   
863 }
864
865 int cht_do_cdbwr_lookup_hb(ClientData cd, Tcl_Interp *ip, void *db, const char *key, HBytes_Value def, HBytes_Value *result);