chiark / gitweb /
update debian version
[inn-innduct.git] / storage / ovdb / ovdb.c
1 /*
2  * ovdb.c
3  * ovdb 2.00
4  * Overview storage using BerkeleyDB 2.x/3.x/4.x
5  *
6  * 2004-02-17 : Need to track search cursors, since it's possible that
7  *              ovdb_closesearch does not get called.  We now close
8  *              any cursors still open in ovdb_close, or they'd be in
9  *              the database indefinitely causing deadlocks.
10  * 2002-08-13 : Change BOOL to bool, remove portability to < 2.4.
11  * 2002-08-11 : Cleaned up use of sprintf and fixed a bunch of warnings.
12  * 2002-02-28 : Update getartinfo for the overview API change in 2.4.  This
13  *              breaks compatibility with INN 2.3.x....
14  * 2000-12-12 : Add support for BerkeleyDB DB_SYSTEM_MEM option, controlled
15  *            : by ovdb.conf 'useshm' and 'shmkey'
16  * 2000-11-27 : Update for DB 3.2.x compatibility
17  * 2000-11-13 : New 'readserver' feature
18  * 2000-10-10 : ovdb_search now closes the cursor right after the last
19  *              record is read.
20  * 2000-10-05 : artnum member of struct datakey changed from ARTNUM to u_int32_t.
21  *              OS's where sizeof(long)==8 will have to rebuild their databases
22  *              after this update.
23  * 2000-10-05 : from Dan Riley: struct datakey needs to be zero'd, for
24  *              64-bit OSs where the struct has internal padding bytes.
25  * 2000-09-29 : ovdb_expiregroup can now fix incorrect counts; use new
26  *              inn/version.h so can have same ovdb.c for 2.3.0, 2.3.1, and 2.4
27  * 2000-09-28 : low mark in ovdb_expiregroup still wasn't right
28  * 2000-09-27 : Further improvements to ovdb_expiregroup: restructured the
29  *              loop; now updates groupinfo as it goes along rather than
30  *              counting records at the end, which prevents a possible
31  *              deadlock.
32  * 2000-09-19 : *lo wasn't being set in ovdb_expiregroup
33  * 2000-09-15 : added ovdb_check_user(); tweaked some error msgs; fixed an
34  *              improper use of RENEW
35  * 2000-08-28:  New major release: version 2.00 (beta)
36  *    + "groupsbyname" and "groupstats" databases replaced with "groupinfo".
37  *    + ovdb_recover, ovdb_upgrade, and dbprocs are now deprecated; their
38  *         functionality is now in ovdb_init and ovdb_monitor.
39  *    + ovdb_init can upgrade a database from the old version of ovdb to
40  *         work with this version.
41  *    + Rewrote ovdb_expiregroup(); it can now re-write OV data rather
42  *         than simply deleting old keys (which can leave 'holes' that result
43  *         in inefficient disk-space use).
44  *    + Add "nocompact" to ovdb.conf, which controls whether ovdb_expiregroup()
45  *         rewrites OV data.
46  *    + No longer needs the BerkeleyDB tools db_archive, db_checkpoint, and
47  *         db_deadlock.  That functionality is now in ovdb_monitor.
48  *    + ovdb_open() won't succeed if ovdb_monitor is not running.  This will
49  *         prevent the problems that happen if the database is not regularly
50  *         checkpointed and deadlock-tested.
51  *    + Internal group IDs (32-bit ints) are now reused.
52  *    + Add "maxlocks" to ovdb.conf, which will set the DB lk_max parameter.
53  *    + Pull "test" code out into ovdb_stat.  ovdb_stat will also provide
54  *         functionality similar to the BerkeleyDB "db_stat" command.
55  *    + Update docs: write man pages for the new ovdb_* commands; update
56  *         ovdb.pod
57  *         
58  * 2000-07-11 : fix possible alignment problem; add test code
59  * 2000-07-07 : bugfix: timestamp handling
60  * 2000-06-10 : Modified groupnum() interface; fix ovdb_add() to return false
61  *              for certain groupnum() errors
62  * 2000-06-08 : Added BerkeleyDB 3.1.x compatibility
63  * 2000-04-09 : Tweak some default parameters; store aliased group info
64  * 2000-03-29 : Add DB_RMW flag to the 'get' of get-modify-put sequences
65  * 2000-02-17 : Update expire behavior to be consistent with current
66  *              ov3 and buffindexed
67  * 2000-01-13 : Fix to make compatible with unmodified nnrpd/article.c
68  * 2000-01-04 : Added data versioning
69  * 1999-12-20 : Added BerkeleyDB 3.x compatibility
70  * 1999-12-06 : First Release -- H. Kehoe <hakehoe@avalon.net>
71  */
72
73 #include "config.h"
74 #include "clibrary.h"
75 #include "portable/socket.h"
76 #include "portable/time.h"
77 #include <errno.h>
78 #include <fcntl.h>
79 #ifdef HAVE_LIMITS_H
80 # include <limits.h>
81 #endif
82 #include <pwd.h>
83 #include <signal.h>
84 #ifdef HAVE_SYS_SELECT_H
85 # include <sys/select.h>
86 #endif
87 #include <syslog.h>
88
89 #include "conffile.h"
90 #include "inn/innconf.h"
91 #include "inn/messages.h"
92 #include "libinn.h"
93 #include "paths.h"
94 #include "storage.h"
95
96 #include "ov.h"
97 #include "ovinterface.h"
98 #include "ovdb.h"
99 #include "ovdb-private.h"
100
101 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
102 # include <sys/un.h>
103 #endif
104
105 #ifndef USE_BERKELEY_DB
106
107 /* Provide stub functions if we don't have db */
108
109 bool ovdb_open(int mode UNUSED)
110 {
111     syslog(L_FATAL, "OVDB: ovdb support not enabled");
112     return false;
113 }
114
115 bool ovdb_groupstats(char *group UNUSED, int *lo UNUSED, int *hi UNUSED, int *count UNUSED, int *flag UNUSED)
116 { return false; }
117
118 bool ovdb_groupadd(char *group UNUSED, ARTNUM lo UNUSED, ARTNUM hi UNUSED, char *flag UNUSED)
119 { return false; }
120
121 bool ovdb_groupdel(char *group UNUSED)
122 { return false; }
123
124 bool ovdb_add(char *group UNUSED, ARTNUM artnum UNUSED, TOKEN token UNUSED, char *data UNUSED, int len UNUSED, time_t arrived UNUSED, time_t expires UNUSED)
125 { return false; }
126
127 bool ovdb_cancel(TOKEN token UNUSED)
128 { return false; }
129
130 void *ovdb_opensearch(char *group UNUSED, int low UNUSED, int high UNUSED)
131 { return NULL; }
132
133 bool ovdb_search(void *handle UNUSED, ARTNUM *artnum UNUSED, char **data UNUSED, int *len UNUSED, TOKEN *token UNUSED, time_t *arrived UNUSED)
134 { return false; }
135
136 void ovdb_closesearch(void *handle UNUSED) { }
137
138 bool ovdb_getartinfo(char *group UNUSED, ARTNUM artnum UNUSED, TOKEN *token UNUSED)
139 { return false; }
140
141 bool ovdb_expiregroup(char *group UNUSED, int *lo UNUSED, struct history *h UNUSED)
142 { return false; }
143
144 bool ovdb_ctl(OVCTLTYPE type UNUSED, void *val UNUSED)
145 { return false; }
146
147 void ovdb_close(void) { }
148
149 #else /* USE_BERKELEY_DB */
150
151 #define EXPIREGROUP_TXN_SIZE 100
152 #define DELETE_TXN_SIZE 500
153
154 struct ovdb_conf ovdb_conf;
155 DB_ENV *OVDBenv = NULL;
156 int ovdb_errmode = OVDB_ERR_SYSLOG;
157
158 static int OVDBmode;
159 static bool Cutofflow;
160 static DB **dbs = NULL;
161 static int oneatatime = 0;
162 static int current_db = -1;
163 static time_t eo_start = 0;
164 static int clientmode = 0;
165
166 static DB *groupinfo = NULL;
167 static DB *groupaliases = NULL;
168
169 #define OVDBtxn_nosync  1
170 #define OVDBnumdbfiles  2
171 #define OVDBpagesize    3
172 #define OVDBcachesize   4
173 #define OVDBminkey      5
174 #define OVDBmaxlocks    6
175 #define OVDBnocompact   7
176 #define OVDBreadserver  8
177 #define OVDBnumrsprocs  9
178 #define OVDBmaxrsconn   10
179 #define OVDBuseshm      11
180 #define OVDBshmkey      12
181
182 static CONFTOKEN toks[] = {
183   { OVDBtxn_nosync, "txn_nosync" },
184   { OVDBnumdbfiles, "numdbfiles" },
185   { OVDBpagesize, "pagesize" },
186   { OVDBcachesize, "cachesize" },
187   { OVDBminkey, "minkey" },
188   { OVDBmaxlocks, "maxlocks" },
189   { OVDBnocompact, "nocompact" },
190   { OVDBreadserver, "readserver" },
191   { OVDBnumrsprocs, "numrsprocs" },
192   { OVDBmaxrsconn, "maxrsconn" },
193   { OVDBuseshm, "useshm" },
194   { OVDBshmkey, "shmkey" },
195   { 0, NULL },
196 };
197
198 #define _PATH_OVDBCONF "ovdb.conf"
199
200 /*********** readserver functions ***********/
201
202 static int clientfd = -1;
203
204 /* read client send and recieve functions. */
205
206 static int
207 csend(void *data, int n)
208 {
209     ssize_t status;
210
211     if (n == 0)
212         return 0;
213     status = xwrite(clientfd, data, n);
214     if (status < 0)
215         syswarn("OVDB: rc: cant write");
216     return status;
217 }
218
219 static int crecv(void *data, int n)
220 {
221     int r, p = 0;
222
223     if(n == 0)
224         return 0;
225
226     while(p < n) {
227         r = read(clientfd, (char *)data + p, n - p);
228         if(r <= 0) {
229             if(r < 0 && errno == EINTR)
230                 continue;
231             syslog(LOG_ERR, "OVDB: rc: cant read: %m");
232             clientfd = -1;
233             exit(1);
234         }
235         p+= r;
236     }
237     return p;
238 }
239
240 /* Attempt to connect to the readserver.  If anything fails, we
241    return -1 so that ovdb_open can open the database directly. */
242
243 static int client_connect()
244 {
245     ssize_t r;
246     size_t p = 0;
247     char *path;
248 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
249     struct sockaddr_un sa;
250 #else
251     struct sockaddr_in sa;
252 #endif
253     char banner[sizeof(OVDB_SERVER_BANNER)];
254     fd_set fds;
255     struct timeval timeout;
256
257 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
258     clientfd = socket(AF_UNIX, SOCK_STREAM, 0);
259 #else
260     clientfd = socket(AF_INET, SOCK_STREAM, 0);
261 #endif
262     if(clientfd < 0) {
263         syslog(LOG_ERR, "OVDB: rc: socket: %m");
264         return -1;
265     }
266
267 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
268     sa.sun_family = AF_UNIX;
269     path = concatpath(innconf->pathrun, OVDB_SERVER_SOCKET);
270     strlcpy(sa.sun_path, path, sizeof(sa.sun_path));
271     free(path);
272 #else
273     sa.sin_family = AF_INET;
274     sa.sin_port = 0;
275     sa.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
276     bind(clientfd, (struct sockaddr *) &sa, sizeof sa);
277     sa.sin_port = htons(OVDB_SERVER_PORT);
278 #endif
279     if((r = connect(clientfd, (struct sockaddr *) &sa, sizeof sa)) != 0) {
280         syslog(LOG_ERR, "OVDB: rc: cant connect to server: %m");
281         close(clientfd);
282         clientfd = -1;
283         return -1;
284     }
285
286     while(p < sizeof(OVDB_SERVER_BANNER)) {
287         FD_ZERO(&fds);
288         FD_SET(clientfd, &fds);
289         timeout.tv_sec = 30;
290         timeout.tv_usec = 0;
291
292         r = select(clientfd+1, &fds, NULL, NULL, &timeout);
293
294         if(r < 0) {
295             if(errno == EINTR)
296                 continue;
297             syslog(LOG_ERR, "OVDB: rc: select: %m");
298             close(clientfd);
299             clientfd = -1;
300             return -1;
301         }
302         if(r == 0) {
303             syslog(LOG_ERR, "OVDB: rc: timeout waiting for server");
304             close(clientfd);
305             clientfd = -1;
306             return -1;
307         }
308
309         r = read(clientfd, banner + p, sizeof(OVDB_SERVER_BANNER) - p);
310         if(r <= 0) {
311             if(r < 0 && errno == EINTR)
312                 continue;
313             syslog(LOG_ERR, "OVDB: rc: cant read: %m");
314             close(clientfd);
315             clientfd = -1;
316             return -1;
317         }
318         p+= r;
319     }
320
321     if(memcmp(banner, OVDB_SERVER_BANNER, sizeof(OVDB_SERVER_BANNER))) {
322         syslog(LOG_ERR, "OVDB: rc: unknown reply from server");
323         close(clientfd);
324         clientfd = -1;
325         return -1;
326     }
327     return 0;
328 }
329
330 static void
331 client_disconnect(void)
332 {
333     struct rs_cmd rs;
334
335     if (clientfd != -1) {
336         rs.what = CMD_QUIT;
337         csend(&rs, sizeof(rs));
338     }
339     clientfd = -1;
340 }
341
342
343 /*********** internal functions ***********/
344
345 #if DB_VERSION_MAJOR == 2
346 char *db_strerror(int err)
347 {
348     switch(err) {
349     case DB_RUNRECOVERY:
350         return "Recovery Needed";
351     default:
352         return strerror(err);
353     }
354 }
355 #endif /* DB_VERSION_MAJOR == 2 */
356
357
358 static bool conf_bool_val(char *str, bool *value)
359 {
360     if(strcasecmp(str, "on") == 0
361        || strcasecmp(str, "true") == 0
362        || strcasecmp(str, "yes") == 0) {
363         *value = true;
364         return true;
365     }
366     if(strcasecmp(str, "off") == 0
367        || strcasecmp(str, "false") == 0
368        || strcasecmp(str, "no") == 0) {
369         *value = false;
370         return true;
371     }
372     return false;
373 }
374
375 static bool conf_long_val(char *str, long *value)
376 {
377     long v;
378
379     errno = 0;
380     v = strtol(str, NULL, 10);
381     if(v == 0 && errno != 0) {
382         return false;
383     }
384     *value = v;
385     return true;
386 }
387
388 void read_ovdb_conf(void)
389 {
390     static int confread = 0;
391     int done = 0;
392     char *path;
393     CONFFILE *f;
394     CONFTOKEN *tok;
395     bool b;
396     long l;
397
398     if(confread)
399         return;
400
401     /* defaults */
402     ovdb_conf.home = innconf->pathoverview;
403     ovdb_conf.txn_nosync = 1;
404     ovdb_conf.numdbfiles = 32;
405     ovdb_conf.pagesize = 8192;
406     ovdb_conf.cachesize = 8000 * 1024;
407     ovdb_conf.minkey = 0;
408     ovdb_conf.maxlocks = 4000;
409     ovdb_conf.nocompact = 1;
410     ovdb_conf.readserver = 0;
411     ovdb_conf.numrsprocs = 5;
412     ovdb_conf.maxrsconn = 0;
413     ovdb_conf.useshm = 0;
414     ovdb_conf.shmkey = 6400;
415
416     path = concatpath(innconf->pathetc, _PATH_OVDBCONF);
417     f = CONFfopen(path);
418     free(path);
419
420     if(f) {
421         while(!done && (tok = CONFgettoken(toks, f))) {
422             switch(tok->type) {
423             case OVDBtxn_nosync:
424                 tok = CONFgettoken(0, f);
425                 if(!tok) {
426                     done = 1;
427                     continue;
428                 }
429                 if(conf_bool_val(tok->name, &b)) {
430                     ovdb_conf.txn_nosync = b;
431                 }
432                 break;
433             case OVDBnumdbfiles:
434                 tok = CONFgettoken(0, f);
435                 if(!tok) {
436                     done = 1;
437                     continue;
438                 }
439                 if(conf_long_val(tok->name, &l) && l > 0) {
440                     ovdb_conf.numdbfiles = l;
441                 }
442                 break;
443             case OVDBpagesize:
444                 tok = CONFgettoken(0, f);
445                 if(!tok) {
446                     done = 1;
447                     continue;
448                 }
449                 if(conf_long_val(tok->name, &l) && l > 0) {
450                     ovdb_conf.pagesize = l;
451                 }
452                 break;
453             case OVDBcachesize:
454                 tok = CONFgettoken(0, f);
455                 if(!tok) {
456                     done = 1;
457                     continue;
458                 }
459                 if(conf_long_val(tok->name, &l) && l > 0) {
460                     ovdb_conf.cachesize = l * 1024;
461                 }
462                 break;
463             case OVDBminkey:
464                 tok = CONFgettoken(0, f);
465                 if(!tok) {
466                     done = 1;
467                     continue;
468                 }
469                 if(conf_long_val(tok->name, &l) && l > 1) {
470                     ovdb_conf.minkey = l;
471                 }
472                 break;
473             case OVDBmaxlocks:
474                 tok = CONFgettoken(0, f);
475                 if(!tok) {
476                     done = 1;
477                     continue;
478                 }
479                 if(conf_long_val(tok->name, &l) && l > 0) {
480                     ovdb_conf.maxlocks = l;
481                 }
482                 break;
483             case OVDBnocompact:
484                 tok = CONFgettoken(0, f);
485                 if(!tok) {
486                     done = 1;
487                     continue;
488                 }
489                 if(conf_long_val(tok->name, &l) && l >= 0) {
490                     ovdb_conf.nocompact = l;
491                 }
492                 break;
493             case OVDBreadserver:
494                 tok = CONFgettoken(0, f);
495                 if(!tok) {
496                     done = 1;
497                     continue;
498                 }
499                 if(conf_bool_val(tok->name, &b)) {
500                     ovdb_conf.readserver = b;
501                 }
502                 break;
503             case OVDBnumrsprocs:
504                 tok = CONFgettoken(0, f);
505                 if(!tok) {
506                     done = 1;
507                     continue;
508                 }
509                 if(conf_long_val(tok->name, &l) && l > 0) {
510                     ovdb_conf.numrsprocs = l;
511                 }
512                 break;
513             case OVDBmaxrsconn:
514                 tok = CONFgettoken(0, f);
515                 if(!tok) {
516                     done = 1;
517                     continue;
518                 }
519                 if(conf_long_val(tok->name, &l) && l >= 0) {
520                     ovdb_conf.maxrsconn = l;
521                 }
522                 break;
523             case OVDBuseshm:
524                 tok = CONFgettoken(0, f);
525                 if(!tok) {
526                     done = 1;
527                     continue;
528                 }
529                 if(conf_bool_val(tok->name, &b)) {
530                     ovdb_conf.useshm = b;
531                 }
532                 break;
533             case OVDBshmkey:
534                 tok = CONFgettoken(0, f);
535                 if(!tok) {
536                     done = 1;
537                     continue;
538                 }
539                 if(conf_long_val(tok->name, &l) && l >= 0) {
540                     ovdb_conf.shmkey = l;
541                 }
542                 break;
543             }
544         }
545         CONFfclose(f);
546     }
547
548     /* If user did not specify minkey, choose one based on pagesize */
549     if(ovdb_conf.minkey == 0) {
550         ovdb_conf.minkey = ovdb_conf.pagesize / 2048 - 1;
551         if(ovdb_conf.minkey < 2)
552             ovdb_conf.minkey = 2;
553     }
554
555     confread = 1;
556 }
557
558
559 /* Function that db will use to report errors */
560 #if DB_VERSION_MAJOR > 4 || (DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3)
561 static void OVDBerror(const DB_ENV *dbenv UNUSED, const char *db_errpfx UNUSED, const char *buffer)
562 #else
563 static void OVDBerror(const char *db_errpfx UNUSED, char *buffer)
564 #endif
565 {
566     switch(ovdb_errmode) {
567     case OVDB_ERR_SYSLOG:
568         syslog(L_ERROR, "OVDB: %s", buffer);
569         break;
570     case OVDB_ERR_STDERR:
571         fprintf(stderr, "OVDB: %s\n", buffer);
572         break;
573     }
574 }
575
576 static u_int32_t _db_flags = 0;
577 #if DB_VERSION_MAJOR == 2
578 static DB_INFO   _dbinfo;
579 #endif
580
581 static int open_db_file(int which)
582 {
583     int ret;
584     char name[10];
585 #if DB_VERSION_MAJOR > 4 || (DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 1)
586     DB_TXN *tid;
587 #endif
588
589     if(dbs[which] != NULL)
590         return 0;
591
592     snprintf(name, sizeof(name), "ov%05d", which);
593
594 #if DB_VERSION_MAJOR == 2
595     ret = db_open(name, DB_BTREE, _db_flags, 0666, OVDBenv, &_dbinfo,
596                   &(dbs[which]));
597     if (ret != 0) {
598         dbs[which] = NULL;
599         return ret;
600     }
601 #else
602     ret = db_create(&(dbs[which]), OVDBenv, 0);
603     if (ret != 0)
604         return ret;
605
606     if(ovdb_conf.minkey > 0)
607         (dbs[which])->set_bt_minkey(dbs[which], ovdb_conf.minkey);
608     if(ovdb_conf.pagesize > 0)
609         (dbs[which])->set_pagesize(dbs[which], ovdb_conf.pagesize);
610
611 #if DB_VERSION_MAJOR > 4 || (DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 1)
612     TXN_START(t_open_db_file, tid);
613     ret = (dbs[which])->open(dbs[which], tid, name, NULL, DB_BTREE, _db_flags,
614                              0666);
615     if (ret == 0)
616         TXN_COMMIT(t_open_db_file, tid);
617 #else
618     ret = (dbs[which])->open(dbs[which], name, NULL, DB_BTREE, _db_flags,
619                              0666);
620 #endif
621     if (ret != 0) {
622         (dbs[which])->close(dbs[which], 0);
623         dbs[which] = NULL;
624         return ret;
625     }
626 #endif
627     return 0;
628 }
629
630 static void close_db_file(int which)
631 {
632     if(which == -1 || dbs[which] == NULL)
633         return;
634     
635     dbs[which]->close(dbs[which], 0);
636     dbs[which] = NULL;
637 }
638
639 static int which_db(char *group)
640 {
641     HASH grouphash;
642     unsigned int i;
643
644     grouphash = Hash(group, strlen(group));
645     memcpy(&i, &grouphash, sizeof(i));
646     return i % ovdb_conf.numdbfiles;
647 }
648
649 static DB *get_db_bynum(int which)
650 {
651     int ret;
652     if(which >= ovdb_conf.numdbfiles)
653         return NULL;
654     if(oneatatime) {
655         if(which != current_db && current_db != -1)
656             close_db_file(current_db);
657
658         ret = open_db_file(which);
659         if (ret != 0)
660             syslog(L_ERROR, "OVDB: open_db_file failed: %s", db_strerror(ret));
661
662         current_db = which;
663     }
664     return(dbs[which]);
665 }
666
667
668 int ovdb_getgroupinfo(char *group, struct groupinfo *gi, int ignoredeleted, DB_TXN *tid, int getflags)
669 {
670     int ret;
671     DBT key, val;
672
673     if(group == NULL)   /* just in case */
674         return DB_NOTFOUND;
675
676     memset(&key, 0, sizeof key);
677     memset(&val, 0, sizeof val);
678
679     key.data = group;
680     key.size = strlen(group);
681     val.data = gi;
682     val.ulen = sizeof(struct groupinfo);
683     val.flags = DB_DBT_USERMEM;
684
685     ret = groupinfo->get(groupinfo, tid, &key, &val, getflags);
686     if (ret != 0)
687         return ret;
688
689     if(val.size != sizeof(struct groupinfo)) {
690         syslog(L_ERROR, "OVDB: wrong size for %s groupinfo (%u)",
691             group, val.size);
692         return DB_NOTFOUND;
693     }
694
695     if(ignoredeleted && (gi->status & GROUPINFO_DELETED))
696         return DB_NOTFOUND;
697
698     return 0;
699 }
700
701 #define GROUPID_MAX_FREELIST 10240
702 #define GROUPID_MIN_FREELIST 100
703
704 /* allocate a new group ID and return in gno */
705 /* must be used in a transaction */
706 static int groupid_new(group_id_t *gno, DB_TXN *tid)
707 {
708     DBT key, val;
709     int ret, n;
710     group_id_t newgno, *freelist, one;
711
712     memset(&key, 0, sizeof key);
713     memset(&val, 0, sizeof val);
714
715     key.data = (char *) "!groupid_freelist";
716     key.size = sizeof("!groupid_freelist");
717
718     ret = groupinfo->get(groupinfo, tid, &key, &val, DB_RMW);
719     if (ret != 0) {
720         if(ret == DB_NOTFOUND) {
721             val.size = sizeof(group_id_t);
722             val.data = &one;
723             one = 1;
724         } else {
725             return ret;
726         }
727     }
728
729     if(val.size % sizeof(group_id_t)) {
730         syslog(L_ERROR, "OVDB: invalid size (%d) for !groupid_freelist",
731                 val.size);
732         return EINVAL;
733     }
734
735     n = val.size / sizeof(group_id_t);
736     freelist = xmalloc(n * sizeof(group_id_t));
737     memcpy(freelist, val.data, val.size);
738     if(n <= GROUPID_MIN_FREELIST ) {
739         newgno = freelist[n-1];
740         (freelist[n-1])++;
741         val.data = freelist;
742     } else {
743         newgno = freelist[0];
744         val.data = &(freelist[1]);
745         val.size -= sizeof(group_id_t);
746     }
747
748     ret = groupinfo->put(groupinfo, tid, &key, &val, 0);
749     if (ret != 0) {
750         free(freelist);
751         return ret;
752     }
753
754     free(freelist);
755     *gno = newgno;
756     return 0;
757 }
758
759 /* mark given group ID as "unused" */
760 /* must be used in a transaction */
761 static int groupid_free(group_id_t gno, DB_TXN *tid)
762 {
763     DBT key, val;
764     int ret, n, i;
765     group_id_t *freelist;
766
767     memset(&key, 0, sizeof key);
768     memset(&val, 0, sizeof val);
769
770     key.data = (char *) "!groupid_freelist";
771     key.size = sizeof("!groupid_freelist");
772
773     ret = groupinfo->get(groupinfo, tid, &key, &val, DB_RMW);
774     if (ret != 0) {
775         return ret;
776     }
777
778     if(val.size % sizeof(group_id_t)) {
779         syslog(L_ERROR, "OVDB: invalid size (%d) for !groupid_freelist",
780                 val.size);
781         return EINVAL;
782     }
783
784     n = val.size / sizeof(group_id_t);
785     if(n > GROUPID_MAX_FREELIST)
786         return 0;
787     freelist = xmalloc((n + 1) * sizeof(group_id_t));
788     memcpy(freelist, val.data, val.size);
789
790     if(gno >= freelist[n-1]) {  /* shouldn't happen */
791         free(freelist);
792         return 0;
793     }
794     for(i = 0; i < n-1; i++) {
795         if(gno == freelist[i]) {        /* already on freelist */
796             free(freelist);
797             return 0;
798         }
799     }
800         
801     freelist[n] = freelist[n-1];
802     freelist[n-1] = gno;
803     val.data = freelist;
804     val.size += sizeof(group_id_t);
805
806     ret = groupinfo->put(groupinfo, tid, &key, &val, 0);
807
808     free(freelist);
809     return ret;
810 }
811
812 /* Must be called outside of a transaction because it makes its own
813    transactions */
814 static int delete_all_records(int whichdb, group_id_t gno)
815 {
816     DB *db;
817     DBC *dbcursor;
818     DBT key, val;
819     struct datakey dk;
820     int count;
821     int ret;
822     DB_TXN *tid;
823
824     memset(&key, 0, sizeof key);
825     memset(&val, 0, sizeof val);
826     memset(&dk, 0, sizeof dk);
827
828     db = get_db_bynum(whichdb);
829     if(db == NULL)
830         return DB_NOTFOUND;
831
832     dk.groupnum = gno;
833     dk.artnum = 0;
834
835     while(1) {
836         TXN_START(t_del, tid);
837
838         /* get a cursor to traverse the ov records and delete them */
839         ret = db->cursor(db, tid, &dbcursor, 0);
840         switch(ret)
841         {
842         case 0:
843             break;
844         case TRYAGAIN:
845             TXN_RETRY(t_del, tid);
846         default:
847             TXN_ABORT(t_del, tid);
848             syslog(L_ERROR, "OVDB: delete_all_records: db->cursor: %s", db_strerror(ret));
849             return ret;
850         }
851
852         key.data = &dk;
853         key.size = sizeof dk;
854         val.flags = DB_DBT_PARTIAL;
855
856         for(count = 0; count < DELETE_TXN_SIZE; count++) {
857             ret = dbcursor->c_get(dbcursor, &key, &val,
858                             count ? DB_NEXT : DB_SET_RANGE);
859             switch (ret)
860             {
861             case 0:
862                 break;
863             case DB_NOTFOUND:
864                 dbcursor->c_close(dbcursor);
865                 TXN_COMMIT(t_del, tid);
866                 return 0;
867             case TRYAGAIN:
868                 dbcursor->c_close(dbcursor);
869                 TXN_RETRY(t_del, tid);
870             default:
871                 warn("OVDB: delete_all_records: DBcursor->c_get: %s",
872                      db_strerror(ret));
873                 dbcursor->c_close(dbcursor);
874                 TXN_ABORT(t_del, tid);
875                 return ret;
876             }
877
878             if(key.size == sizeof dk
879                     && memcmp(key.data, &gno, sizeof gno)) {
880                 break;
881             }
882
883             ret = dbcursor->c_del(dbcursor, 0);
884             switch (ret) {
885             case 0:
886             case DB_KEYEMPTY:
887                 break;
888             case TRYAGAIN:
889                 dbcursor->c_close(dbcursor);
890                 TXN_RETRY(t_del, tid);
891             default:
892                 warn("OVDB: delete_all_records: DBcursor->c_del: %s",
893                      db_strerror(ret));
894                 dbcursor->c_close(dbcursor);
895                 TXN_ABORT(t_del, tid);
896                 return ret;
897             }
898         }
899         dbcursor->c_close(dbcursor);
900         TXN_COMMIT(t_del, tid);
901         if(count < DELETE_TXN_SIZE) {
902             break;
903         }
904     }
905     return 0;
906 }
907
908 /* Make a temporary groupinfo key using the given db number and group ID.
909    Must be called in a transaction */
910 static int
911 mk_temp_groupinfo(int whichdb, group_id_t gno, DB_TXN *tid)
912 {
913     char keystr[1 + sizeof gno];
914     DBT key, val;
915     struct groupinfo gi;
916     int ret;
917
918     memset(&key, 0, sizeof key);
919     memset(&val, 0, sizeof val);
920     memset(&gi, 0, sizeof gi);
921
922     keystr[0] = 0;
923     memcpy(keystr + 1, &gno, sizeof gno);
924
925     gi.current_db = whichdb;
926     gi.current_gid = gno;
927     gi.status = GROUPINFO_DELETED;
928
929     key.data = keystr;
930     key.size = sizeof keystr;
931     val.data = &gi;
932     val.size = sizeof gi;
933
934     ret = groupinfo->put(groupinfo, tid, &key, &val, 0);
935     switch (ret) {
936     case 0:
937         break;
938     default:
939         syslog(L_ERROR, "OVDB: mk_temp_groupinfo: groupinfo->put: %s", db_strerror(ret));
940     case TRYAGAIN:
941         return ret;
942     }
943
944     return 0;
945 }
946
947 /* Delete a temporary groupinfo key created by mk_temp_groupid, then
948    frees the group id.
949    Must NOT be called within a transaction. */
950 static int
951 rm_temp_groupinfo(group_id_t gno)
952 {
953     char keystr[1 + sizeof gno];
954     DB_TXN *tid;
955     DBT key;
956     int ret;
957
958     memset(&key, 0, sizeof key);
959
960     keystr[0] = 0;
961     memcpy(keystr + 1, &gno, sizeof gno);
962
963     key.data = keystr;
964     key.size = sizeof keystr;
965
966     TXN_START(t_tmp, tid);
967
968     ret = groupinfo->del(groupinfo, tid, &key, 0);
969     switch(ret) {
970     case 0:
971     case DB_NOTFOUND:
972         break;
973     case TRYAGAIN:
974         TXN_RETRY(t_tmp, tid);
975     default:
976         TXN_ABORT(t_tmp, tid);
977         syslog(L_ERROR, "OVDB: rm_temp_groupinfo: groupinfo->del: %s", db_strerror(ret));
978         return ret;
979     }
980
981     switch(groupid_free(gno, tid)) {
982     case 0:
983         break;
984     case TRYAGAIN:
985         TXN_RETRY(t_tmp, tid);
986     default:
987         TXN_ABORT(t_tmp, tid);
988         syslog(L_ERROR, "OVDB: rm_temp_groupinfo: groupid_free: %s", db_strerror(ret));
989         return ret;
990     }
991
992     TXN_COMMIT(t_tmp, tid);
993     return 0;
994 }
995
996 /* This function deletes overview records for deleted or forgotton groups */
997 /* argument: 0 = process deleted groups   1 = process forgotton groups */
998 static bool delete_old_stuff(int forgotton)
999 {
1000     DBT key, val;
1001     DBC *cursor;
1002     DB_TXN *tid;
1003     struct groupinfo gi;
1004     char **dellist = NULL;
1005     size_t *dellistsz = NULL;
1006     int listlen, listcount;
1007     int i, ret;
1008
1009     TXN_START(t_dellist, tid);
1010     if (dellist != NULL) {
1011         for (i = 0; i < listcount; ++i)
1012             free(dellist[i]);
1013         free(dellist);
1014     }
1015     if (dellistsz != NULL)
1016         free(dellistsz);
1017     listlen = 20;
1018     listcount = 0;
1019     dellist = xmalloc(listlen * sizeof(char *));
1020     dellistsz = xmalloc(listlen * sizeof(size_t));
1021
1022     memset(&key, 0, sizeof key);
1023     memset(&val, 0, sizeof val);
1024
1025     val.data = &gi;
1026     val.ulen = val.dlen = sizeof gi;
1027     val.flags = DB_DBT_USERMEM | DB_DBT_PARTIAL;
1028
1029     /* get a cursor to traverse all of the groupinfo records */
1030     ret = groupinfo->cursor(groupinfo, tid, &cursor, 0);
1031     if (ret != 0) {
1032         syslog(L_ERROR, "OVDB: delete_old_stuff: groupinfo->cursor: %s", db_strerror(ret));
1033         free(dellist);
1034         free(dellistsz);
1035         return false;
1036     }
1037
1038     while((ret = cursor->c_get(cursor, &key, &val, DB_NEXT)) == 0) {
1039         if(key.size == sizeof("!groupid_freelist") &&
1040                 !strcmp("!groupid_freelist", key.data))
1041             continue;
1042         if(val.size != sizeof(struct groupinfo)) {
1043             syslog(L_ERROR, "OVDB: delete_old_stuff: wrong size for groupinfo record");
1044             continue;
1045         }
1046         if((!forgotton && (gi.status & GROUPINFO_DELETED))
1047                 || (forgotton && (gi.expired < eo_start))) {
1048             dellist[listcount] = xmalloc(key.size);
1049             memcpy(dellist[listcount], key.data, key.size);
1050             dellistsz[listcount] = key.size;
1051             listcount++;
1052             if(listcount >= listlen) {
1053                 listlen += 20;
1054                 dellist = xrealloc(dellist, listlen * sizeof(char *));
1055                 dellistsz = xrealloc(dellistsz, listlen * sizeof(size_t));
1056             }
1057         }
1058     }
1059     cursor->c_close(cursor);
1060     switch (ret) {
1061     case 0:
1062     case DB_NOTFOUND:
1063         TXN_COMMIT(t_dellist, tid);
1064         break;
1065     case TRYAGAIN:
1066         TXN_RETRY(t_dellist, tid);
1067     default:
1068         TXN_ABORT(t_dellist, tid);
1069         syslog(L_ERROR, "OVDB: delete_old_stuff: cursor->c_get: %s", db_strerror(ret));
1070         for (i = 0; i < listcount; ++i)
1071             free(dellist[i]);
1072         free(dellist);
1073         free(dellistsz);
1074         return false;
1075     }
1076
1077     for(i = 0; i < listcount; i++) {
1078         TXN_START(t_dos, tid);
1079
1080         /* Can't use ovdb_getgroupinfo here */
1081         key.data = dellist[i];
1082         key.size = dellistsz[i];
1083         val.data = &gi;
1084         val.ulen = sizeof(struct groupinfo);
1085         val.flags = DB_DBT_USERMEM;
1086
1087         ret = groupinfo->get(groupinfo, tid, &key, &val, 0);
1088
1089         switch (ret)
1090         {
1091         case 0:
1092             break;
1093         case TRYAGAIN:
1094             TXN_RETRY(t_dos, tid);
1095         case DB_NOTFOUND:
1096             TXN_ABORT(t_dos, tid);
1097             continue;
1098         default:
1099             syslog(L_ERROR, "OVDB: delete_old_stuff: groupinfo->get: %s\n", db_strerror(ret));
1100             TXN_ABORT(t_dos, tid);
1101             continue;
1102         }
1103         if(val.size != sizeof(struct groupinfo)) {
1104             TXN_ABORT(t_dos, tid);
1105             continue;
1106         }
1107
1108         /* This if() is true if this ISN'T a key created by mk_temp_groupinfo */
1109         if(*((char *)key.data) != 0 || key.size != 1 + sizeof(group_id_t)) {
1110
1111             switch(mk_temp_groupinfo(gi.current_db, gi.current_gid, tid)) {
1112             case 0:
1113                 break;
1114             case TRYAGAIN:
1115                 TXN_RETRY(t_dos, tid);
1116             default:
1117                 TXN_ABORT(t_dos, tid);
1118                 continue;
1119             }
1120             if(gi.status & GROUPINFO_MOVING) {
1121                 switch(mk_temp_groupinfo(gi.new_db, gi.new_gid, tid)) {
1122                 case 0:
1123                     break;
1124                 case TRYAGAIN:
1125                     TXN_RETRY(t_dos, tid);
1126                 default:
1127                     TXN_ABORT(t_dos, tid);
1128                     continue;
1129                 }
1130             }
1131
1132             /* delete the corresponding groupaliases record (if exists) */
1133             switch(groupaliases->del(groupaliases, tid, &key, 0)) {
1134             case 0:
1135             case DB_NOTFOUND:
1136                 break;
1137             case TRYAGAIN:
1138                 TXN_RETRY(t_dos, tid);
1139             default:
1140                 TXN_ABORT(t_dos, tid);
1141                 continue;
1142             }
1143
1144             switch(groupinfo->del(groupinfo, tid, &key, 0)) {
1145             case 0:
1146             case DB_NOTFOUND:
1147                 break;
1148             case TRYAGAIN:
1149                 TXN_RETRY(t_dos, tid);
1150             default:
1151                 TXN_ABORT(t_dos, tid);
1152                 continue;
1153             }
1154         }
1155
1156         TXN_COMMIT(t_dos, tid);
1157
1158         if(delete_all_records(gi.current_db, gi.current_gid) == 0) {
1159             rm_temp_groupinfo(gi.current_gid);
1160         }
1161         if(gi.status & GROUPINFO_MOVING) {
1162             if(delete_all_records(gi.new_db, gi.new_gid) == 0) {
1163                 rm_temp_groupinfo(gi.new_gid);
1164             }
1165         }
1166     }
1167 out:
1168     for(i = 0; i < listcount; i++)
1169         free(dellist[i]);
1170     free(dellist);
1171     free(dellistsz);
1172     return true;
1173 }
1174
1175 static int count_records(struct groupinfo *gi)
1176 {
1177     int ret;
1178     DB *db;
1179     DBC *cursor;
1180     DBT key, val;
1181     struct datakey dk;
1182     u_int32_t artnum, newlow = 0;
1183
1184     memset(&key, 0, sizeof key);
1185     memset(&val, 0, sizeof val);
1186     memset(&dk, 0, sizeof dk);
1187
1188     db = get_db_bynum(gi->current_db);
1189     if(db == NULL)
1190         return DB_NOTFOUND;
1191
1192     dk.groupnum = gi->current_gid;
1193     dk.artnum = 0;
1194     key.data = &dk;
1195     key.size = key.ulen = sizeof dk;
1196     key.flags = DB_DBT_USERMEM;
1197     val.flags = DB_DBT_PARTIAL;
1198
1199     gi->count = 0;
1200
1201     ret = db->cursor(db, NULL, &cursor, 0);
1202     if (ret)
1203         return ret;
1204
1205     ret = cursor->c_get(cursor, &key, &val, DB_SET_RANGE);
1206     switch (ret)
1207     {
1208     case 0:
1209     case DB_NOTFOUND:
1210         break;
1211     default:
1212         cursor->c_close(cursor);
1213         return ret;
1214     }
1215     while(ret == 0 && key.size == sizeof(dk) && dk.groupnum == gi->current_gid) {
1216         artnum = ntohl(dk.artnum);
1217         if(newlow == 0 || newlow > artnum)
1218             newlow = artnum;
1219         if(artnum > gi->high)
1220             gi->high = artnum;
1221         gi->count++;
1222
1223         ret = cursor->c_get(cursor, &key, &val, DB_NEXT);
1224     }
1225     cursor->c_close(cursor);
1226     if(gi->count == 0)
1227         gi->low = gi->high + 1;
1228     else
1229         gi->low = newlow;
1230
1231     if(ret == DB_NOTFOUND)
1232         return 0;
1233     return ret;
1234 }
1235
1236
1237 /*
1238  * Locking:  OVopen() calls ovdb_getlock(OVDB_LOCK_NORMAL).  This
1239  * aquires a read (shared) lock on the lockfile.  Multiple processes
1240  * can have this file locked at the same time.  That way, if there
1241  * are any processes that are using the database, the lock file will
1242  * have one or more shared locks on it.
1243  *
1244  * ovdb_init, when starting, calls ovdb_getlock(OVDB_LOCK_EXCLUSIVE).
1245  * This tries to get a write (exclusive) lock, which will fail if
1246  * anyone has a shared lock.  This way, ovdb_init can tell if there
1247  * are any processes using the database.  If not, and the excl. lock
1248  * succeeds, ovdb_init is free to do DB_RUNRECOVER.
1249  *
1250  * ovdb_getlock() in the "normal" lock mode calls ovdb_check_monitor,
1251  * which looks for the OVDB_MONITOR_PIDFILE.  If said file does not
1252  * exist, or the PID in it does not exist, it will fail.  This will
1253  * prevent OVopen() from opening the database if ovdb_monitor is not running.
1254  *
1255  * The OVDB_LOCK_ADMIN mode is used by ovdb_monitor to get a shared lock
1256  * without testing the pidfile.
1257  */
1258 static int lockfd = -1;
1259 bool ovdb_getlock(int mode)
1260 {
1261     if(lockfd == -1) {
1262         char *lockfn = concatpath(innconf->pathrun, OVDB_LOCKFN);
1263         lockfd = open(lockfn,
1264                 mode == OVDB_LOCK_NORMAL ? O_RDWR : O_CREAT|O_RDWR, 0660);
1265         if(lockfd == -1) {
1266             free(lockfn);
1267             if(errno == ENOENT)
1268                 syslog(L_FATAL, "OVDB: can not open database unless ovdb_monitor is running.");
1269             return false;
1270         }
1271         close_on_exec(lockfd, true);
1272         free(lockfn);
1273     } else
1274         return true;
1275
1276     if(mode == OVDB_LOCK_NORMAL) {
1277         if(!ovdb_check_pidfile(OVDB_MONITOR_PIDFILE)) {
1278             syslog(L_FATAL, "OVDB: can not open database unless ovdb_monitor is running.");
1279             return false;
1280         }
1281     }
1282     if(mode == OVDB_LOCK_EXCLUSIVE) {
1283         if(!inn_lock_file(lockfd, INN_LOCK_WRITE, false)) {
1284             close(lockfd);
1285             lockfd = -1;
1286             return false;
1287         }
1288         return true;
1289     } else {
1290         if(!inn_lock_file(lockfd, INN_LOCK_READ, false)) {
1291             close(lockfd);
1292             lockfd = -1;
1293             return false;
1294         }
1295         return true;
1296     }
1297 }
1298
1299 bool ovdb_releaselock(void)
1300 {
1301     bool r;
1302     if(lockfd == -1)
1303         return true;
1304     r = inn_lock_file(lockfd, INN_LOCK_UNLOCK, false);
1305     close(lockfd);
1306     lockfd = -1;
1307     return r;
1308 }
1309
1310 bool ovdb_check_pidfile(char *file)
1311 {
1312     int f, pid;
1313     char buf[SMBUF];
1314     char *pidfn = concatpath(innconf->pathrun, file);
1315
1316     f = open(pidfn, O_RDONLY);
1317     if(f == -1) {
1318         if(errno != ENOENT)
1319             syslog(L_FATAL, "OVDB: can't open %s: %m", pidfn);
1320         free(pidfn);
1321         return false;
1322     }
1323     memset(buf, 0, SMBUF);
1324     if(read(f, buf, SMBUF-1) < 0) {
1325         syslog(L_FATAL, "OVDB: can't read from %s: %m", pidfn);
1326         free(pidfn);
1327         close(f);
1328         return false;
1329     }
1330     close(f);
1331     free(pidfn);
1332     pid = atoi(buf);
1333     if(pid <= 1) {
1334         return false;
1335     }
1336     if(kill(pid, 0) < 0 && errno == ESRCH) {
1337         return false;
1338     }
1339     return true;
1340 }
1341
1342 /* make sure the effective uid is that of NEWSUSER */
1343 bool ovdb_check_user(void)
1344 {
1345     struct passwd *p;
1346     static int result = -1;
1347
1348     if(result == -1) {
1349         p = getpwnam(NEWSUSER);
1350         if(!p) {
1351             syslog(L_FATAL, "OVDB: getpwnam(" NEWSUSER ") failed: %m");
1352             return false;
1353         }
1354         result = (p->pw_uid == geteuid());
1355     }
1356     return result;
1357 }
1358
1359 static int check_version(void)
1360 {
1361     int ret;
1362     DB *vdb;
1363     DBT key, val;
1364     u_int32_t dv;
1365
1366 #if DB_VERSION_MAJOR == 2
1367     DB_INFO dbinfo;
1368     memset(&dbinfo, 0, sizeof dbinfo);
1369
1370     ret = db_open("version", DB_BTREE, _db_flags, 0666, OVDBenv, &dbinfo,
1371                   &vdb);
1372     if (ret != 0) {
1373         syslog(L_FATAL, "OVDB: db_open failed: %s", db_strerror(ret));
1374         return ret;
1375     }
1376 #else
1377     /* open version db */
1378     ret = db_create(&vdb, OVDBenv, 0);
1379     if (ret != 0) {
1380         syslog(L_FATAL, "OVDB: open: db_create: %s", db_strerror(ret));
1381         return ret;
1382     }
1383 #if DB_VERSION_MAJOR > 4 || (DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 1)
1384     ret = vdb->open(vdb, NULL, "version", NULL, DB_BTREE, _db_flags, 0666);
1385 #else
1386     ret = vdb->open(vdb, "version", NULL, DB_BTREE, _db_flags, 0666);
1387 #endif
1388     if (ret != 0) {
1389         vdb->close(vdb, 0);
1390         syslog(L_FATAL, "OVDB: open: version->open: %s", db_strerror(ret));
1391         return ret;
1392     }
1393 #endif
1394     memset(&key, 0, sizeof key);
1395     memset(&val, 0, sizeof val);
1396     key.data = (char *) "dataversion";
1397     key.size = sizeof("dataversion");
1398     ret = vdb->get(vdb, NULL, &key, &val, 0);
1399     if (ret != 0) {
1400         if(ret != DB_NOTFOUND) {
1401             syslog(L_FATAL, "OVDB: open: can't retrieve version: %s", db_strerror(ret));
1402             vdb->close(vdb, 0);
1403             return ret;
1404         }
1405     }
1406     if(ret == DB_NOTFOUND || val.size != sizeof dv) {
1407         dv = DATA_VERSION;
1408         if(!(OVDBmode & OV_WRITE)) {
1409             vdb->close(vdb, 0);
1410             return EACCES;
1411         }
1412         val.data = &dv;
1413         val.size = sizeof dv;
1414         ret = vdb->put(vdb, NULL, &key, &val, 0);
1415         if (ret != 0) {
1416             syslog(L_FATAL, "OVDB: open: can't store version: %s", db_strerror(ret));
1417             vdb->close(vdb, 0);
1418             return ret;
1419         }
1420     } else
1421         memcpy(&dv, val.data, sizeof dv);
1422
1423     if(dv > DATA_VERSION) {
1424         syslog(L_FATAL, "OVDB: can't open database: unknown version %d", dv);
1425         vdb->close(vdb, 0);
1426         return EINVAL;
1427     }
1428     if(dv < DATA_VERSION) {
1429         syslog(L_FATAL, "OVDB: database is an old version, please run ovdb_init");
1430         vdb->close(vdb, 0);
1431         return EINVAL;
1432     }
1433
1434     /* The version db also stores some config values, which will override the
1435        corresponding ovdb.conf setting. */
1436
1437     key.data = (char *) "numdbfiles";
1438     key.size = sizeof("numdbfiles");
1439     ret = vdb->get(vdb, NULL, &key, &val, 0);
1440     if (ret != 0) {
1441         if(ret != DB_NOTFOUND) {
1442             syslog(L_FATAL, "OVDB: open: can't retrieve numdbfiles: %s", db_strerror(ret));
1443             vdb->close(vdb, 0);
1444             return ret;
1445         }
1446     }
1447     if(ret == DB_NOTFOUND || val.size != sizeof(ovdb_conf.numdbfiles)) {
1448         if(!(OVDBmode & OV_WRITE)) {
1449             vdb->close(vdb, 0);
1450             return EACCES;
1451         }
1452         val.data = &(ovdb_conf.numdbfiles);
1453         val.size = sizeof(ovdb_conf.numdbfiles);
1454         ret = vdb->put(vdb, NULL, &key, &val, 0);
1455         if (ret != 0) {
1456             syslog(L_FATAL, "OVDB: open: can't store numdbfiles: %s", db_strerror(ret));
1457             vdb->close(vdb, 0);
1458             return ret;
1459         }
1460     } else {
1461         memcpy(&(ovdb_conf.numdbfiles), val.data, sizeof(ovdb_conf.numdbfiles));
1462     }
1463
1464     vdb->close(vdb, 0);
1465     return 0;
1466 }
1467
1468
1469 int ovdb_open_berkeleydb(int mode, int flags)
1470 {
1471     int ret;
1472     u_int32_t ai_flags = DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN;
1473
1474     OVDBmode = mode;
1475     read_ovdb_conf();
1476
1477     if(OVDBenv != NULL)
1478         return 0;       /* already opened */
1479
1480     if(OVDBmode & OV_WRITE) {
1481         _db_flags |= DB_CREATE;
1482         ai_flags |= DB_CREATE;
1483     } else {
1484         _db_flags |= DB_RDONLY;
1485     }
1486     if(flags & OVDB_RECOVER)
1487         ai_flags |= DB_RECOVER;
1488
1489 #if DB_VERSION_MAJOR == 2 || (DB_VERSION_MAJOR == 3 && DB_VERSION_MINOR < 2)
1490     if(ovdb_conf.txn_nosync)
1491         ai_flags |= DB_TXN_NOSYNC;
1492 #endif
1493
1494 #if DB_VERSION_MAJOR == 2
1495
1496     OVDBenv = xcalloc(1, sizeof(DB_ENV));
1497
1498     OVDBenv->db_errcall = OVDBerror;
1499     OVDBenv->mp_size = ovdb_conf.cachesize;
1500     OVDBenv->lk_max = ovdb_conf.maxlocks;
1501
1502     /* initialize environment */
1503     ret = db_appinit(ovdb_conf.home, NULL, OVDBenv, ai_flags);
1504     if (ret != 0) {
1505         free(OVDBenv);
1506         OVDBenv = NULL;
1507         syslog(L_FATAL, "OVDB: db_appinit failed: %s", db_strerror(ret));
1508         return ret;
1509     }
1510 #else
1511     ret = db_env_create(&OVDBenv, 0);
1512     if (ret != 0) {
1513         syslog(L_FATAL, "OVDB: db_env_create: %s", db_strerror(ret));
1514         return ret;
1515     }
1516
1517     if((flags & (OVDB_UPGRADE | OVDB_RECOVER)) == (OVDB_UPGRADE | OVDB_RECOVER))
1518         ai_flags |= DB_PRIVATE;
1519     if(!(ai_flags & DB_PRIVATE)) {
1520         if(ovdb_conf.useshm)
1521             ai_flags |= DB_SYSTEM_MEM;
1522 #if DB_VERSION_MAJOR >= 4 || (DB_VERSION_MAJOR == 3 && DB_VERSION_MINOR > 0)
1523         OVDBenv->set_shm_key(OVDBenv, ovdb_conf.shmkey);
1524 #endif
1525     }
1526
1527     OVDBenv->set_errcall(OVDBenv, OVDBerror);
1528     OVDBenv->set_cachesize(OVDBenv, 0, ovdb_conf.cachesize, 1);
1529 #if DB_VERSION_MAJOR >= 4
1530     OVDBenv->set_lk_max_locks(OVDBenv, ovdb_conf.maxlocks);
1531     OVDBenv->set_lk_max_lockers(OVDBenv, ovdb_conf.maxlocks);
1532     OVDBenv->set_lk_max_objects(OVDBenv, ovdb_conf.maxlocks);
1533 #else
1534     OVDBenv->set_lk_max(OVDBenv, ovdb_conf.maxlocks);
1535 #endif
1536
1537 #if DB_VERSION_MAJOR >= 4 || (DB_VERSION_MAJOR == 3 && DB_VERSION_MINOR >= 2)
1538     if(ovdb_conf.txn_nosync)
1539         OVDBenv->set_flags(OVDBenv, DB_TXN_NOSYNC, 1);
1540 #endif
1541
1542     if((flags & (OVDB_UPGRADE | OVDB_RECOVER)) != OVDB_UPGRADE) {
1543 #if DB_VERSION_MAJOR == 3 && DB_VERSION_MINOR == 0
1544         ret = OVDBenv->open(OVDBenv, ovdb_conf.home, NULL, ai_flags, 0666);
1545 #else
1546         ret = OVDBenv->open(OVDBenv, ovdb_conf.home, ai_flags, 0666);
1547 #endif
1548         if (ret != 0) {
1549             OVDBenv->close(OVDBenv, 0);
1550             OVDBenv = NULL;
1551             syslog(L_FATAL, "OVDB: OVDBenv->open: %s", db_strerror(ret));
1552             return ret;
1553         }
1554     }
1555 #endif /* DB_VERSION_MAJOR == 2 */
1556
1557     return 0;
1558 }
1559
1560 bool ovdb_open(int mode)
1561 {
1562     int i, ret;
1563 #if DB_VERSION_MAJOR == 2
1564     DB_INFO dbinfo;
1565 #else
1566     DB_TXN *tid;
1567 #endif
1568
1569     if(OVDBenv != NULL || clientmode) {
1570         syslog(L_ERROR, "OVDB: ovdb_open called more than once");
1571         return false;
1572     }
1573
1574     read_ovdb_conf();
1575     if(ovdb_conf.readserver && mode == OV_READ)
1576         clientmode = 1;
1577
1578     if(mode & OVDB_SERVER)
1579         clientmode = 0;
1580
1581     if(clientmode) {
1582         if(client_connect() == 0)
1583             return true;
1584         clientmode = 0;
1585     }
1586     if(!ovdb_check_user()) {
1587         syslog(L_FATAL, "OVDB: must be running as " NEWSUSER " to access overview.");
1588         return false;
1589     }
1590     if(!ovdb_getlock(OVDB_LOCK_NORMAL)) {
1591         syslog(L_FATAL, "OVDB: ovdb_getlock failed: %m");
1592         return false;
1593     }
1594
1595     if(ovdb_open_berkeleydb(mode, 0) != 0)
1596         return false;
1597
1598     if(check_version() != 0)
1599         return false;
1600
1601     if(mode & OV_WRITE || mode & OVDB_SERVER) {
1602         oneatatime = 0;
1603     } else {
1604         oneatatime = 1;
1605     }
1606
1607 #if DB_VERSION_MAJOR == 2
1608     memset(&_dbinfo, 0, sizeof _dbinfo);
1609     _dbinfo.db_pagesize = ovdb_conf.pagesize;
1610     _dbinfo.bt_minkey = ovdb_conf.minkey;
1611 #endif
1612
1613     dbs = xcalloc(ovdb_conf.numdbfiles, sizeof(DB *));
1614     
1615     if(!oneatatime) {
1616         for(i = 0; i < ovdb_conf.numdbfiles; i++) {
1617             ret = open_db_file(i);
1618             if (ret != 0) {
1619                 syslog(L_FATAL, "OVDB: open_db_file failed: %s", db_strerror(ret));
1620                 return false;
1621             }
1622         }
1623     }
1624
1625 #if DB_VERSION_MAJOR == 2
1626     memset(&dbinfo, 0, sizeof dbinfo);
1627
1628     ret = db_open("groupinfo", DB_BTREE, _db_flags, 0666, OVDBenv,
1629                   &dbinfo, &groupinfo);
1630     if (ret != 0) {
1631         syslog(L_FATAL, "OVDB: db_open failed: %s", db_strerror(ret));
1632         return false;
1633     }
1634
1635     ret = db_open("groupaliases", DB_HASH, _db_flags, 0666, OVDBenv,
1636                   &dbinfo, &groupaliases);
1637     if (ret != 0) {
1638         syslog(L_FATAL, "OVDB: db_open failed: %s", db_strerror(ret));
1639         return false;
1640     }
1641 #else
1642     ret = db_create(&groupinfo, OVDBenv, 0);
1643     if (ret != 0) {
1644         syslog(L_FATAL, "OVDB: open: db_create: %s", db_strerror(ret));
1645         return false;
1646     }
1647 #if DB_VERSION_MAJOR > 4 || (DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 1)
1648     TXN_START(t_open_groupinfo, tid);
1649     ret = groupinfo->open(groupinfo, tid, "groupinfo", NULL, DB_BTREE,
1650                           _db_flags, 0666);
1651     if (ret == 0)
1652         TXN_COMMIT(t_open_groupinfo, tid);
1653 #else
1654     ret = groupinfo->open(groupinfo, "groupinfo", NULL, DB_BTREE,
1655                           _db_flags, 0666);
1656 #endif
1657     if (ret != 0) {
1658         groupinfo->close(groupinfo, 0);
1659         syslog(L_FATAL, "OVDB: open: groupinfo->open: %s", db_strerror(ret));
1660         return false;
1661     }
1662     ret = db_create(&groupaliases, OVDBenv, 0);
1663     if (ret != 0) {
1664         syslog(L_FATAL, "OVDB: open: db_create: %s", db_strerror(ret));
1665         return false;
1666     }
1667 #if DB_VERSION_MAJOR > 4 || (DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 1)
1668     TXN_START(t_open_groupaliases, tid);
1669     ret = groupaliases->open(groupaliases, tid, "groupaliases", NULL, DB_HASH,
1670                              _db_flags, 0666);
1671     if (ret == 0)
1672         TXN_COMMIT(t_open_groupaliases, tid);
1673 #else
1674     ret = groupaliases->open(groupaliases, "groupaliases", NULL, DB_HASH,
1675                              _db_flags, 0666);
1676 #endif
1677     if (ret != 0) {
1678         groupaliases->close(groupaliases, 0);
1679         syslog(L_FATAL, "OVDB: open: groupaliases->open: %s", db_strerror(ret));
1680         return false;
1681     }
1682 #endif
1683
1684     Cutofflow = false;
1685     return true;
1686 }
1687
1688
1689 bool
1690 ovdb_groupstats(char *group, int *lo, int *hi, int *count, int *flag)
1691 {
1692     int ret;
1693     struct groupinfo gi;
1694
1695     if (clientmode) {
1696         struct rs_cmd rs;
1697         struct rs_groupstats repl;
1698
1699         rs.what = CMD_GROUPSTATS;
1700         rs.grouplen = strlen(group)+1;
1701
1702         if (csend(&rs, sizeof(rs)) < 0)
1703             return false;
1704         if (csend(group, rs.grouplen) < 0)
1705             return false;
1706         crecv(&repl, sizeof(repl));
1707
1708         if(repl.status != CMD_GROUPSTATS)
1709             return false;
1710
1711         /* we don't use the alias yet, but the OV API will be extended
1712            at some point so that the alias is returned also */
1713         if(repl.aliaslen != 0) {
1714             char *buf = xmalloc(repl.aliaslen);
1715             crecv(buf, repl.aliaslen);
1716             free(buf);
1717         }
1718
1719         if(lo)
1720             *lo = repl.lo;
1721         if(hi)
1722             *hi = repl.hi;
1723         if(count)
1724             *count = repl.count;
1725         if(flag)
1726             *flag = repl.flag;
1727         return true;
1728     }
1729
1730     ret = ovdb_getgroupinfo(group, &gi, true, NULL, 0);
1731     switch (ret)
1732     {
1733     case 0:
1734         break;
1735     case DB_NOTFOUND:
1736         return false;
1737     default:
1738         syslog(L_ERROR, "OVDB: ovdb_getgroupinfo failed: %s", db_strerror(ret));
1739         return false;
1740     }
1741
1742     if(lo != NULL)
1743         *lo = gi.low;
1744     if(hi != NULL)
1745         *hi = gi.high;
1746     if(count != NULL)
1747         *count = gi.count;
1748     if(flag != NULL)
1749         *flag = gi.flag;
1750     return true;
1751 }
1752
1753 bool ovdb_groupadd(char *group, ARTNUM lo, ARTNUM hi, char *flag)
1754 {
1755     DBT key, val;
1756     struct groupinfo gi;
1757     DB_TXN *tid;
1758     int ret = 0;
1759     int new;
1760
1761     memset(&key, 0, sizeof key);
1762     memset(&val, 0, sizeof val);
1763
1764     TXN_START(t_groupadd, tid);
1765
1766     if(tid==NULL)
1767         return false;
1768
1769     new = 0;
1770     ret = ovdb_getgroupinfo(group, &gi, false, tid, DB_RMW);
1771     switch (ret)
1772     {
1773     case DB_NOTFOUND:
1774         new = 1;
1775     case 0:
1776         break;
1777     case TRYAGAIN:
1778         TXN_RETRY(t_groupadd, tid);
1779     default:
1780         TXN_ABORT(t_groupadd, tid);
1781         syslog(L_ERROR, "OVDB: ovdb_getgroupinfo failed: %s", db_strerror(ret));
1782         return false;
1783     }
1784
1785     if(!new && (gi.status & GROUPINFO_DELETED)
1786                 && !(gi.status & GROUPINFO_EXPIRING)
1787                 && !(gi.status & GROUPINFO_MOVING)) {
1788         int s, c = 0;
1789         char g[MAXHEADERSIZE];
1790
1791         strlcpy(g, group, sizeof(g));
1792         s = strlen(g) + 1;
1793         key.data = g;
1794         key.size = s + sizeof(int);
1795         do {
1796             c++;
1797             memcpy(g+s, &c, sizeof(int));
1798             ret = groupinfo->get(groupinfo, tid, &key, &val, 0);
1799         } while(ret == 0);
1800         if(ret == TRYAGAIN) {
1801             TXN_RETRY(t_groupadd, tid);
1802         }
1803         val.data = &gi;
1804         val.size = sizeof(gi);
1805         ret = groupinfo->put(groupinfo, tid, &key, &val, 0);
1806         switch (ret)
1807         {
1808         case 0:
1809             break;
1810         case TRYAGAIN:
1811             TXN_RETRY(t_groupadd, tid);
1812         default:
1813             TXN_ABORT(t_groupadd, tid);
1814             syslog(L_ERROR, "OVDB: groupinfo->put: %s", db_strerror(ret));
1815             return false;
1816         }
1817         key.data = group;
1818         key.size = strlen(group);
1819         ret = groupinfo->del(groupinfo, tid, &key, 0);
1820         switch (ret)
1821         {
1822         case 0:
1823             break;
1824         case TRYAGAIN:
1825             TXN_RETRY(t_groupadd, tid);
1826         default:
1827             TXN_ABORT(t_groupadd, tid);
1828             syslog(L_ERROR, "OVDB: groupinfo->del: %s", db_strerror(ret));
1829             return false;
1830         }
1831         new = 1;
1832     }
1833
1834     if(new) {
1835         ret = groupid_new(&gi.current_gid, tid);
1836         switch (ret)
1837         {
1838         case 0:
1839             break;
1840         case TRYAGAIN:
1841             TXN_RETRY(t_groupadd, tid);
1842         default:
1843             TXN_ABORT(t_groupadd, tid);
1844             syslog(L_ERROR, "OVDB: groupid_new: %s", db_strerror(ret));
1845             return false;
1846         }
1847         gi.low = lo ? lo : 1;
1848         gi.high = hi;
1849         gi.count = 0;
1850         gi.flag = *flag;
1851         gi.expired = time(NULL);
1852         gi.expiregrouppid = 0;
1853         gi.current_db = gi.new_db = which_db(group);
1854         gi.new_gid = gi.current_gid;
1855         gi.status = 0;
1856     } else {
1857         gi.flag = *flag;
1858     }
1859
1860     key.data = group;
1861     key.size = strlen(group);
1862     val.data = &gi;
1863     val.size = sizeof gi;
1864
1865     ret = groupinfo->put(groupinfo, tid, &key, &val, 0);
1866     switch (ret) {
1867     case 0:
1868         break;
1869     case TRYAGAIN:
1870         TXN_RETRY(t_groupadd, tid);
1871     default:
1872         TXN_ABORT(t_groupadd, tid);
1873         syslog(L_ERROR, "OVDB: groupadd: groupinfo->put: %s", db_strerror(ret));
1874         return false;
1875     }
1876
1877     if(*flag == '=') {
1878         key.data = group;
1879         key.size = strlen(group);
1880         val.data = flag + 1;
1881         val.size = strcspn(flag, "\n") - 1;
1882
1883         switch(ret = groupaliases->put(groupaliases, tid, &key, &val, 0)) {
1884         case 0:
1885             break;
1886         case TRYAGAIN:
1887             TXN_RETRY(t_groupadd, tid);
1888         default:
1889             TXN_ABORT(t_groupadd, tid);
1890             syslog(L_ERROR, "OVDB: groupadd: groupaliases->put: %s", db_strerror(ret));
1891             return false;
1892         }
1893     }
1894
1895     TXN_COMMIT(t_groupadd, tid);
1896     return true;
1897 }
1898
1899 bool ovdb_groupdel(char *group)
1900 {
1901     DBT key, val;
1902     struct groupinfo gi;
1903     DB_TXN *tid;
1904     int ret = 0;
1905
1906     memset(&key, 0, sizeof key);
1907     memset(&val, 0, sizeof val);
1908
1909     /* We only need to set the deleted flag in groupinfo to prevent readers
1910        from seeing this group.  The actual overview records aren't deleted
1911        now, since that could take a significant amount of time (and innd
1912        is who normally calls this function).  The expireover run will
1913        clean up the deleted groups. */
1914
1915     TXN_START(t_groupdel, tid);
1916
1917     if(tid==NULL)
1918         return false;
1919
1920     ret = ovdb_getgroupinfo(group, &gi, true, tid, DB_RMW);
1921     switch (ret)
1922     {
1923     case DB_NOTFOUND:
1924         return true;
1925     case 0:
1926         break;
1927     case TRYAGAIN:
1928         TXN_RETRY(t_groupdel, tid);
1929     default:
1930         syslog(L_ERROR, "OVDB: ovdb_getgroupinfo failed: %s", db_strerror(ret));
1931         TXN_ABORT(t_groupdel, tid);
1932         return false;
1933     }
1934
1935     gi.status |= GROUPINFO_DELETED;
1936
1937     key.data = group;
1938     key.size = strlen(group);
1939     val.data = &gi;
1940     val.size = sizeof gi;
1941
1942     switch(ret = groupinfo->put(groupinfo, tid, &key, &val, 0)) {
1943     case 0:
1944         break;
1945     case TRYAGAIN:
1946         TXN_RETRY(t_groupdel, tid);
1947     default:
1948         TXN_ABORT(t_groupdel, tid);
1949         syslog(L_ERROR, "OVDB: groupadd: groupinfo->put: %s", db_strerror(ret));
1950         return false;
1951     }
1952
1953     switch(ret = groupaliases->del(groupaliases, tid, &key, 0)) {
1954     case 0:
1955     case DB_NOTFOUND:
1956         break;
1957     case TRYAGAIN:
1958         TXN_RETRY(t_groupdel, tid);
1959     default:
1960         syslog(L_ERROR, "OVDB: groupdel: groupaliases->del: %s", db_strerror(ret));
1961         TXN_ABORT(t_groupdel, tid);
1962         return false;
1963     }
1964
1965     TXN_COMMIT(t_groupdel, tid);
1966     return true;
1967 }
1968
1969 bool ovdb_add(char *group, ARTNUM artnum, TOKEN token, char *data, int len, time_t arrived, time_t expires)
1970 {
1971     static size_t databuflen = 0;
1972     static char *databuf;
1973     DB          *db;
1974     DBT         key, val;
1975     DB_TXN      *tid;
1976     struct groupinfo gi;
1977     struct datakey dk;
1978     int ret = 0;
1979
1980     memset(&dk, 0, sizeof dk);
1981
1982     if(databuflen == 0) {
1983         databuflen = BIG_BUFFER;
1984         databuf = xmalloc(databuflen);
1985     }
1986     if(databuflen < len + sizeof(struct ovdata)) {
1987         databuflen = len + sizeof(struct ovdata);
1988         databuf = xrealloc(databuf, databuflen);
1989     }
1990
1991     /* hmm... BerkeleyDB needs something like a 'struct iovec' so that we don't
1992        have to make a new buffer and copy everything in to it */
1993
1994     ((struct ovdata *)databuf)->token = token;
1995     ((struct ovdata *)databuf)->arrived = arrived;
1996     ((struct ovdata *)databuf)->expires = expires;
1997     memcpy(databuf + sizeof(struct ovdata), data, len);
1998     len += sizeof(struct ovdata);
1999
2000     memset(&key, 0, sizeof key);
2001     memset(&val, 0, sizeof val);
2002
2003     /* start the transaction */
2004     TXN_START(t_add, tid);
2005
2006     if(tid==NULL)
2007         return false;
2008
2009     /* first, retrieve groupinfo */
2010     ret = ovdb_getgroupinfo(group, &gi, true, tid, DB_RMW);
2011     switch (ret)
2012     {
2013     case 0:
2014         break;
2015     case DB_NOTFOUND:
2016         TXN_ABORT(t_add, tid);
2017         return true;
2018     case TRYAGAIN:
2019         TXN_RETRY(t_add, tid);
2020     default:
2021         TXN_ABORT(t_add, tid);
2022         syslog(L_ERROR, "OVDB: add: ovdb_getgroupinfo: %s", db_strerror(ret));
2023         return false;
2024     }
2025
2026     /* adjust groupinfo */
2027     if(Cutofflow && gi.low > artnum) {
2028         TXN_ABORT(t_add, tid);
2029         return true;
2030     }
2031     if(gi.low == 0 || gi.low > artnum)
2032         gi.low = artnum;
2033     if(gi.high < artnum)
2034         gi.high = artnum;
2035     gi.count++;
2036
2037     /* store groupinfo */
2038     key.data = group;
2039     key.size = strlen(group);
2040     val.data = &gi;
2041     val.size = sizeof gi;
2042
2043     ret = groupinfo->put(groupinfo, tid, &key, &val, 0);
2044     switch (ret)
2045     {
2046     case 0:
2047         break;
2048     case TRYAGAIN:
2049         TXN_RETRY(t_add, tid);
2050     default:
2051         TXN_ABORT(t_add, tid);
2052         syslog(L_ERROR, "OVDB: add: groupinfo->put: %s", db_strerror(ret));
2053         return false;
2054     }
2055
2056     /* store overview */
2057     db = get_db_bynum(gi.current_db);
2058     if(db == NULL) {
2059         TXN_ABORT(t_add, tid);
2060         return false;
2061     }
2062     dk.groupnum = gi.current_gid;
2063     dk.artnum = htonl((u_int32_t)artnum);
2064
2065     key.data = &dk;
2066     key.size = sizeof dk;
2067     val.data = databuf;
2068     val.size = len;
2069
2070     ret = db->put(db, tid, &key, &val, 0);
2071     switch (ret)
2072     {
2073     case 0:
2074         break;
2075     case TRYAGAIN:
2076         TXN_RETRY(t_add, tid);
2077     default:
2078         TXN_ABORT(t_add, tid);
2079         syslog(L_ERROR, "OVDB: add: db->put: %s", db_strerror(ret));
2080         return false;
2081     }
2082
2083     if(artnum < gi.high && gi.status & GROUPINFO_MOVING) {
2084         /* If the GROUPINFO_MOVING flag is set, then expireover
2085            is writing overview records under a new groupid.
2086            If this overview record is not at the highmark,
2087            we need to also store it under the new groupid */
2088         db = get_db_bynum(gi.new_db);
2089         if(db == NULL) {
2090             TXN_ABORT(t_add, tid);
2091             return false;
2092         }
2093         dk.groupnum = gi.new_gid;
2094
2095         ret = db->put(db, tid, &key, &val, 0);
2096         switch (ret)
2097         {
2098         case 0:
2099             break;
2100         case TRYAGAIN:
2101             TXN_RETRY(t_add, tid);
2102         default:
2103             TXN_ABORT(t_add, tid);
2104             syslog(L_ERROR, "OVDB: add: db->put: %s", db_strerror(ret));
2105             return false;
2106         }
2107     }
2108
2109     TXN_COMMIT(t_add, tid);
2110     return true;
2111 }
2112
2113 bool ovdb_cancel(TOKEN token UNUSED)
2114 {
2115     return true;
2116 }
2117
2118 struct ovdbsearch {
2119     DBC *cursor;
2120     group_id_t gid;
2121     u_int32_t firstart;
2122     u_int32_t lastart;
2123     int state;
2124 };
2125
2126 /* Even though nnrpd only does one search at a time, a read server process could
2127    do many concurrent searches; hence we must keep track of an arbitrary number of
2128    open searches */
2129 static struct ovdbsearch **searches = NULL;
2130 static int nsearches = 0;
2131 static int maxsearches = 0;
2132
2133 void *
2134 ovdb_opensearch(char *group, int low, int high)
2135 {
2136     DB *db;
2137     struct ovdbsearch *s;
2138     struct groupinfo gi;
2139     int ret;
2140
2141     if(clientmode) {
2142         struct rs_cmd rs;
2143         struct rs_opensrch repl;
2144
2145         rs.what = CMD_OPENSRCH;
2146         rs.grouplen = strlen(group)+1;
2147         rs.artlo = low;
2148         rs.arthi = high;
2149
2150         if (csend(&rs, sizeof(rs)) < 0)
2151             return NULL;
2152         if (csend(group, rs.grouplen) < 0)
2153             return NULL;
2154         crecv(&repl, sizeof(repl));
2155
2156         if(repl.status != CMD_OPENSRCH)
2157             return NULL;
2158
2159         return repl.handle;
2160     }
2161
2162     ret = ovdb_getgroupinfo(group, &gi, true, NULL, 0);
2163     switch (ret)
2164     {
2165     case 0:
2166         break;
2167     case DB_NOTFOUND:
2168         return NULL;
2169     default:
2170         syslog(L_ERROR, "OVDB: ovdb_getgroupinfo failed: %s", db_strerror(ret));
2171         return NULL;
2172     }
2173
2174     s = xmalloc(sizeof(struct ovdbsearch));
2175     db = get_db_bynum(gi.current_db);
2176     if(db == NULL) {
2177         free(s);
2178         return NULL;
2179     }
2180
2181     ret = db->cursor(db, NULL, &(s->cursor), 0);
2182     if (ret != 0) {
2183         syslog(L_ERROR, "OVDB: opensearch: s->db->cursor: %s", db_strerror(ret));
2184         free(s);
2185         return NULL;
2186     }
2187
2188     s->gid = gi.current_gid;
2189     s->firstart = low;
2190     s->lastart = high;
2191     s->state = 0;
2192
2193     if(searches == NULL) {
2194         nsearches = 0;
2195         maxsearches = 50;
2196         searches = xmalloc(sizeof(struct ovdbsearch *) * maxsearches);
2197     }
2198     if(nsearches == maxsearches) {
2199         maxsearches += 50;
2200         searches = xrealloc(searches, sizeof(struct ovdbsearch *) * maxsearches);
2201     }
2202     searches[nsearches] = s;
2203     nsearches++;
2204
2205     return (void *)s;
2206 }
2207
2208 bool
2209 ovdb_search(void *handle, ARTNUM *artnum, char **data, int *len,
2210             TOKEN *token, time_t *arrived)
2211 {
2212     struct ovdbsearch *s = (struct ovdbsearch *)handle;
2213     DBT key, val;
2214     u_int32_t flags;
2215     struct ovdata ovd;
2216     struct datakey dk;
2217     int ret;
2218
2219     if (clientmode) {
2220         struct rs_cmd rs;
2221         struct rs_srch repl;
2222         static char *databuf;
2223         static int buflen = 0;
2224
2225         rs.what = CMD_SRCH;
2226         rs.handle = handle;
2227
2228         if (csend(&rs, sizeof(rs)) < 0)
2229             return false;
2230         if (crecv(&repl, sizeof(repl)) < 0)
2231             return false;
2232
2233         if(repl.status != CMD_SRCH)
2234             return false;
2235         if(repl.len > buflen) {
2236             if(buflen == 0) {
2237                 buflen = repl.len + 512;
2238                 databuf = xmalloc(buflen);
2239             } else {
2240                 buflen = repl.len + 512;
2241                 databuf = xrealloc(databuf, buflen);
2242             }
2243         }
2244         crecv(databuf, repl.len);
2245
2246         if(artnum)
2247             *artnum = repl.artnum;
2248         if(token)
2249             *token = repl.token;
2250         if(arrived)
2251             *arrived = repl.arrived;
2252         if(len)
2253             *len = repl.len;
2254         if(data)
2255             *data = databuf;
2256         return true;
2257     }
2258
2259     switch(s->state) {
2260     case 0:
2261         flags = DB_SET_RANGE;
2262         memset(&dk, 0, sizeof dk);
2263         dk.groupnum = s->gid;
2264         dk.artnum = htonl(s->firstart);
2265         s->state = 1;
2266         break;
2267     case 1:
2268         flags = DB_NEXT;
2269         break;
2270     case 2:
2271         s->state = 3;
2272         return false;
2273     default:
2274         syslog(L_ERROR, "OVDB: OVsearch called again after false return");
2275         return false;
2276     }
2277
2278     memset(&key, 0, sizeof key);
2279     memset(&val, 0, sizeof val);
2280     key.data = &dk;
2281     key.size = key.ulen = sizeof(struct datakey);
2282     key.flags = DB_DBT_USERMEM;
2283
2284     if(!data && !len) {
2285         /* caller doesn't need data, so we don't have to retrieve it all */
2286         val.flags |= DB_DBT_PARTIAL;
2287
2288         if(token || arrived)
2289             val.dlen = sizeof(struct ovdata);
2290     }
2291
2292     switch(ret = s->cursor->c_get(s->cursor, &key, &val, flags)) {
2293     case 0:
2294         break;
2295     case DB_NOTFOUND:
2296         s->state = 3;
2297         s->cursor->c_close(s->cursor);
2298         s->cursor = NULL;
2299         return false;
2300     default:
2301         syslog(L_ERROR, "OVDB: search: c_get: %s", db_strerror(ret));
2302         s->state = 3;
2303         s->cursor->c_close(s->cursor);
2304         s->cursor = NULL;
2305         return false;
2306     }
2307
2308     if(key.size != sizeof(struct datakey)) {
2309         s->state = 3;
2310         s->cursor->c_close(s->cursor);
2311         s->cursor = NULL;
2312         return false;
2313     }
2314
2315     if(dk.groupnum != s->gid || ntohl(dk.artnum) > s->lastart) {
2316         s->state = 3;
2317         s->cursor->c_close(s->cursor);
2318         s->cursor = NULL;
2319         return false;
2320     }
2321
2322     if( ((len || data) && val.size <= sizeof(struct ovdata))
2323         || ((token || arrived) && val.size < sizeof(struct ovdata)) ) {
2324         syslog(L_ERROR, "OVDB: search: bad value length");
2325         s->state = 3;
2326         s->cursor->c_close(s->cursor);
2327         s->cursor = NULL;
2328         return false;
2329     }
2330
2331     if(ntohl(dk.artnum) == s->lastart) {
2332         s->state = 2;
2333         s->cursor->c_close(s->cursor);
2334         s->cursor = NULL;
2335     }
2336
2337     if(artnum)
2338         *artnum = ntohl(dk.artnum);
2339
2340     if(token || arrived)
2341         memcpy(&ovd, val.data, sizeof(struct ovdata));
2342     if(token)
2343         *token = ovd.token;
2344     if(arrived)
2345         *arrived = ovd.arrived;
2346
2347     if(len)
2348         *len = val.size - sizeof(struct ovdata);
2349     if(data)
2350         *data = (char *)val.data + sizeof(struct ovdata);
2351
2352     return true;
2353 }
2354
2355 void ovdb_closesearch(void *handle)
2356 {
2357     int i;
2358     if(clientmode) {
2359         struct rs_cmd rs;
2360
2361         rs.what = CMD_CLOSESRCH;
2362         rs.handle = handle;
2363         csend(&rs, sizeof(rs));
2364         /* no reply is sent for a CMD_CLOSESRCH */
2365     } else {
2366         struct ovdbsearch *s = (struct ovdbsearch *)handle;
2367
2368         if(s->cursor)
2369             s->cursor->c_close(s->cursor);
2370
2371         for(i = 0; i < nsearches; i++) {
2372             if(s == searches[i]) {
2373                 break;
2374             }
2375         }
2376         nsearches--;
2377         for( ; i < nsearches; i++) {
2378             searches[i] = searches[i+1];
2379         }
2380
2381         free(handle);
2382     }
2383 }
2384
2385 bool ovdb_getartinfo(char *group, ARTNUM artnum, TOKEN *token)
2386 {
2387     int ret, cdb = 0;
2388     group_id_t cgid = 0;
2389     DB *db;
2390     DBT key, val;
2391     struct ovdata ovd;
2392     struct datakey dk;
2393     struct groupinfo gi;
2394     int pass = 0;
2395
2396     if(clientmode) {
2397         struct rs_cmd rs;
2398         struct rs_artinfo repl;
2399
2400         rs.what = CMD_ARTINFO;
2401         rs.grouplen = strlen(group)+1;
2402         rs.artlo = artnum;
2403
2404         if (csend(&rs, sizeof(rs)) < 0)
2405             return false;
2406         if (csend(group, rs.grouplen) < 0)
2407             return false;
2408         crecv(&repl, sizeof(repl));
2409
2410         if(repl.status != CMD_ARTINFO)
2411             return false;
2412
2413         if(token)
2414             *token = repl.token;
2415
2416         return true;
2417     }
2418
2419     while(1) {
2420         ret = ovdb_getgroupinfo(group, &gi, true, NULL, 0);
2421         switch (ret)
2422         {
2423         case 0:
2424             break;
2425         case DB_NOTFOUND:
2426             return false;
2427         default:
2428             syslog(L_ERROR, "OVDB: ovdb_getgroupinfo failed: %s", db_strerror(ret));
2429             return false;
2430         }
2431
2432         if(pass) {
2433             /* This was our second groupinfo retrieval; because the article
2434                retrieval came up empty.  If the group ID hasn't changed
2435                since the first groupinfo retrieval, we can assume the article
2436                is definitely not there.  Otherwise, we'll try to retrieve
2437                it the article again. */
2438             if(cdb == gi.current_db && cgid == gi.current_gid)
2439                 return false;
2440         }
2441
2442         db = get_db_bynum(gi.current_db);
2443         if(db == NULL)
2444             return false;
2445
2446         memset(&dk, 0, sizeof dk);
2447         dk.groupnum = gi.current_gid;
2448         dk.artnum = htonl((u_int32_t)artnum);
2449
2450         memset(&key, 0, sizeof key);
2451         memset(&val, 0, sizeof val);
2452
2453         key.data = &dk;
2454         key.size = sizeof dk;
2455
2456         /* caller doesn't need data, so we don't have to retrieve it all */
2457         val.flags = DB_DBT_PARTIAL;
2458
2459         if(token)
2460             val.dlen = sizeof(struct ovdata);
2461
2462         switch(ret = db->get(db, NULL, &key, &val, 0)) {
2463         case 0:
2464         case DB_NOTFOUND:
2465             break;
2466         default:
2467             syslog(L_ERROR, "OVDB: getartinfo: db->get: %s", db_strerror(ret));
2468             return false;
2469         }
2470
2471         if(ret == DB_NOTFOUND) {
2472             /* If the group is being moved (i.e., its group ID is going
2473                to change), there's a chance the article is now under the
2474                new ID.  So we'll grab the groupinfo again to check for
2475                that. */
2476             if(!pass && (gi.status & GROUPINFO_MOVING)) {
2477                 cdb = gi.current_db;
2478                 cgid = gi.current_gid;
2479                 pass++;
2480                 continue;
2481             }
2482             return false;
2483         }
2484         break;
2485     }
2486
2487     if(token && val.size < sizeof(struct ovdata)) {
2488         syslog(L_ERROR, "OVDB: getartinfo: data too short");
2489         return false;
2490     }
2491
2492     if(token) {
2493         memcpy(&ovd, val.data, sizeof(struct ovdata));
2494         *token = ovd.token;
2495     }
2496     return true;
2497 }
2498
2499 bool ovdb_expiregroup(char *group, int *lo, struct history *h)
2500 {
2501     DB *db, *ndb = NULL;
2502     DBT key, val, nkey, gkey, gval;
2503     DB_TXN *tid;
2504     DBC *cursor = NULL;
2505     int ret = 0, delete, old_db = 0, cleanup;
2506     struct groupinfo gi;
2507     struct ovdata ovd;
2508     struct datakey dk, ndk;
2509     group_id_t old_gid = 0;
2510     ARTHANDLE *ah;
2511     u_int32_t artnum = 0, currentart, lowest;
2512     int i, compact, done, currentcount, newcount;
2513
2514     if(eo_start == 0) {
2515         eo_start = time(NULL);
2516         delete_old_stuff(0);    /* remove deleted groups first */
2517     }
2518
2519     /* Special case:  when called with NULL group, we're to clean out
2520      * records for forgotton groups (groups removed from the active file
2521      * but not from overview).
2522      * This happens at the end of the expireover run, and only if all
2523      * of the groups in the active file have been processed.
2524      * delete_old_stuff(1) will remove groups that are in ovdb but
2525      * have not been processed during this run.
2526      */
2527
2528     if(group == NULL)
2529         return delete_old_stuff(1);
2530
2531     memset(&key, 0, sizeof key);
2532     memset(&nkey, 0, sizeof nkey);
2533     memset(&val, 0, sizeof val);
2534     memset(&dk, 0, sizeof dk);
2535     memset(&ndk, 0, sizeof ndk);
2536
2537     TXN_START(t_expgroup_1, tid);
2538
2539     if(tid==NULL)
2540         return false;
2541
2542     cleanup = 0;
2543
2544     ret = ovdb_getgroupinfo(group, &gi, true, tid, DB_RMW);
2545     switch (ret)
2546     {
2547     case 0:
2548         break;
2549     case TRYAGAIN:
2550         TXN_RETRY(t_expgroup_1, tid);
2551     default:
2552         syslog(L_ERROR, "OVDB: expiregroup: ovdb_getgroupinfo failed: %s", db_strerror(ret));
2553     case DB_NOTFOUND:
2554         TXN_ABORT(t_expgroup_1, tid);
2555         return false;
2556     }
2557
2558     if(gi.status & GROUPINFO_EXPIRING) {
2559         /* is there another expireover working on this group? */
2560         ret = kill(gi.expiregrouppid, 0);
2561         switch(ret)
2562         {
2563         case 0:
2564         case EPERM:
2565             TXN_ABORT(t_expgroup_1, tid);
2566             return false;
2567         }
2568
2569         /* a previous expireover run must've died.  We'll clean
2570            up after it */
2571         if(gi.status & GROUPINFO_MOVING) {
2572             cleanup = 1;
2573             old_db = gi.new_db;
2574             old_gid = gi.new_gid;
2575
2576             ret = mk_temp_groupinfo(old_db, old_gid, tid);
2577             switch(ret) {
2578             case 0:
2579                 break;
2580             case TRYAGAIN:
2581                 TXN_RETRY(t_expgroup_1, tid);
2582             default:
2583                 TXN_ABORT(t_expgroup_1, tid);
2584                 return false;
2585             }
2586             gi.status &= ~GROUPINFO_MOVING;
2587         }
2588     }
2589
2590     if(gi.count < ovdb_conf.nocompact || ovdb_conf.nocompact == 0)
2591         compact = 1;
2592     else
2593         compact = 0;
2594
2595     if(gi.count == 0)
2596         compact = 0;
2597
2598     db = get_db_bynum(gi.current_db);
2599     if(db == NULL) {
2600         TXN_ABORT(t_expgroup_1, tid);
2601         return false;
2602     }
2603
2604     gi.status |= GROUPINFO_EXPIRING;
2605     gi.expiregrouppid = getpid();
2606     if(compact) {
2607         gi.status |= GROUPINFO_MOVING;
2608         gi.new_db = gi.current_db;
2609         ndb = db;
2610         ret = groupid_new(&gi.new_gid, tid);
2611         switch (ret)
2612         {
2613         case 0:
2614             break;
2615         case TRYAGAIN:
2616             TXN_RETRY(t_expgroup_1, tid);
2617         default:
2618             TXN_ABORT(t_expgroup_1, tid);
2619             syslog(L_ERROR, "OVDB: expiregroup: groupid_new: %s", db_strerror(ret));
2620             return false;
2621         }
2622     }
2623
2624     key.data = group;
2625     key.size = strlen(group);
2626     val.data = &gi;
2627     val.size = sizeof gi;
2628
2629     ret = groupinfo->put(groupinfo, tid, &key, &val, 0);
2630     switch (ret)
2631     {
2632     case 0:
2633         break;
2634     case TRYAGAIN:
2635         TXN_RETRY(t_expgroup_1, tid);
2636     default:
2637         TXN_ABORT(t_expgroup_1, tid);
2638         syslog(L_ERROR, "OVDB: expiregroup: groupinfo->put: %s", db_strerror(ret));
2639         return false;
2640     }
2641     TXN_COMMIT(t_expgroup_1, tid);
2642
2643     if(cleanup) {
2644         if(delete_all_records(old_db, old_gid) == 0) {
2645             rm_temp_groupinfo(old_gid);
2646         }
2647     }
2648
2649     /*
2650      * The following loop iterates over the OV records for the group in
2651      * "batches", to limit transaction sizes.
2652      *
2653      * loop {
2654      *    start transaction
2655      *    get groupinfo
2656      *    process EXPIREGROUP_TXN_SIZE records
2657      *    write updated groupinfo
2658      *    commit transaction
2659      * }
2660      */
2661     currentart = 0;
2662     lowest = currentcount = 0;
2663
2664     memset(&gkey, 0, sizeof gkey);
2665     memset(&gval, 0, sizeof gval);
2666     gkey.data = group;
2667     gkey.size = strlen(group);
2668     gval.data = &gi;
2669     gval.size = sizeof gi;
2670
2671     while(1) {
2672         TXN_START(t_expgroup_loop, tid);
2673         if(tid==NULL)
2674             return false;
2675         done = 0;
2676         newcount = 0;
2677
2678         ret = ovdb_getgroupinfo(group, &gi, false, tid, DB_RMW);
2679         switch (ret)
2680         {
2681         case 0:
2682             break;
2683         case TRYAGAIN:
2684             TXN_RETRY(t_expgroup_loop, tid);
2685         default:
2686             TXN_ABORT(t_expgroup_loop, tid);
2687             syslog(L_ERROR, "OVDB: expiregroup: ovdb_getgroupinfo: %s", db_strerror(ret));
2688             return false;
2689         }
2690
2691         ret = db->cursor(db, tid, &cursor, 0);
2692         switch (ret)
2693         {
2694         case 0:
2695             break;
2696         case TRYAGAIN:
2697             TXN_RETRY(t_expgroup_loop, tid);
2698         default:
2699             TXN_ABORT(t_expgroup_loop, tid);
2700             syslog(L_ERROR, "OVDB: expiregroup: db->cursor: %s", db_strerror(ret));
2701             return false;
2702         }
2703
2704         dk.groupnum = gi.current_gid;
2705         dk.artnum = htonl(currentart);
2706         key.data = &dk;
2707         key.size = key.ulen = sizeof dk;
2708         key.flags = DB_DBT_USERMEM;
2709
2710         for(i=0; i < EXPIREGROUP_TXN_SIZE; i++) {
2711             ret = cursor->c_get(cursor, &key, &val,
2712                                 i == 0 ? DB_SET_RANGE : DB_NEXT);
2713             switch (ret)
2714             {
2715             case 0:
2716             case DB_NOTFOUND:
2717                 break;
2718             case TRYAGAIN:
2719                 cursor->c_close(cursor);
2720                 TXN_RETRY(t_expgroup_loop, tid);
2721             default:
2722                 cursor->c_close(cursor);
2723                 TXN_ABORT(t_expgroup_loop, tid);
2724                 syslog(L_ERROR, "OVDB: expiregroup: c_get: %s", db_strerror(ret));
2725                 return false;
2726             }
2727
2728             /* stop if: there are no more keys, an unknown key is reached,
2729                or reach a different group */
2730
2731             if(ret == DB_NOTFOUND
2732                     || key.size != sizeof dk
2733                     || dk.groupnum != gi.current_gid) {
2734                 done++;
2735                 break;
2736             }
2737
2738             artnum = ntohl(dk.artnum);
2739
2740             delete = 0;
2741             if(val.size < sizeof ovd) {
2742                 delete = 1;     /* must be corrupt, just delete it */
2743             } else {
2744                 memcpy(&ovd, val.data, sizeof ovd);
2745
2746                 ah = NULL;
2747                 if (!SMprobe(EXPENSIVESTAT, &ovd.token, NULL) || OVstatall) {
2748                     if((ah = SMretrieve(ovd.token, RETR_STAT)) == NULL) {
2749                         delete = 1;
2750                     } else
2751                         SMfreearticle(ah);
2752                 } else {
2753                     if (!OVhisthasmsgid(h, (char *)val.data + sizeof(ovd))) {
2754                         delete = 1;
2755                     }
2756                 }
2757                 if (!delete && innconf->groupbaseexpiry &&
2758                             OVgroupbasedexpire(ovd.token, group,
2759                                     (char *)val.data + sizeof(ovd),
2760                                     val.size - sizeof(ovd),
2761                                     ovd.arrived, ovd.expires)) {
2762                     delete = 1;
2763                 }
2764             }
2765
2766             if(delete) {
2767                 if(!compact) {
2768                     switch(ret = cursor->c_del(cursor, 0)) {
2769                     case 0:
2770                     case DB_NOTFOUND:
2771                     case DB_KEYEMPTY:
2772                         break;
2773                     case TRYAGAIN:
2774                         cursor->c_close(cursor);
2775                         TXN_RETRY(t_expgroup_loop, tid);
2776                     default:
2777                         cursor->c_close(cursor);
2778                         TXN_ABORT(t_expgroup_loop, tid);
2779                         syslog(L_ERROR, "OVDB: expiregroup: c_del: %s", db_strerror(ret));
2780                         return false;
2781                     }
2782                 }
2783                 if(gi.count > 0)
2784                     gi.count--;
2785             } else {
2786                 if(compact) {
2787                     ndk.groupnum = gi.new_gid;
2788                     ndk.artnum = dk.artnum;
2789                     nkey.data = &ndk;
2790                     nkey.size = sizeof ndk;
2791
2792                     switch(ret = ndb->put(ndb, tid, &nkey, &val, 0)) {
2793                     case 0:
2794                         break;
2795                     case TRYAGAIN:
2796                         cursor->c_close(cursor);
2797                         TXN_RETRY(t_expgroup_loop, tid);
2798                     default:
2799                         cursor->c_close(cursor);
2800                         TXN_ABORT(t_expgroup_loop, tid);
2801                         syslog(L_ERROR, "OVDB: expiregroup: ndb->put: %s", db_strerror(ret));
2802                         return false;
2803                     }
2804                 }
2805                 newcount++;
2806                 if(lowest != -1 && (lowest == 0 || artnum < lowest))
2807                     lowest = artnum;
2808             }
2809         }
2810         /* end of for loop */
2811
2812         if(cursor->c_close(cursor) == TRYAGAIN) {
2813             TXN_RETRY(t_expgroup_loop, tid);
2814         }
2815
2816         if(lowest != 0 && lowest != -1)
2817             gi.low = lowest;
2818
2819         if(done) {
2820             if(compact) {
2821                 old_db = gi.current_db;
2822                 gi.current_db = gi.new_db;
2823                 old_gid = gi.current_gid;
2824                 gi.current_gid = gi.new_gid;
2825                 gi.status &= ~GROUPINFO_MOVING;
2826
2827                 ret = mk_temp_groupinfo(old_db, old_gid, tid);
2828                 switch(ret) {
2829                 case 0:
2830                     break;
2831                 case TRYAGAIN:
2832                     TXN_RETRY(t_expgroup_loop, tid);
2833                 default:
2834                     TXN_ABORT(t_expgroup_loop, tid);
2835                     return false;
2836                 }
2837             }
2838
2839             gi.status &= ~GROUPINFO_EXPIRING;
2840             gi.expired = time(NULL);
2841             if(gi.count == 0 && lowest == 0)
2842                 gi.low = gi.high+1;
2843         }
2844
2845         ret = groupinfo->put(groupinfo, tid, &gkey, &gval, 0);
2846         switch (ret)
2847         {
2848         case 0:
2849             break;
2850         case TRYAGAIN:
2851             TXN_RETRY(t_expgroup_loop, tid);
2852         default:
2853             TXN_ABORT(t_expgroup_loop, tid);
2854             syslog(L_ERROR, "OVDB: expiregroup: groupinfo->put: %s", db_strerror(ret));
2855             return false;
2856         }
2857         TXN_COMMIT(t_expgroup_loop, tid);
2858
2859         currentcount += newcount;
2860         if(lowest != 0)
2861             lowest = -1;
2862
2863         if(done)
2864             break;
2865
2866         currentart = artnum+1;
2867     }
2868
2869     if(compact) {
2870         if(delete_all_records(old_db, old_gid) == 0) {
2871             rm_temp_groupinfo(old_gid);
2872         }
2873     }
2874
2875     if(currentcount != gi.count) {
2876         syslog(L_NOTICE, "OVDB: expiregroup: recounting %s", group);
2877
2878         TXN_START(t_expgroup_recount, tid);
2879         if(tid == NULL)
2880             return false;
2881
2882         switch(ret = ovdb_getgroupinfo(group, &gi, false, tid, DB_RMW)) {
2883         case 0:
2884             break;
2885         case TRYAGAIN:
2886             TXN_RETRY(t_expgroup_recount, tid);
2887         default:
2888             TXN_ABORT(t_expgroup_recount, tid);
2889             syslog(L_ERROR, "OVDB: expiregroup: ovdb_getgroupinfo: %s", db_strerror(ret));
2890             return false;
2891         }
2892
2893         if(count_records(&gi) != 0) {
2894             TXN_ABORT(t_expgroup_recount, tid);
2895             return false;
2896         }
2897
2898         ret = groupinfo->put(groupinfo, tid, &gkey, &gval, 0);
2899         switch (ret)
2900         {
2901         case 0:
2902             break;
2903         case TRYAGAIN:
2904             TXN_RETRY(t_expgroup_recount, tid);
2905         default:
2906             TXN_ABORT(t_expgroup_recount, tid);
2907             syslog(L_ERROR, "OVDB: expiregroup: groupinfo->put: %s", db_strerror(ret));
2908             return false;
2909         }
2910         TXN_COMMIT(t_expgroup_recount, tid);
2911     }
2912
2913     if(lo)
2914         *lo = gi.low;
2915     return true;
2916 }
2917
2918 bool ovdb_ctl(OVCTLTYPE type, void *val)
2919 {
2920     int *i;
2921     OVSORTTYPE *sorttype;
2922     bool *boolval;
2923
2924     switch (type) {
2925     case OVSPACE:
2926         i = (int *)val;
2927         *i = -1;
2928         return true;
2929     case OVSORT:
2930         sorttype = (OVSORTTYPE *)val;
2931         *sorttype = OVNEWSGROUP;
2932         return true;
2933     case OVCUTOFFLOW:
2934         Cutofflow = *(bool *)val;
2935         return true;
2936     case OVSTATICSEARCH:
2937         i = (int *)val;
2938         *i = true;
2939         return true;
2940     case OVCACHEKEEP:
2941     case OVCACHEFREE:
2942         boolval = (bool *)val;
2943         *boolval = false;
2944         return true;
2945     default:
2946         return false;
2947     }
2948 }
2949
2950 void ovdb_close_berkeleydb(void)
2951 {
2952     if(OVDBenv) {
2953         /* close db environment */
2954 #if DB_VERSION_MAJOR == 2
2955         db_appexit(OVDBenv);
2956         free(OVDBenv);
2957 #else
2958         OVDBenv->close(OVDBenv, 0);
2959 #endif
2960         OVDBenv = NULL;
2961     }
2962 }
2963
2964 void ovdb_close(void)
2965 {
2966     int i;
2967
2968     if(clientmode) {
2969         client_disconnect();
2970         return;
2971     }
2972
2973     while(searches != NULL && nsearches) {
2974         ovdb_closesearch(searches[0]);
2975     }
2976     if(searches != NULL) {
2977         free(searches);
2978         searches = NULL;
2979     }
2980
2981     if(dbs) {
2982         /* close databases */
2983         for(i = 0; i < ovdb_conf.numdbfiles; i++)
2984             close_db_file(i);
2985
2986         free(dbs);
2987         dbs = NULL;
2988     }
2989     if(groupinfo) {
2990         groupinfo->close(groupinfo, 0);
2991         groupinfo = NULL;
2992     }
2993     if(groupaliases) {
2994         groupaliases->close(groupaliases, 0);
2995         groupaliases = NULL;
2996     }
2997
2998     ovdb_close_berkeleydb();
2999     ovdb_releaselock();
3000 }
3001
3002
3003 #endif /* USE_BERKELEY_DB */
3004