chiark / gitweb /
[PATCH] PATCH udev close on exec
[elogind.git] / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3    Samba database functions
4    Copyright (C) Andrew Tridgell              1999-2000
5    Copyright (C) Luke Kenneth Casson Leighton      2000
6    Copyright (C) Paul `Rusty' Russell              2000
7    Copyright (C) Jeremy Allison                    2000-2003
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23
24
25 /* NOTE: If you use tdbs under valgrind, and in particular if you run
26  * tdbtorture, you may get spurious "uninitialized value" warnings.  I
27  * think this is because valgrind doesn't understand that the mmap'd
28  * area may be written to by other processes.  Memory can, from the
29  * point of view of the grinded process, spontaneously become
30  * initialized.
31  *
32  * I can think of a few solutions.  [mbp 20030311]
33  *
34  * 1 - Write suppressions for Valgrind so that it doesn't complain
35  * about this.  Probably the most reasonable but people need to
36  * remember to use them.
37  *
38  * 2 - Use IO not mmap when running under valgrind.  Not so nice.
39  *
40  * 3 - Use the special valgrind macros to mark memory as valid at the
41  * right time.  Probably too hard -- the process just doesn't know.
42  */ 
43
44 /* udev defines */
45 #define STANDALONE
46 #define TDB_DEBUG
47 #define HAVE_MMAP       1
48
49
50 #ifdef STANDALONE
51 #if HAVE_CONFIG_H
52 #include <config.h>
53 #endif
54
55 #define _KLIBC_HAS_ARCH_SIG_ATOMIC_T
56 #include <stdlib.h>
57 #include <stdio.h>
58 #include <fcntl.h>
59 #include <unistd.h>
60 #include <string.h>
61 #include <fcntl.h>
62 #include <errno.h>
63 #include <sys/mman.h>
64 #include <sys/stat.h>
65 #include <signal.h>
66 #include "tdb.h"
67 #include "spinlock.h"
68 #include "../udev_lib.h"
69 #else
70 #include "includes.h"
71 #endif
72
73 #define TDB_MAGIC_FOOD "TDB file\n"
74 #define TDB_VERSION (0x26011967 + 6)
75 #define TDB_MAGIC (0x26011999U)
76 #define TDB_FREE_MAGIC (~TDB_MAGIC)
77 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
78 #define TDB_ALIGNMENT 4
79 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
80 #define DEFAULT_HASH_SIZE 131
81 #define TDB_PAGE_SIZE 0x2000
82 #define FREELIST_TOP (sizeof(struct tdb_header))
83 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
84 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
85 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
86 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
87 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
88
89 /* NB assumes there is a local variable called "tdb" that is the
90  * current context, also takes doubly-parenthesized print-style
91  * argument. */
92 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
93
94 /* lock offsets */
95 #define GLOBAL_LOCK 0
96 #define ACTIVE_LOCK 4
97
98 #ifndef MAP_FILE
99 #define MAP_FILE 0
100 #endif
101
102 #ifndef MAP_FAILED
103 #define MAP_FAILED ((void *)-1)
104 #endif
105
106 /* free memory if the pointer is valid and zero the pointer */
107 #ifndef SAFE_FREE
108 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
109 #endif
110
111 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
112 TDB_DATA tdb_null;
113
114 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
115 static TDB_CONTEXT *tdbs = NULL;
116
117 static int tdb_munmap(TDB_CONTEXT *tdb)
118 {
119         if (tdb->flags & TDB_INTERNAL)
120                 return 0;
121
122 #ifdef HAVE_MMAP
123         if (tdb->map_ptr) {
124                 int ret = munmap(tdb->map_ptr, tdb->map_size);
125                 if (ret != 0)
126                         return ret;
127         }
128 #endif
129         tdb->map_ptr = NULL;
130         return 0;
131 }
132
133 static void tdb_mmap(TDB_CONTEXT *tdb)
134 {
135         if (tdb->flags & TDB_INTERNAL)
136                 return;
137
138 #ifdef HAVE_MMAP
139         if (!(tdb->flags & TDB_NOMMAP)) {
140                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
141                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
142                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
143
144                 /*
145                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
146                  */
147
148                 if (tdb->map_ptr == MAP_FAILED) {
149                         tdb->map_ptr = NULL;
150                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
151                                  tdb->map_size, strerror(errno)));
152                 }
153         } else {
154                 tdb->map_ptr = NULL;
155         }
156 #else
157         tdb->map_ptr = NULL;
158 #endif
159 }
160
161 /* Endian conversion: we only ever deal with 4 byte quantities */
162 static void *convert(void *buf, u32 size)
163 {
164         u32 i, *p = buf;
165         for (i = 0; i < size / 4; i++)
166                 p[i] = TDB_BYTEREV(p[i]);
167         return buf;
168 }
169 #define DOCONV() (tdb->flags & TDB_CONVERT)
170 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
171
172 /* the body of the database is made of one list_struct for the free space
173    plus a separate data list for each hash value */
174 struct list_struct {
175         tdb_off next; /* offset of the next record in the list */
176         tdb_len rec_len; /* total byte length of record */
177         tdb_len key_len; /* byte length of key */
178         tdb_len data_len; /* byte length of data */
179         u32 full_hash; /* the full 32 bit hash of the key */
180         u32 magic;   /* try to catch errors */
181         /* the following union is implied:
182                 union {
183                         char record[rec_len];
184                         struct {
185                                 char key[key_len];
186                                 char data[data_len];
187                         }
188                         u32 totalsize; (tailer)
189                 }
190         */
191 };
192
193 /***************************************************************
194  Allow a caller to set a "alarm" flag that tdb can check to abort
195  a blocking lock on SIGALRM.
196 ***************************************************************/
197
198 static sig_atomic_t *palarm_fired;
199
200 void tdb_set_lock_alarm(sig_atomic_t *palarm)
201 {
202         palarm_fired = palarm;
203 }
204
205 /* a byte range locking function - return 0 on success
206    this functions locks/unlocks 1 byte at the specified offset.
207
208    On error, errno is also set so that errors are passed back properly
209    through tdb_open(). */
210 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
211                       int rw_type, int lck_type, int probe)
212 {
213         struct flock fl;
214         int ret;
215
216         if (tdb->flags & TDB_NOLOCK)
217                 return 0;
218         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
219                 errno = EACCES;
220                 return -1;
221         }
222
223         fl.l_type = rw_type;
224         fl.l_whence = SEEK_SET;
225         fl.l_start = offset;
226         fl.l_len = 1;
227         fl.l_pid = 0;
228
229         do {
230                 ret = fcntl(tdb->fd,lck_type,&fl);
231                 if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
232                         break;
233         } while (ret == -1 && errno == EINTR);
234
235         if (ret == -1) {
236                 if (!probe && lck_type != F_SETLK) {
237                         /* Ensure error code is set for log fun to examine. */
238                         if (errno == EINTR && palarm_fired && *palarm_fired)
239                                 tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
240                         else
241                                 tdb->ecode = TDB_ERR_LOCK;
242                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
243                                  tdb->fd, offset, rw_type, lck_type));
244                 }
245                 /* Was it an alarm timeout ? */
246                 if (errno == EINTR && palarm_fired && *palarm_fired)
247                         return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
248                 /* Otherwise - generic lock error. */
249                 /* errno set by fcntl */
250                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
251         }
252         return 0;
253 }
254
255 /* lock a list in the database. list -1 is the alloc list */
256 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
257 {
258         if (list < -1 || list >= (int)tdb->header.hash_size) {
259                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
260                            list, ltype));
261                 return -1;
262         }
263         if (tdb->flags & TDB_NOLOCK)
264                 return 0;
265
266         /* Since fcntl locks don't nest, we do a lock for the first one,
267            and simply bump the count for future ones */
268         if (tdb->locked[list+1].count == 0) {
269                 if (!tdb->read_only && tdb->header.rwlocks) {
270                         if (tdb_spinlock(tdb, list, ltype)) {
271                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
272                                            list, ltype));
273                                 return -1;
274                         }
275                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
276                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
277                                            list, ltype, strerror(errno)));
278                         return -1;
279                 }
280                 tdb->locked[list+1].ltype = ltype;
281         }
282         tdb->locked[list+1].count++;
283         return 0;
284 }
285
286 /* unlock the database: returns void because it's too late for errors. */
287         /* changed to return int it may be interesting to know there
288            has been an error  --simo */
289 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
290 {
291         int ret = -1;
292
293         if (tdb->flags & TDB_NOLOCK)
294                 return 0;
295
296         /* Sanity checks */
297         if (list < -1 || list >= (int)tdb->header.hash_size) {
298                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
299                 return ret;
300         }
301
302         if (tdb->locked[list+1].count==0) {
303                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
304                 return ret;
305         }
306
307         if (tdb->locked[list+1].count == 1) {
308                 /* Down to last nested lock: unlock underneath */
309                 if (!tdb->read_only && tdb->header.rwlocks) {
310                         ret = tdb_spinunlock(tdb, list, ltype);
311                 } else {
312                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
313                 }
314         } else {
315                 ret = 0;
316         }
317         tdb->locked[list+1].count--;
318
319         if (ret)
320                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
321         return ret;
322 }
323
324 /* This is based on the hash algorithm from gdbm */
325 static u32 tdb_hash(TDB_DATA *key)
326 {
327         u32 value;      /* Used to compute the hash value.  */
328         u32   i;        /* Used to cycle through random values. */
329
330         /* Set the initial value from the key size. */
331         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
332                 value = (value + (key->dptr[i] << (i*5 % 24)));
333
334         return (1103515243 * value + 12345);  
335 }
336
337 /* check for an out of bounds access - if it is out of bounds then
338    see if the database has been expanded by someone else and expand
339    if necessary 
340    note that "len" is the minimum length needed for the db
341 */
342 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
343 {
344         struct stat st;
345         if (len <= tdb->map_size)
346                 return 0;
347         if (tdb->flags & TDB_INTERNAL) {
348                 if (!probe) {
349                         /* Ensure ecode is set for log fn. */
350                         tdb->ecode = TDB_ERR_IO;
351                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
352                                  (int)len, (int)tdb->map_size));
353                 }
354                 return TDB_ERRCODE(TDB_ERR_IO, -1);
355         }
356
357         if (fstat(tdb->fd, &st) == -1)
358                 return TDB_ERRCODE(TDB_ERR_IO, -1);
359
360         if (st.st_size < (size_t)len) {
361                 if (!probe) {
362                         /* Ensure ecode is set for log fn. */
363                         tdb->ecode = TDB_ERR_IO;
364                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
365                                  (int)len, (int)st.st_size));
366                 }
367                 return TDB_ERRCODE(TDB_ERR_IO, -1);
368         }
369
370         /* Unmap, update size, remap */
371         if (tdb_munmap(tdb) == -1)
372                 return TDB_ERRCODE(TDB_ERR_IO, -1);
373         tdb->map_size = st.st_size;
374         tdb_mmap(tdb);
375         return 0;
376 }
377
378 /* write a lump of data at a specified offset */
379 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
380 {
381         if (tdb_oob(tdb, off + len, 0) != 0)
382                 return -1;
383
384         if (tdb->map_ptr)
385                 memcpy(off + (char *)tdb->map_ptr, buf, len);
386 #ifdef HAVE_PWRITE
387         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
388 #else
389         else if (lseek(tdb->fd, off, SEEK_SET) != off
390                  || write(tdb->fd, buf, len) != (ssize_t)len) {
391 #endif
392                 /* Ensure ecode is set for log fn. */
393                 tdb->ecode = TDB_ERR_IO;
394                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
395                            off, len, strerror(errno)));
396                 return TDB_ERRCODE(TDB_ERR_IO, -1);
397         }
398         return 0;
399 }
400
401 /* read a lump of data at a specified offset, maybe convert */
402 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
403 {
404         if (tdb_oob(tdb, off + len, 0) != 0)
405                 return -1;
406
407         if (tdb->map_ptr)
408                 memcpy(buf, off + (char *)tdb->map_ptr, len);
409 #ifdef HAVE_PREAD
410         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
411 #else
412         else if (lseek(tdb->fd, off, SEEK_SET) != off
413                  || read(tdb->fd, buf, len) != (ssize_t)len) {
414 #endif
415                 /* Ensure ecode is set for log fn. */
416                 tdb->ecode = TDB_ERR_IO;
417                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
418                            off, len, strerror(errno)));
419                 return TDB_ERRCODE(TDB_ERR_IO, -1);
420         }
421         if (cv)
422                 convert(buf, len);
423         return 0;
424 }
425
426 /* read a lump of data, allocating the space for it */
427 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
428 {
429         char *buf;
430
431         if (!(buf = malloc(len))) {
432                 /* Ensure ecode is set for log fn. */
433                 tdb->ecode = TDB_ERR_OOM;
434                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
435                            len, strerror(errno)));
436                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
437         }
438         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
439                 SAFE_FREE(buf);
440                 return NULL;
441         }
442         return buf;
443 }
444
445 /* read/write a tdb_off */
446 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
447 {
448         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
449 }
450 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
451 {
452         tdb_off off = *d;
453         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
454 }
455
456 /* read/write a record */
457 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
458 {
459         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
460                 return -1;
461         if (TDB_BAD_MAGIC(rec)) {
462                 /* Ensure ecode is set for log fn. */
463                 tdb->ecode = TDB_ERR_CORRUPT;
464                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
465                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
466         }
467         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
468 }
469 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
470 {
471         struct list_struct r = *rec;
472         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
473 }
474
475 /* read a freelist record and check for simple errors */
476 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
477 {
478         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
479                 return -1;
480
481         if (rec->magic == TDB_MAGIC) {
482                 /* this happens when a app is showdown while deleting a record - we should
483                    not completely fail when this happens */
484                 TDB_LOG((tdb, 0,"rec_free_read non-free magic at offset=%d - fixing\n", 
485                          rec->magic, off));
486                 rec->magic = TDB_FREE_MAGIC;
487                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
488                         return -1;
489         }
490
491         if (rec->magic != TDB_FREE_MAGIC) {
492                 /* Ensure ecode is set for log fn. */
493                 tdb->ecode = TDB_ERR_CORRUPT;
494                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
495                            rec->magic, off));
496                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
497         }
498         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
499                 return -1;
500         return 0;
501 }
502
503 /* update a record tailer (must hold allocation lock) */
504 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
505                          const struct list_struct *rec)
506 {
507         tdb_off totalsize;
508
509         /* Offset of tailer from record header */
510         totalsize = sizeof(*rec) + rec->rec_len;
511         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
512                          &totalsize);
513 }
514
515 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
516 {
517         struct list_struct rec;
518         tdb_off tailer_ofs, tailer;
519
520         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
521                 printf("ERROR: failed to read record at %u\n", offset);
522                 return 0;
523         }
524
525         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
526                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
527
528         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
529         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
530                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
531                 return rec.next;
532         }
533
534         if (tailer != rec.rec_len + sizeof(rec)) {
535                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
536                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
537         }
538         return rec.next;
539 }
540
541 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
542 {
543         tdb_off rec_ptr, top;
544
545         top = TDB_HASH_TOP(i);
546
547         if (tdb_lock(tdb, i, F_WRLCK) != 0)
548                 return -1;
549
550         if (ofs_read(tdb, top, &rec_ptr) == -1)
551                 return tdb_unlock(tdb, i, F_WRLCK);
552
553         if (rec_ptr)
554                 printf("hash=%d\n", i);
555
556         while (rec_ptr) {
557                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
558         }
559
560         return tdb_unlock(tdb, i, F_WRLCK);
561 }
562
563 void tdb_dump_all(TDB_CONTEXT *tdb)
564 {
565         int i;
566         for (i=0;i<tdb->header.hash_size;i++) {
567                 tdb_dump_chain(tdb, i);
568         }
569         printf("freelist:\n");
570         tdb_dump_chain(tdb, -1);
571 }
572
573 int tdb_printfreelist(TDB_CONTEXT *tdb)
574 {
575         int ret;
576         long total_free = 0;
577         tdb_off offset, rec_ptr;
578         struct list_struct rec;
579
580         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
581                 return ret;
582
583         offset = FREELIST_TOP;
584
585         /* read in the freelist top */
586         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
587                 tdb_unlock(tdb, -1, F_WRLCK);
588                 return 0;
589         }
590
591         printf("freelist top=[0x%08x]\n", rec_ptr );
592         while (rec_ptr) {
593                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
594                         tdb_unlock(tdb, -1, F_WRLCK);
595                         return -1;
596                 }
597
598                 if (rec.magic != TDB_FREE_MAGIC) {
599                         printf("bad magic 0x%08x in free list\n", rec.magic);
600                         tdb_unlock(tdb, -1, F_WRLCK);
601                         return -1;
602                 }
603
604                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
605                 total_free += rec.rec_len;
606
607                 /* move to the next record */
608                 rec_ptr = rec.next;
609         }
610         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
611                (int)total_free);
612
613         return tdb_unlock(tdb, -1, F_WRLCK);
614 }
615
616 /* Remove an element from the freelist.  Must have alloc lock. */
617 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
618 {
619         tdb_off last_ptr, i;
620
621         /* read in the freelist top */
622         last_ptr = FREELIST_TOP;
623         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
624                 if (i == off) {
625                         /* We've found it! */
626                         return ofs_write(tdb, last_ptr, &next);
627                 }
628                 /* Follow chain (next offset is at start of record) */
629                 last_ptr = i;
630         }
631         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
632         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
633 }
634
635 /* Add an element into the freelist. Merge adjacent records if
636    neccessary. */
637 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
638 {
639         tdb_off right, left;
640
641         /* Allocation and tailer lock */
642         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
643                 return -1;
644
645         /* set an initial tailer, so if we fail we don't leave a bogus record */
646         if (update_tailer(tdb, offset, rec) != 0) {
647                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
648                 goto fail;
649         }
650
651         /* Look right first (I'm an Australian, dammit) */
652         right = offset + sizeof(*rec) + rec->rec_len;
653         if (right + sizeof(*rec) <= tdb->map_size) {
654                 struct list_struct r;
655
656                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
657                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
658                         goto left;
659                 }
660
661                 /* If it's free, expand to include it. */
662                 if (r.magic == TDB_FREE_MAGIC) {
663                         if (remove_from_freelist(tdb, right, r.next) == -1) {
664                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
665                                 goto left;
666                         }
667                         rec->rec_len += sizeof(r) + r.rec_len;
668                 }
669         }
670
671 left:
672         /* Look left */
673         left = offset - sizeof(tdb_off);
674         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
675                 struct list_struct l;
676                 tdb_off leftsize;
677
678                 /* Read in tailer and jump back to header */
679                 if (ofs_read(tdb, left, &leftsize) == -1) {
680                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
681                         goto update;
682                 }
683                 left = offset - leftsize;
684
685                 /* Now read in record */
686                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
687                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
688                         goto update;
689                 }
690
691                 /* If it's free, expand to include it. */
692                 if (l.magic == TDB_FREE_MAGIC) {
693                         if (remove_from_freelist(tdb, left, l.next) == -1) {
694                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
695                                 goto update;
696                         } else {
697                                 offset = left;
698                                 rec->rec_len += leftsize;
699                         }
700                 }
701         }
702
703 update:
704         if (update_tailer(tdb, offset, rec) == -1) {
705                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
706                 goto fail;
707         }
708
709         /* Now, prepend to free list */
710         rec->magic = TDB_FREE_MAGIC;
711
712         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
713             rec_write(tdb, offset, rec) == -1 ||
714             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
715                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
716                 goto fail;
717         }
718
719         /* And we're done. */
720         tdb_unlock(tdb, -1, F_WRLCK);
721         return 0;
722
723  fail:
724         tdb_unlock(tdb, -1, F_WRLCK);
725         return -1;
726 }
727
728
729 /* expand a file.  we prefer to use ftruncate, as that is what posix
730   says to use for mmap expansion */
731 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
732 {
733         char buf[1024];
734 #if HAVE_FTRUNCATE_EXTEND
735         if (ftruncate(tdb->fd, size+addition) != 0) {
736                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
737                            size+addition, strerror(errno)));
738                 return -1;
739         }
740 #else
741         char b = 0;
742
743 #ifdef HAVE_PWRITE
744         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
745 #else
746         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
747             write(tdb->fd, &b, 1) != 1) {
748 #endif
749                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
750                            size+addition, strerror(errno)));
751                 return -1;
752         }
753 #endif
754
755         /* now fill the file with something. This ensures that the file isn't sparse, which would be
756            very bad if we ran out of disk. This must be done with write, not via mmap */
757         memset(buf, 0x42, sizeof(buf));
758         while (addition) {
759                 int n = addition>sizeof(buf)?sizeof(buf):addition;
760 #ifdef HAVE_PWRITE
761                 int ret = pwrite(tdb->fd, buf, n, size);
762 #else
763                 int ret;
764                 if (lseek(tdb->fd, size, SEEK_SET) != size)
765                         return -1;
766                 ret = write(tdb->fd, buf, n);
767 #endif
768                 if (ret != n) {
769                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
770                                    n, strerror(errno)));
771                         return -1;
772                 }
773                 addition -= n;
774                 size += n;
775         }
776         return 0;
777 }
778
779
780 /* expand the database at least size bytes by expanding the underlying
781    file and doing the mmap again if necessary */
782 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
783 {
784         struct list_struct rec;
785         tdb_off offset;
786
787         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
788                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
789                 return -1;
790         }
791
792         /* must know about any previous expansions by another process */
793         tdb_oob(tdb, tdb->map_size + 1, 1);
794
795         /* always make room for at least 10 more records, and round
796            the database up to a multiple of TDB_PAGE_SIZE */
797         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
798
799         if (!(tdb->flags & TDB_INTERNAL))
800                 tdb_munmap(tdb);
801
802         /*
803          * We must ensure the file is unmapped before doing this
804          * to ensure consistency with systems like OpenBSD where
805          * writes and mmaps are not consistent.
806          */
807
808         /* expand the file itself */
809         if (!(tdb->flags & TDB_INTERNAL)) {
810                 if (expand_file(tdb, tdb->map_size, size) != 0)
811                         goto fail;
812         }
813
814         tdb->map_size += size;
815
816         if (tdb->flags & TDB_INTERNAL)
817                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
818         else {
819                 /*
820                  * We must ensure the file is remapped before adding the space
821                  * to ensure consistency with systems like OpenBSD where
822                  * writes and mmaps are not consistent.
823                  */
824
825                 /* We're ok if the mmap fails as we'll fallback to read/write */
826                 tdb_mmap(tdb);
827         }
828
829         /* form a new freelist record */
830         memset(&rec,'\0',sizeof(rec));
831         rec.rec_len = size - sizeof(rec);
832
833         /* link it into the free list */
834         offset = tdb->map_size - size;
835         if (tdb_free(tdb, offset, &rec) == -1)
836                 goto fail;
837
838         tdb_unlock(tdb, -1, F_WRLCK);
839         return 0;
840  fail:
841         tdb_unlock(tdb, -1, F_WRLCK);
842         return -1;
843 }
844
845 /* allocate some space from the free list. The offset returned points
846    to a unconnected list_struct within the database with room for at
847    least length bytes of total data
848
849    0 is returned if the space could not be allocated
850  */
851 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
852                             struct list_struct *rec)
853 {
854         tdb_off rec_ptr, last_ptr, newrec_ptr;
855         struct list_struct newrec;
856
857         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
858                 return 0;
859
860         /* Extra bytes required for tailer */
861         length += sizeof(tdb_off);
862
863  again:
864         last_ptr = FREELIST_TOP;
865
866         /* read in the freelist top */
867         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
868                 goto fail;
869
870         /* keep looking until we find a freelist record big enough */
871         while (rec_ptr) {
872                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
873                         goto fail;
874
875                 if (rec->rec_len >= length) {
876                         /* found it - now possibly split it up  */
877                         if (rec->rec_len > length + MIN_REC_SIZE) {
878                                 /* Length of left piece */
879                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
880
881                                 /* Right piece to go on free list */
882                                 newrec.rec_len = rec->rec_len
883                                         - (sizeof(*rec) + length);
884                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
885
886                                 /* And left record is shortened */
887                                 rec->rec_len = length;
888                         } else
889                                 newrec_ptr = 0;
890
891                         /* Remove allocated record from the free list */
892                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
893                                 goto fail;
894
895                         /* Update header: do this before we drop alloc
896                            lock, otherwise tdb_free() might try to
897                            merge with us, thinking we're free.
898                            (Thanks Jeremy Allison). */
899                         rec->magic = TDB_MAGIC;
900                         if (rec_write(tdb, rec_ptr, rec) == -1)
901                                 goto fail;
902
903                         /* Did we create new block? */
904                         if (newrec_ptr) {
905                                 /* Update allocated record tailer (we
906                                    shortened it). */
907                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
908                                         goto fail;
909
910                                 /* Free new record */
911                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
912                                         goto fail;
913                         }
914
915                         /* all done - return the new record offset */
916                         tdb_unlock(tdb, -1, F_WRLCK);
917                         return rec_ptr;
918                 }
919                 /* move to the next record */
920                 last_ptr = rec_ptr;
921                 rec_ptr = rec->next;
922         }
923         /* we didn't find enough space. See if we can expand the
924            database and if we can then try again */
925         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
926                 goto again;
927  fail:
928         tdb_unlock(tdb, -1, F_WRLCK);
929         return 0;
930 }
931
932 /* initialise a new database with a specified hash size */
933 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
934 {
935         struct tdb_header *newdb;
936         int size, ret = -1;
937
938         /* We make it up in memory, then write it out if not internal */
939         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
940         if (!(newdb = calloc(size, 1)))
941                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
942
943         /* Fill in the header */
944         newdb->version = TDB_VERSION;
945         newdb->hash_size = hash_size;
946 #ifdef USE_SPINLOCKS
947         newdb->rwlocks = size;
948 #endif
949         if (tdb->flags & TDB_INTERNAL) {
950                 tdb->map_size = size;
951                 tdb->map_ptr = (char *)newdb;
952                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
953                 /* Convert the `ondisk' version if asked. */
954                 CONVERT(*newdb);
955                 return 0;
956         }
957         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
958                 goto fail;
959
960         if (ftruncate(tdb->fd, 0) == -1)
961                 goto fail;
962
963         /* This creates an endian-converted header, as if read from disk */
964         CONVERT(*newdb);
965         memcpy(&tdb->header, newdb, sizeof(tdb->header));
966         /* Don't endian-convert the magic food! */
967         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
968         if (write(tdb->fd, newdb, size) != size)
969                 ret = -1;
970         else
971                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
972
973   fail:
974         SAFE_FREE(newdb);
975         return ret;
976 }
977
978 /* Returns 0 on fail.  On success, return offset of record, and fills
979    in rec */
980 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
981                         struct list_struct *r)
982 {
983         tdb_off rec_ptr;
984         
985         /* read in the hash top */
986         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
987                 return 0;
988
989         /* keep looking until we find the right record */
990         while (rec_ptr) {
991                 if (rec_read(tdb, rec_ptr, r) == -1)
992                         return 0;
993
994                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
995                         char *k;
996                         /* a very likely hit - read the key */
997                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
998                                            r->key_len);
999                         if (!k)
1000                                 return 0;
1001
1002                         if (memcmp(key.dptr, k, key.dsize) == 0) {
1003                                 SAFE_FREE(k);
1004                                 return rec_ptr;
1005                         }
1006                         SAFE_FREE(k);
1007                 }
1008                 rec_ptr = r->next;
1009         }
1010         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
1011 }
1012
1013 /* If they do lockkeys, check that this hash is one they locked */
1014 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
1015 {
1016         u32 i;
1017         if (!tdb->lockedkeys)
1018                 return 1;
1019         for (i = 0; i < tdb->lockedkeys[0]; i++)
1020                 if (tdb->lockedkeys[i+1] == hash)
1021                         return 1;
1022         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
1023 }
1024
1025 /* As tdb_find, but if you succeed, keep the lock */
1026 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
1027                              struct list_struct *rec)
1028 {
1029         u32 hash, rec_ptr;
1030
1031         hash = tdb_hash(&key);
1032         if (!tdb_keylocked(tdb, hash))
1033                 return 0;
1034         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1035                 return 0;
1036         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1037                 tdb_unlock(tdb, BUCKET(hash), locktype);
1038         return rec_ptr;
1039 }
1040
1041 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1042 {
1043         return tdb->ecode;
1044 }
1045
1046 static struct tdb_errname {
1047         enum TDB_ERROR ecode; const char *estring;
1048 } emap[] = { {TDB_SUCCESS, "Success"},
1049              {TDB_ERR_CORRUPT, "Corrupt database"},
1050              {TDB_ERR_IO, "IO Error"},
1051              {TDB_ERR_LOCK, "Locking error"},
1052              {TDB_ERR_OOM, "Out of memory"},
1053              {TDB_ERR_EXISTS, "Record exists"},
1054              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1055              {TDB_ERR_NOEXIST, "Record does not exist"} };
1056
1057 /* Error string for the last tdb error */
1058 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1059 {
1060         u32 i;
1061         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1062                 if (tdb->ecode == emap[i].ecode)
1063                         return emap[i].estring;
1064         return "Invalid error code";
1065 }
1066
1067 /* update an entry in place - this only works if the new data size
1068    is <= the old data size and the key exists.
1069    on failure return -1.
1070 */
1071
1072 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
1073 {
1074         struct list_struct rec;
1075         tdb_off rec_ptr;
1076
1077         /* find entry */
1078         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1079                 return -1;
1080
1081         /* must be long enough key, data and tailer */
1082         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1083                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1084                 return -1;
1085         }
1086
1087         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1088                       dbuf.dptr, dbuf.dsize) == -1)
1089                 return -1;
1090
1091         if (dbuf.dsize != rec.data_len) {
1092                 /* update size */
1093                 rec.data_len = dbuf.dsize;
1094                 return rec_write(tdb, rec_ptr, &rec);
1095         }
1096  
1097         return 0;
1098 }
1099
1100 /* find an entry in the database given a key */
1101 /* If an entry doesn't exist tdb_err will be set to
1102  * TDB_ERR_NOEXIST. If a key has no data attached
1103  * tdb_err will not be set. Both will return a
1104  * zero pptr and zero dsize.
1105  */
1106
1107 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1108 {
1109         tdb_off rec_ptr;
1110         struct list_struct rec;
1111         TDB_DATA ret;
1112
1113         /* find which hash bucket it is in */
1114         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
1115                 return tdb_null;
1116
1117         if (rec.data_len)
1118                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1119                                           rec.data_len);
1120         else
1121                 ret.dptr = NULL;
1122         ret.dsize = rec.data_len;
1123         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1124         return ret;
1125 }
1126
1127 /* check if an entry in the database exists 
1128
1129    note that 1 is returned if the key is found and 0 is returned if not found
1130    this doesn't match the conventions in the rest of this module, but is
1131    compatible with gdbm
1132 */
1133 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1134 {
1135         struct list_struct rec;
1136         
1137         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
1138                 return 0;
1139         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1140         return 1;
1141 }
1142
1143 /* record lock stops delete underneath */
1144 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1145 {
1146         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1147 }
1148 /*
1149   Write locks override our own fcntl readlocks, so check it here.
1150   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1151   an error to fail to get the lock here.
1152 */
1153  
1154 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1155 {
1156         struct tdb_traverse_lock *i;
1157         for (i = &tdb->travlocks; i; i = i->next)
1158                 if (i->off == off)
1159                         return -1;
1160         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1161 }
1162
1163 /*
1164   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1165   an error to fail to get the lock here.
1166 */
1167
1168 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1169 {
1170         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1171 }
1172 /* fcntl locks don't stack: avoid unlocking someone else's */
1173 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1174 {
1175         struct tdb_traverse_lock *i;
1176         u32 count = 0;
1177
1178         if (off == 0)
1179                 return 0;
1180         for (i = &tdb->travlocks; i; i = i->next)
1181                 if (i->off == off)
1182                         count++;
1183         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1184 }
1185
1186 /* actually delete an entry in the database given the offset */
1187 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1188 {
1189         tdb_off last_ptr, i;
1190         struct list_struct lastrec;
1191
1192         if (tdb->read_only) return -1;
1193
1194         if (write_lock_record(tdb, rec_ptr) == -1) {
1195                 /* Someone traversing here: mark it as dead */
1196                 rec->magic = TDB_DEAD_MAGIC;
1197                 return rec_write(tdb, rec_ptr, rec);
1198         }
1199         if (write_unlock_record(tdb, rec_ptr) != 0)
1200                 return -1;
1201
1202         /* find previous record in hash chain */
1203         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1204                 return -1;
1205         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1206                 if (rec_read(tdb, i, &lastrec) == -1)
1207                         return -1;
1208
1209         /* unlink it: next ptr is at start of record. */
1210         if (last_ptr == 0)
1211                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1212         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1213                 return -1;
1214
1215         /* recover the space */
1216         if (tdb_free(tdb, rec_ptr, rec) == -1)
1217                 return -1;
1218         return 0;
1219 }
1220
1221 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1222 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1223                          struct list_struct *rec)
1224 {
1225         int want_next = (tlock->off != 0);
1226
1227         /* No traversal allows if you've called tdb_lockkeys() */
1228         if (tdb->lockedkeys)
1229                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1230
1231         /* Lock each chain from the start one. */
1232         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1233                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1234                         return -1;
1235
1236                 /* No previous record?  Start at top of chain. */
1237                 if (!tlock->off) {
1238                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1239                                      &tlock->off) == -1)
1240                                 goto fail;
1241                 } else {
1242                         /* Otherwise unlock the previous record. */
1243                         if (unlock_record(tdb, tlock->off) != 0)
1244                                 goto fail;
1245                 }
1246
1247                 if (want_next) {
1248                         /* We have offset of old record: grab next */
1249                         if (rec_read(tdb, tlock->off, rec) == -1)
1250                                 goto fail;
1251                         tlock->off = rec->next;
1252                 }
1253
1254                 /* Iterate through chain */
1255                 while( tlock->off) {
1256                         tdb_off current;
1257                         if (rec_read(tdb, tlock->off, rec) == -1)
1258                                 goto fail;
1259                         if (!TDB_DEAD(rec)) {
1260                                 /* Woohoo: we found one! */
1261                                 if (lock_record(tdb, tlock->off) != 0)
1262                                         goto fail;
1263                                 return tlock->off;
1264                         }
1265                         /* Try to clean dead ones from old traverses */
1266                         current = tlock->off;
1267                         tlock->off = rec->next;
1268                         if (!tdb->read_only && 
1269                             do_delete(tdb, current, rec) != 0)
1270                                 goto fail;
1271                 }
1272                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1273                 want_next = 0;
1274         }
1275         /* We finished iteration without finding anything */
1276         return TDB_ERRCODE(TDB_SUCCESS, 0);
1277
1278  fail:
1279         tlock->off = 0;
1280         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1281                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1282         return -1;
1283 }
1284
1285 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1286    return -1 on error or the record count traversed
1287    if fn is NULL then it is not called
1288    a non-zero return value from fn() indicates that the traversal should stop
1289   */
1290 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1291 {
1292         TDB_DATA key, dbuf;
1293         struct list_struct rec;
1294         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1295         int ret, count = 0;
1296
1297         /* This was in the initializaton, above, but the IRIX compiler
1298          * did not like it.  crh
1299          */
1300         tl.next = tdb->travlocks.next;
1301
1302         /* fcntl locks don't stack: beware traverse inside traverse */
1303         tdb->travlocks.next = &tl;
1304
1305         /* tdb_next_lock places locks on the record returned, and its chain */
1306         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1307                 count++;
1308                 /* now read the full record */
1309                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1310                                           rec.key_len + rec.data_len);
1311                 if (!key.dptr) {
1312                         ret = -1;
1313                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1314                                 goto out;
1315                         if (unlock_record(tdb, tl.off) != 0)
1316                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1317                         goto out;
1318                 }
1319                 key.dsize = rec.key_len;
1320                 dbuf.dptr = key.dptr + rec.key_len;
1321                 dbuf.dsize = rec.data_len;
1322
1323                 /* Drop chain lock, call out */
1324                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1325                         ret = -1;
1326                         goto out;
1327                 }
1328                 if (fn && fn(tdb, key, dbuf, state)) {
1329                         /* They want us to terminate traversal */
1330                         ret = count;
1331                         if (unlock_record(tdb, tl.off) != 0) {
1332                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1333                                 ret = -1;
1334                         }
1335                         tdb->travlocks.next = tl.next;
1336                         SAFE_FREE(key.dptr);
1337                         return count;
1338                 }
1339                 SAFE_FREE(key.dptr);
1340         }
1341 out:
1342         tdb->travlocks.next = tl.next;
1343         if (ret < 0)
1344                 return -1;
1345         else
1346                 return count;
1347 }
1348
1349 /* find the first entry in the database and return its key */
1350 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1351 {
1352         TDB_DATA key;
1353         struct list_struct rec;
1354
1355         /* release any old lock */
1356         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1357                 return tdb_null;
1358         tdb->travlocks.off = tdb->travlocks.hash = 0;
1359
1360         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1361                 return tdb_null;
1362         /* now read the key */
1363         key.dsize = rec.key_len;
1364         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1365         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1366                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1367         return key;
1368 }
1369
1370 /* find the next entry in the database, returning its key */
1371 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1372 {
1373         u32 oldhash;
1374         TDB_DATA key = tdb_null;
1375         struct list_struct rec;
1376         char *k = NULL;
1377
1378         /* Is locked key the old key?  If so, traverse will be reliable. */
1379         if (tdb->travlocks.off) {
1380                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1381                         return tdb_null;
1382                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1383                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1384                                             rec.key_len))
1385                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1386                         /* No, it wasn't: unlock it and start from scratch */
1387                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1388                                 return tdb_null;
1389                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1390                                 return tdb_null;
1391                         tdb->travlocks.off = 0;
1392                 }
1393
1394                 SAFE_FREE(k);
1395         }
1396
1397         if (!tdb->travlocks.off) {
1398                 /* No previous element: do normal find, and lock record */
1399                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1400                 if (!tdb->travlocks.off)
1401                         return tdb_null;
1402                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1403                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1404                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1405                         return tdb_null;
1406                 }
1407         }
1408         oldhash = tdb->travlocks.hash;
1409
1410         /* Grab next record: locks chain and returned record,
1411            unlocks old record */
1412         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1413                 key.dsize = rec.key_len;
1414                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1415                                           key.dsize);
1416                 /* Unlock the chain of this new record */
1417                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1418                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1419         }
1420         /* Unlock the chain of old record */
1421         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1422                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1423         return key;
1424 }
1425
1426 /* delete an entry in the database given a key */
1427 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1428 {
1429         tdb_off rec_ptr;
1430         struct list_struct rec;
1431         int ret;
1432
1433         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1434                 return -1;
1435         ret = do_delete(tdb, rec_ptr, &rec);
1436         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1437                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1438         return ret;
1439 }
1440
1441 /* store an element in the database, replacing any existing element
1442    with the same key 
1443
1444    return 0 on success, -1 on failure
1445 */
1446 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1447 {
1448         struct list_struct rec;
1449         u32 hash;
1450         tdb_off rec_ptr;
1451         char *p = NULL;
1452         int ret = 0;
1453
1454         /* find which hash bucket it is in */
1455         hash = tdb_hash(&key);
1456         if (!tdb_keylocked(tdb, hash))
1457                 return -1;
1458         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1459                 return -1;
1460
1461         /* check for it existing, on insert. */
1462         if (flag == TDB_INSERT) {
1463                 if (tdb_exists(tdb, key)) {
1464                         tdb->ecode = TDB_ERR_EXISTS;
1465                         goto fail;
1466                 }
1467         } else {
1468                 /* first try in-place update, on modify or replace. */
1469                 if (tdb_update(tdb, key, dbuf) == 0)
1470                         goto out;
1471                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1472                         goto fail;
1473         }
1474         /* reset the error code potentially set by the tdb_update() */
1475         tdb->ecode = TDB_SUCCESS;
1476
1477         /* delete any existing record - if it doesn't exist we don't
1478            care.  Doing this first reduces fragmentation, and avoids
1479            coalescing with `allocated' block before it's updated. */
1480         if (flag != TDB_INSERT)
1481                 tdb_delete(tdb, key);
1482
1483         /* Copy key+value *before* allocating free space in case malloc
1484            fails and we are left with a dead spot in the tdb. */
1485
1486         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1487                 tdb->ecode = TDB_ERR_OOM;
1488                 goto fail;
1489         }
1490
1491         memcpy(p, key.dptr, key.dsize);
1492         if (dbuf.dsize)
1493                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1494
1495         /* now we're into insert / modify / replace of a record which
1496          * we know could not be optimised by an in-place store (for
1497          * various reasons).  */
1498         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1499                 goto fail;
1500
1501         /* Read hash top into next ptr */
1502         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1503                 goto fail;
1504
1505         rec.key_len = key.dsize;
1506         rec.data_len = dbuf.dsize;
1507         rec.full_hash = hash;
1508         rec.magic = TDB_MAGIC;
1509
1510         /* write out and point the top of the hash chain at it */
1511         if (rec_write(tdb, rec_ptr, &rec) == -1
1512             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1513             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1514                 /* Need to tdb_unallocate() here */
1515                 goto fail;
1516         }
1517  out:
1518         SAFE_FREE(p); 
1519         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1520         return ret;
1521 fail:
1522         ret = -1;
1523         goto out;
1524 }
1525
1526 /* Attempt to append data to an entry in place - this only works if the new data size
1527    is <= the old data size and the key exists.
1528    on failure return -1. Record must be locked before calling.
1529 */
1530 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1531 {
1532         struct list_struct rec;
1533         tdb_off rec_ptr;
1534
1535         /* find entry */
1536         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1537                 return -1;
1538
1539         /* Append of 0 is always ok. */
1540         if (new_dbuf.dsize == 0)
1541                 return 0;
1542
1543         /* must be long enough for key, old data + new data and tailer */
1544         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1545                 /* No room. */
1546                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1547                 return -1;
1548         }
1549
1550         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1551                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1552                 return -1;
1553
1554         /* update size */
1555         rec.data_len += new_dbuf.dsize;
1556         return rec_write(tdb, rec_ptr, &rec);
1557 }
1558
1559 /* Append to an entry. Create if not exist. */
1560
1561 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1562 {
1563         struct list_struct rec;
1564         u32 hash;
1565         tdb_off rec_ptr;
1566         char *p = NULL;
1567         int ret = 0;
1568         size_t new_data_size = 0;
1569
1570         /* find which hash bucket it is in */
1571         hash = tdb_hash(&key);
1572         if (!tdb_keylocked(tdb, hash))
1573                 return -1;
1574         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1575                 return -1;
1576
1577         /* first try in-place. */
1578         if (tdb_append_inplace(tdb, key, new_dbuf) == 0)
1579                 goto out;
1580
1581         /* reset the error code potentially set by the tdb_append_inplace() */
1582         tdb->ecode = TDB_SUCCESS;
1583
1584         /* find entry */
1585         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1586                 if (tdb->ecode != TDB_ERR_NOEXIST)
1587                         goto fail;
1588
1589                 /* Not found - create. */
1590
1591                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1592                 goto out;
1593         }
1594
1595         new_data_size = rec.data_len + new_dbuf.dsize;
1596
1597         /* Copy key+old_value+value *before* allocating free space in case malloc
1598            fails and we are left with a dead spot in the tdb. */
1599
1600         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1601                 tdb->ecode = TDB_ERR_OOM;
1602                 goto fail;
1603         }
1604
1605         /* Copy the key in place. */
1606         memcpy(p, key.dptr, key.dsize);
1607
1608         /* Now read the old data into place. */
1609         if (rec.data_len &&
1610                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1611                         goto fail;
1612
1613         /* Finally append the new data. */
1614         if (new_dbuf.dsize)
1615                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1616
1617         /* delete any existing record - if it doesn't exist we don't
1618            care.  Doing this first reduces fragmentation, and avoids
1619            coalescing with `allocated' block before it's updated. */
1620
1621         tdb_delete(tdb, key);
1622
1623         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1624                 goto fail;
1625
1626         /* Read hash top into next ptr */
1627         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1628                 goto fail;
1629
1630         rec.key_len = key.dsize;
1631         rec.data_len = new_data_size;
1632         rec.full_hash = hash;
1633         rec.magic = TDB_MAGIC;
1634
1635         /* write out and point the top of the hash chain at it */
1636         if (rec_write(tdb, rec_ptr, &rec) == -1
1637             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1638             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1639                 /* Need to tdb_unallocate() here */
1640                 goto fail;
1641         }
1642
1643  out:
1644         SAFE_FREE(p); 
1645         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1646         return ret;
1647
1648 fail:
1649         ret = -1;
1650         goto out;
1651 }
1652
1653 static int tdb_already_open(dev_t device,
1654                             ino_t ino)
1655 {
1656         TDB_CONTEXT *i;
1657         
1658         for (i = tdbs; i; i = i->next) {
1659                 if (i->device == device && i->inode == ino) {
1660                         return 1;
1661                 }
1662         }
1663
1664         return 0;
1665 }
1666
1667 /* open the database, creating it if necessary 
1668
1669    The open_flags and mode are passed straight to the open call on the
1670    database file. A flags value of O_WRONLY is invalid. The hash size
1671    is advisory, use zero for a default value.
1672
1673    Return is NULL on error, in which case errno is also set.  Don't 
1674    try to call tdb_error or tdb_errname, just do strerror(errno).
1675
1676    @param name may be NULL for internal databases. */
1677 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1678                       int open_flags, mode_t mode)
1679 {
1680         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1681 }
1682
1683
1684 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1685                          int open_flags, mode_t mode,
1686                          tdb_log_func log_fn)
1687 {
1688         TDB_CONTEXT *tdb;
1689         struct stat st;
1690         int rev = 0, locked;
1691         unsigned char *vp;
1692         u32 vertest;
1693
1694         if (!(tdb = calloc(1, sizeof *tdb))) {
1695                 /* Can't log this */
1696                 errno = ENOMEM;
1697                 goto fail;
1698         }
1699         tdb->fd = -1;
1700         tdb->name = NULL;
1701         tdb->map_ptr = NULL;
1702         tdb->lockedkeys = NULL;
1703         tdb->flags = tdb_flags;
1704         tdb->open_flags = open_flags;
1705         tdb->log_fn = log_fn;
1706         
1707         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1708                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1709                          name));
1710                 errno = EINVAL;
1711                 goto fail;
1712         }
1713         
1714         if (hash_size == 0)
1715                 hash_size = DEFAULT_HASH_SIZE;
1716         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1717                 tdb->read_only = 1;
1718                 /* read only databases don't do locking or clear if first */
1719                 tdb->flags |= TDB_NOLOCK;
1720                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1721         }
1722
1723         /* internal databases don't mmap or lock, and start off cleared */
1724         if (tdb->flags & TDB_INTERNAL) {
1725                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1726                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1727                 if (tdb_new_database(tdb, hash_size) != 0) {
1728                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1729                         goto fail;
1730                 }
1731                 goto internal;
1732         }
1733
1734         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1735                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1736                          name, strerror(errno)));
1737                 goto fail;      /* errno set by open(2) */
1738         }
1739
1740         /* 
1741            Close file when execing another process.  
1742            Prevents SELinux access errors.
1743         */
1744         set_cloexec_flag(tdb->fd, 1);
1745
1746         /* ensure there is only one process initialising at once */
1747         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1748                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1749                          name, strerror(errno)));
1750                 goto fail;      /* errno set by tdb_brlock */
1751         }
1752
1753         /* we need to zero database if we are the only one with it open */
1754         if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1755             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1756                 open_flags |= O_CREAT;
1757                 if (ftruncate(tdb->fd, 0) == -1) {
1758                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1759                                  "failed to truncate %s: %s\n",
1760                                  name, strerror(errno)));
1761                         goto fail; /* errno set by ftruncate */
1762                 }
1763         }
1764
1765         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1766             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1767             || (tdb->header.version != TDB_VERSION
1768                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1769                 /* its not a valid database - possibly initialise it */
1770                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1771                         errno = EIO; /* ie bad format or something */
1772                         goto fail;
1773                 }
1774                 rev = (tdb->flags & TDB_CONVERT);
1775         }
1776         vp = (unsigned char *)&tdb->header.version;
1777         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1778                   (((u32)vp[2]) << 8) | (u32)vp[3];
1779         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1780         if (!rev)
1781                 tdb->flags &= ~TDB_CONVERT;
1782         else {
1783                 tdb->flags |= TDB_CONVERT;
1784                 convert(&tdb->header, sizeof(tdb->header));
1785         }
1786         if (fstat(tdb->fd, &st) == -1)
1787                 goto fail;
1788
1789         /* Is it already in the open list?  If so, fail. */
1790         if (tdb_already_open(st.st_dev, st.st_ino)) {
1791                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1792                          "%s (%d,%d) is already open in this process\n",
1793                          name, st.st_dev, st.st_ino));
1794                 errno = EBUSY;
1795                 goto fail;
1796         }
1797
1798         if (!(tdb->name = (char *)strdup(name))) {
1799                 errno = ENOMEM;
1800                 goto fail;
1801         }
1802
1803         tdb->map_size = st.st_size;
1804         tdb->device = st.st_dev;
1805         tdb->inode = st.st_ino;
1806         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1807         if (!tdb->locked) {
1808                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1809                          "failed to allocate lock structure for %s\n",
1810                          name));
1811                 errno = ENOMEM;
1812                 goto fail;
1813         }
1814         tdb_mmap(tdb);
1815         if (locked) {
1816                 if (!tdb->read_only)
1817                         if (tdb_clear_spinlocks(tdb) != 0) {
1818                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1819                                 "failed to clear spinlock\n"));
1820                                 goto fail;
1821                         }
1822                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1823                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1824                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1825                                  name, strerror(errno)));
1826                         goto fail;
1827                 }
1828         }
1829         /* leave this lock in place to indicate it's in use */
1830         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1831                 goto fail;
1832
1833  internal:
1834         /* Internal (memory-only) databases skip all the code above to
1835          * do with disk files, and resume here by releasing their
1836          * global lock and hooking into the active list. */
1837         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1838                 goto fail;
1839         tdb->next = tdbs;
1840         tdbs = tdb;
1841         return tdb;
1842
1843  fail:
1844         { int save_errno = errno;
1845
1846         if (!tdb)
1847                 return NULL;
1848         
1849         if (tdb->map_ptr) {
1850                 if (tdb->flags & TDB_INTERNAL)
1851                         SAFE_FREE(tdb->map_ptr);
1852                 else
1853                         tdb_munmap(tdb);
1854         }
1855         SAFE_FREE(tdb->name);
1856         if (tdb->fd != -1)
1857                 if (close(tdb->fd) != 0)
1858                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1859         SAFE_FREE(tdb->locked);
1860         SAFE_FREE(tdb);
1861         errno = save_errno;
1862         return NULL;
1863         }
1864 }
1865
1866 /**
1867  * Close a database.
1868  *
1869  * @returns -1 for error; 0 for success.
1870  **/
1871 int tdb_close(TDB_CONTEXT *tdb)
1872 {
1873         TDB_CONTEXT **i;
1874         int ret = 0;
1875
1876         if (tdb->map_ptr) {
1877                 if (tdb->flags & TDB_INTERNAL)
1878                         SAFE_FREE(tdb->map_ptr);
1879                 else
1880                         tdb_munmap(tdb);
1881         }
1882         SAFE_FREE(tdb->name);
1883         if (tdb->fd != -1)
1884                 ret = close(tdb->fd);
1885         SAFE_FREE(tdb->locked);
1886         SAFE_FREE(tdb->lockedkeys);
1887
1888         /* Remove from contexts list */
1889         for (i = &tdbs; *i; i = &(*i)->next) {
1890                 if (*i == tdb) {
1891                         *i = tdb->next;
1892                         break;
1893                 }
1894         }
1895
1896         memset(tdb, 0, sizeof(*tdb));
1897         SAFE_FREE(tdb);
1898
1899         return ret;
1900 }
1901
1902 /* lock/unlock entire database */
1903 int tdb_lockall(TDB_CONTEXT *tdb)
1904 {
1905         u32 i;
1906
1907         /* There are no locks on read-only dbs */
1908         if (tdb->read_only)
1909                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1910         if (tdb->lockedkeys)
1911                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1912         for (i = 0; i < tdb->header.hash_size; i++) 
1913                 if (tdb_lock(tdb, i, F_WRLCK))
1914                         break;
1915
1916         /* If error, release locks we have... */
1917         if (i < tdb->header.hash_size) {
1918                 u32 j;
1919
1920                 for ( j = 0; j < i; j++)
1921                         tdb_unlock(tdb, j, F_WRLCK);
1922                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1923         }
1924
1925         return 0;
1926 }
1927 void tdb_unlockall(TDB_CONTEXT *tdb)
1928 {
1929         u32 i;
1930         for (i=0; i < tdb->header.hash_size; i++)
1931                 tdb_unlock(tdb, i, F_WRLCK);
1932 }
1933
1934 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1935 {
1936         u32 i, j, hash;
1937
1938         /* Can't lock more keys if already locked */
1939         if (tdb->lockedkeys)
1940                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1941         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1942                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1943         /* First number in array is # keys */
1944         tdb->lockedkeys[0] = number;
1945
1946         /* Insertion sort by bucket */
1947         for (i = 0; i < number; i++) {
1948                 hash = tdb_hash(&keys[i]);
1949                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1950                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1951                 tdb->lockedkeys[j+1] = hash;
1952         }
1953         /* Finally, lock in order */
1954         for (i = 0; i < number; i++)
1955                 if (tdb_lock(tdb, i, F_WRLCK))
1956                         break;
1957
1958         /* If error, release locks we have... */
1959         if (i < number) {
1960                 for ( j = 0; j < i; j++)
1961                         tdb_unlock(tdb, j, F_WRLCK);
1962                 SAFE_FREE(tdb->lockedkeys);
1963                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1964         }
1965         return 0;
1966 }
1967
1968 /* Unlock the keys previously locked by tdb_lockkeys() */
1969 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1970 {
1971         u32 i;
1972         if (!tdb->lockedkeys)
1973                 return;
1974         for (i = 0; i < tdb->lockedkeys[0]; i++)
1975                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1976         SAFE_FREE(tdb->lockedkeys);
1977 }
1978
1979 /* lock/unlock one hash chain. This is meant to be used to reduce
1980    contention - it cannot guarantee how many records will be locked */
1981 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1982 {
1983         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1984 }
1985
1986 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1987 {
1988         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1989 }
1990
1991 int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1992 {
1993         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
1994 }
1995
1996 int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1997 {
1998         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
1999 }
2000
2001
2002 /* register a loging function */
2003 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
2004 {
2005         tdb->log_fn = fn;
2006 }
2007
2008
2009 /* reopen a tdb - this is used after a fork to ensure that we have an independent
2010    seek pointer from our parent and to re-establish locks */
2011 int tdb_reopen(TDB_CONTEXT *tdb)
2012 {
2013         struct stat st;
2014
2015         if (tdb_munmap(tdb) != 0) {
2016                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
2017                 goto fail;
2018         }
2019         if (close(tdb->fd) != 0)
2020                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
2021         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
2022         if (tdb->fd == -1) {
2023                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
2024                 goto fail;
2025         }
2026         if (fstat(tdb->fd, &st) != 0) {
2027                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
2028                 goto fail;
2029         }
2030         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
2031                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
2032                 goto fail;
2033         }
2034         tdb_mmap(tdb);
2035         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
2036                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2037                 goto fail;
2038         }
2039
2040         return 0;
2041
2042 fail:
2043         tdb_close(tdb);
2044         return -1;
2045 }
2046
2047 /* reopen all tdb's */
2048 int tdb_reopen_all(void)
2049 {
2050         TDB_CONTEXT *tdb;
2051
2052         for (tdb=tdbs; tdb; tdb = tdb->next) {
2053                 if (tdb_reopen(tdb) != 0) return -1;
2054         }
2055
2056         return 0;
2057 }