chiark / gitweb /
af475bb7adee8ed65a15336c5f3016d9c2dd339c
[elogind.git] / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3    Samba database functions
4    Copyright (C) Andrew Tridgell              1999-2000
5    Copyright (C) Luke Kenneth Casson Leighton      2000
6    Copyright (C) Paul `Rusty' Russell              2000
7    Copyright (C) Jeremy Allison                    2000-2003
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23
24
25 /* NOTE: If you use tdbs under valgrind, and in particular if you run
26  * tdbtorture, you may get spurious "uninitialized value" warnings.  I
27  * think this is because valgrind doesn't understand that the mmap'd
28  * area may be written to by other processes.  Memory can, from the
29  * point of view of the grinded process, spontaneously become
30  * initialized.
31  *
32  * I can think of a few solutions.  [mbp 20030311]
33  *
34  * 1 - Write suppressions for Valgrind so that it doesn't complain
35  * about this.  Probably the most reasonable but people need to
36  * remember to use them.
37  *
38  * 2 - Use IO not mmap when running under valgrind.  Not so nice.
39  *
40  * 3 - Use the special valgrind macros to mark memory as valid at the
41  * right time.  Probably too hard -- the process just doesn't know.
42  */ 
43
44 /* udev defines */
45 #define STANDALONE
46 #define TDB_DEBUG
47 #define HAVE_MMAP       1
48 /* this should prevent deadlocks loops on corrupt databases
49  * we've discovered. Most deadlocks happend by iterating over the
50  * list of entries with the same hash value. */
51 #define LOOP_MAX        100000
52 #define TDB_LOG(x) TDB_LOG_UDEV x
53 #define TDB_LOG_UDEV(tdb, level, format, arg...) info(format, ##arg)
54
55 #ifdef STANDALONE
56 #if HAVE_CONFIG_H
57 #include <config.h>
58 #endif
59
60 #define _KLIBC_HAS_ARCH_SIG_ATOMIC_T
61 #include <stdlib.h>
62 #include <stdio.h>
63 #include <fcntl.h>
64 #include <unistd.h>
65 #include <string.h>
66 #include <fcntl.h>
67 #include <errno.h>
68 #include <sys/mman.h>
69 #include <sys/stat.h>
70 #include <signal.h>
71 #include "tdb.h"
72 #include "spinlock.h"
73 #include "../udev_lib.h"
74 #include "../logging.h"
75 #else
76 #include "includes.h"
77 #endif
78
79 #define TDB_MAGIC_FOOD "TDB file\n"
80 #define TDB_VERSION (0x26011967 + 6)
81 #define TDB_MAGIC (0x26011999U)
82 #define TDB_FREE_MAGIC (~TDB_MAGIC)
83 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
84 #define TDB_ALIGNMENT 4
85 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
86 #define DEFAULT_HASH_SIZE 131
87 #define TDB_PAGE_SIZE 0x2000
88 #define FREELIST_TOP (sizeof(struct tdb_header))
89 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
90 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
91 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
92 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
93 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
94
95 /* NB assumes there is a local variable called "tdb" that is the
96  * current context, also takes doubly-parenthesized print-style
97  * argument. */
98 #ifndef TDB_LOG
99 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
100 #endif
101
102 /* lock offsets */
103 #define GLOBAL_LOCK 0
104 #define ACTIVE_LOCK 4
105
106 #ifndef MAP_FILE
107 #define MAP_FILE 0
108 #endif
109
110 #ifndef MAP_FAILED
111 #define MAP_FAILED ((void *)-1)
112 #endif
113
114 /* free memory if the pointer is valid and zero the pointer */
115 #ifndef SAFE_FREE
116 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
117 #endif
118
119 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
120 TDB_DATA tdb_null;
121
122 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
123 static TDB_CONTEXT *tdbs = NULL;
124
125 static int tdb_munmap(TDB_CONTEXT *tdb)
126 {
127         if (tdb->flags & TDB_INTERNAL)
128                 return 0;
129
130 #ifdef HAVE_MMAP
131         if (tdb->map_ptr) {
132                 int ret = munmap(tdb->map_ptr, tdb->map_size);
133                 if (ret != 0)
134                         return ret;
135         }
136 #endif
137         tdb->map_ptr = NULL;
138         return 0;
139 }
140
141 static void tdb_mmap(TDB_CONTEXT *tdb)
142 {
143         if (tdb->flags & TDB_INTERNAL)
144                 return;
145
146 #ifdef HAVE_MMAP
147         if (!(tdb->flags & TDB_NOMMAP)) {
148                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
149                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
150                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
151
152                 /*
153                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
154                  */
155
156                 if (tdb->map_ptr == MAP_FAILED) {
157                         tdb->map_ptr = NULL;
158                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
159                                  tdb->map_size, strerror(errno)));
160                 }
161         } else {
162                 tdb->map_ptr = NULL;
163         }
164 #else
165         tdb->map_ptr = NULL;
166 #endif
167 }
168
169 /* Endian conversion: we only ever deal with 4 byte quantities */
170 static void *convert(void *buf, u32 size)
171 {
172         u32 i, *p = buf;
173         for (i = 0; i < size / 4; i++)
174                 p[i] = TDB_BYTEREV(p[i]);
175         return buf;
176 }
177 #define DOCONV() (tdb->flags & TDB_CONVERT)
178 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
179
180 /* the body of the database is made of one list_struct for the free space
181    plus a separate data list for each hash value */
182 struct list_struct {
183         tdb_off next; /* offset of the next record in the list */
184         tdb_len rec_len; /* total byte length of record */
185         tdb_len key_len; /* byte length of key */
186         tdb_len data_len; /* byte length of data */
187         u32 full_hash; /* the full 32 bit hash of the key */
188         u32 magic;   /* try to catch errors */
189         /* the following union is implied:
190                 union {
191                         char record[rec_len];
192                         struct {
193                                 char key[key_len];
194                                 char data[data_len];
195                         }
196                         u32 totalsize; (tailer)
197                 }
198         */
199 };
200
201 /***************************************************************
202  Allow a caller to set a "alarm" flag that tdb can check to abort
203  a blocking lock on SIGALRM.
204 ***************************************************************/
205
206 static sig_atomic_t *palarm_fired;
207
208 void tdb_set_lock_alarm(sig_atomic_t *palarm)
209 {
210         palarm_fired = palarm;
211 }
212
213 /* a byte range locking function - return 0 on success
214    this functions locks/unlocks 1 byte at the specified offset.
215
216    On error, errno is also set so that errors are passed back properly
217    through tdb_open(). */
218 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
219                       int rw_type, int lck_type, int probe)
220 {
221         struct flock fl;
222         int ret;
223
224         if (tdb->flags & TDB_NOLOCK)
225                 return 0;
226         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
227                 errno = EACCES;
228                 return -1;
229         }
230
231         fl.l_type = rw_type;
232         fl.l_whence = SEEK_SET;
233         fl.l_start = offset;
234         fl.l_len = 1;
235         fl.l_pid = 0;
236
237         do {
238                 ret = fcntl(tdb->fd,lck_type,&fl);
239                 if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
240                         break;
241         } while (ret == -1 && errno == EINTR);
242
243         if (ret == -1) {
244                 if (!probe && lck_type != F_SETLK) {
245                         /* Ensure error code is set for log fun to examine. */
246                         if (errno == EINTR && palarm_fired && *palarm_fired)
247                                 tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
248                         else
249                                 tdb->ecode = TDB_ERR_LOCK;
250                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
251                                  tdb->fd, offset, rw_type, lck_type));
252                 }
253                 /* Was it an alarm timeout ? */
254                 if (errno == EINTR && palarm_fired && *palarm_fired)
255                         return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
256                 /* Otherwise - generic lock error. */
257                 /* errno set by fcntl */
258                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
259         }
260         return 0;
261 }
262
263 /* lock a list in the database. list -1 is the alloc list */
264 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
265 {
266         if (list < -1 || list >= (int)tdb->header.hash_size) {
267                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
268                            list, ltype));
269                 return -1;
270         }
271         if (tdb->flags & TDB_NOLOCK)
272                 return 0;
273
274         /* Since fcntl locks don't nest, we do a lock for the first one,
275            and simply bump the count for future ones */
276         if (tdb->locked[list+1].count == 0) {
277                 if (!tdb->read_only && tdb->header.rwlocks) {
278                         if (tdb_spinlock(tdb, list, ltype)) {
279                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list %d ltype=%d\n", 
280                                            list, ltype));
281                                 return -1;
282                         }
283                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
284                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
285                                            list, ltype, strerror(errno)));
286                         return -1;
287                 }
288                 tdb->locked[list+1].ltype = ltype;
289         }
290         tdb->locked[list+1].count++;
291         return 0;
292 }
293
294 /* unlock the database: returns void because it's too late for errors. */
295         /* changed to return int it may be interesting to know there
296            has been an error  --simo */
297 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
298 {
299         int ret = -1;
300
301         if (tdb->flags & TDB_NOLOCK)
302                 return 0;
303
304         /* Sanity checks */
305         if (list < -1 || list >= (int)tdb->header.hash_size) {
306                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
307                 return ret;
308         }
309
310         if (tdb->locked[list+1].count==0) {
311                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
312                 return ret;
313         }
314
315         if (tdb->locked[list+1].count == 1) {
316                 /* Down to last nested lock: unlock underneath */
317                 if (!tdb->read_only && tdb->header.rwlocks) {
318                         ret = tdb_spinunlock(tdb, list, ltype);
319                 } else {
320                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
321                 }
322         } else {
323                 ret = 0;
324         }
325         tdb->locked[list+1].count--;
326
327         if (ret)
328                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
329         return ret;
330 }
331
332 /* This is based on the hash algorithm from gdbm */
333 static u32 tdb_hash(TDB_DATA *key)
334 {
335         u32 value;      /* Used to compute the hash value.  */
336         u32   i;        /* Used to cycle through random values. */
337
338         /* Set the initial value from the key size. */
339         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
340                 value = (value + (key->dptr[i] << (i*5 % 24)));
341
342         return (1103515243 * value + 12345);  
343 }
344
345 /* check for an out of bounds access - if it is out of bounds then
346    see if the database has been expanded by someone else and expand
347    if necessary 
348    note that "len" is the minimum length needed for the db
349 */
350 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
351 {
352         struct stat st;
353         if (len <= tdb->map_size)
354                 return 0;
355         if (tdb->flags & TDB_INTERNAL) {
356                 if (!probe) {
357                         /* Ensure ecode is set for log fn. */
358                         tdb->ecode = TDB_ERR_IO;
359                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
360                                  (int)len, (int)tdb->map_size));
361                 }
362                 return TDB_ERRCODE(TDB_ERR_IO, -1);
363         }
364
365         if (fstat(tdb->fd, &st) == -1)
366                 return TDB_ERRCODE(TDB_ERR_IO, -1);
367
368         if (st.st_size < (size_t)len) {
369                 if (!probe) {
370                         /* Ensure ecode is set for log fn. */
371                         tdb->ecode = TDB_ERR_IO;
372                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
373                                  (int)len, (int)st.st_size));
374                 }
375                 return TDB_ERRCODE(TDB_ERR_IO, -1);
376         }
377
378         /* Unmap, update size, remap */
379         if (tdb_munmap(tdb) == -1)
380                 return TDB_ERRCODE(TDB_ERR_IO, -1);
381         tdb->map_size = st.st_size;
382         tdb_mmap(tdb);
383         return 0;
384 }
385
386 /* write a lump of data at a specified offset */
387 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
388 {
389         if (tdb_oob(tdb, off + len, 0) != 0)
390                 return -1;
391
392         if (tdb->map_ptr)
393                 memcpy(off + (char *)tdb->map_ptr, buf, len);
394 #ifdef HAVE_PWRITE
395         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
396 #else
397         else if (lseek(tdb->fd, off, SEEK_SET) != off
398                  || write(tdb->fd, buf, len) != (ssize_t)len) {
399 #endif
400                 /* Ensure ecode is set for log fn. */
401                 tdb->ecode = TDB_ERR_IO;
402                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
403                            off, len, strerror(errno)));
404                 return TDB_ERRCODE(TDB_ERR_IO, -1);
405         }
406         return 0;
407 }
408
409 /* read a lump of data at a specified offset, maybe convert */
410 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
411 {
412         if (tdb_oob(tdb, off + len, 0) != 0)
413                 return -1;
414
415         if (tdb->map_ptr)
416                 memcpy(buf, off + (char *)tdb->map_ptr, len);
417 #ifdef HAVE_PREAD
418         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
419 #else
420         else if (lseek(tdb->fd, off, SEEK_SET) != off
421                  || read(tdb->fd, buf, len) != (ssize_t)len) {
422 #endif
423                 /* Ensure ecode is set for log fn. */
424                 tdb->ecode = TDB_ERR_IO;
425                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
426                            off, len, strerror(errno)));
427                 return TDB_ERRCODE(TDB_ERR_IO, -1);
428         }
429         if (cv)
430                 convert(buf, len);
431         return 0;
432 }
433
434 /* read a lump of data, allocating the space for it */
435 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
436 {
437         char *buf;
438
439         if (!(buf = malloc(len))) {
440                 /* Ensure ecode is set for log fn. */
441                 tdb->ecode = TDB_ERR_OOM;
442                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
443                            len, strerror(errno)));
444                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
445         }
446         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
447                 SAFE_FREE(buf);
448                 return NULL;
449         }
450         return buf;
451 }
452
453 /* read/write a tdb_off */
454 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
455 {
456         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
457 }
458 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
459 {
460         tdb_off off = *d;
461         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
462 }
463
464 /* read/write a record */
465 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
466 {
467         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
468                 return -1;
469         if (TDB_BAD_MAGIC(rec)) {
470                 /* Ensure ecode is set for log fn. */
471                 tdb->ecode = TDB_ERR_CORRUPT;
472                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
473                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
474         }
475         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
476 }
477 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
478 {
479         struct list_struct r = *rec;
480         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
481 }
482
483 /* read a freelist record and check for simple errors */
484 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
485 {
486         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
487                 return -1;
488
489         if (rec->magic == TDB_MAGIC) {
490                 /* this happens when a app is showdown while deleting a record - we should
491                    not completely fail when this happens */
492                 TDB_LOG((tdb, 0,"rec_free_read non-free magic 0x%x at offset=%d - fixing\n", 
493                          rec->magic, off));
494                 rec->magic = TDB_FREE_MAGIC;
495                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
496                         return -1;
497         }
498
499         if (rec->magic != TDB_FREE_MAGIC) {
500                 /* Ensure ecode is set for log fn. */
501                 tdb->ecode = TDB_ERR_CORRUPT;
502                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
503                            rec->magic, off));
504                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
505         }
506         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
507                 return -1;
508         return 0;
509 }
510
511 /* update a record tailer (must hold allocation lock) */
512 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
513                          const struct list_struct *rec)
514 {
515         tdb_off totalsize;
516
517         /* Offset of tailer from record header */
518         totalsize = sizeof(*rec) + rec->rec_len;
519         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
520                          &totalsize);
521 }
522
523 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
524 {
525         struct list_struct rec;
526         tdb_off tailer_ofs, tailer;
527
528         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
529                 printf("ERROR: failed to read record at %u\n", offset);
530                 return 0;
531         }
532
533         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
534                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
535
536         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
537         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
538                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
539                 return rec.next;
540         }
541
542         if (tailer != rec.rec_len + sizeof(rec)) {
543                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
544                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
545         }
546         return rec.next;
547 }
548
549 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
550 {
551         tdb_off rec_ptr, top;
552
553         top = TDB_HASH_TOP(i);
554
555         if (tdb_lock(tdb, i, F_WRLCK) != 0)
556                 return -1;
557
558         if (ofs_read(tdb, top, &rec_ptr) == -1)
559                 return tdb_unlock(tdb, i, F_WRLCK);
560
561         if (rec_ptr)
562                 printf("hash=%d\n", i);
563
564         while (rec_ptr) {
565                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
566         }
567
568         return tdb_unlock(tdb, i, F_WRLCK);
569 }
570
571 void tdb_dump_all(TDB_CONTEXT *tdb)
572 {
573         int i;
574         for (i=0;i<tdb->header.hash_size;i++) {
575                 tdb_dump_chain(tdb, i);
576         }
577         printf("freelist:\n");
578         tdb_dump_chain(tdb, -1);
579 }
580
581 int tdb_printfreelist(TDB_CONTEXT *tdb)
582 {
583         int ret;
584         long total_free = 0;
585         tdb_off offset, rec_ptr;
586         struct list_struct rec;
587
588         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
589                 return ret;
590
591         offset = FREELIST_TOP;
592
593         /* read in the freelist top */
594         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
595                 tdb_unlock(tdb, -1, F_WRLCK);
596                 return 0;
597         }
598
599         printf("freelist top=[0x%08x]\n", rec_ptr );
600         while (rec_ptr) {
601                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
602                         tdb_unlock(tdb, -1, F_WRLCK);
603                         return -1;
604                 }
605
606                 if (rec.magic != TDB_FREE_MAGIC) {
607                         printf("bad magic 0x%08x in free list\n", rec.magic);
608                         tdb_unlock(tdb, -1, F_WRLCK);
609                         return -1;
610                 }
611
612                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
613                 total_free += rec.rec_len;
614
615                 /* move to the next record */
616                 rec_ptr = rec.next;
617         }
618         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
619                (int)total_free);
620
621         return tdb_unlock(tdb, -1, F_WRLCK);
622 }
623
624 /* Remove an element from the freelist.  Must have alloc lock. */
625 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
626 {
627         tdb_off last_ptr, i;
628         int maxloop;
629
630         /* read in the freelist top */
631         maxloop = LOOP_MAX;
632         last_ptr = FREELIST_TOP;
633         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
634                 if (i == off) {
635                         /* We've found it! */
636                         return ofs_write(tdb, last_ptr, &next);
637                 }
638                 /* Follow chain (next offset is at start of record) */
639                 last_ptr = i;
640
641                 maxloop--;
642                 if (maxloop == 0) {
643                         TDB_LOG((tdb, 0, "remove_from_freelist: maxloop reached; corrupt database!\n"));
644                         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
645                 }
646         }
647         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
648         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
649 }
650
651 /* Add an element into the freelist. Merge adjacent records if
652    neccessary. */
653 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
654 {
655         tdb_off right, left;
656
657         /* Allocation and tailer lock */
658         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
659                 return -1;
660
661         /* set an initial tailer, so if we fail we don't leave a bogus record */
662         if (update_tailer(tdb, offset, rec) != 0) {
663                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
664                 goto fail;
665         }
666
667         /* Look right first (I'm an Australian, dammit) */
668         right = offset + sizeof(*rec) + rec->rec_len;
669         if (right + sizeof(*rec) <= tdb->map_size) {
670                 struct list_struct r;
671
672                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
673                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
674                         goto left;
675                 }
676
677                 /* If it's free, expand to include it. */
678                 if (r.magic == TDB_FREE_MAGIC) {
679                         if (remove_from_freelist(tdb, right, r.next) == -1) {
680                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
681                                 goto left;
682                         }
683                         rec->rec_len += sizeof(r) + r.rec_len;
684                 }
685         }
686
687 left:
688         /* Look left */
689         left = offset - sizeof(tdb_off);
690         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
691                 struct list_struct l;
692                 tdb_off leftsize;
693
694                 /* Read in tailer and jump back to header */
695                 if (ofs_read(tdb, left, &leftsize) == -1) {
696                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
697                         goto update;
698                 }
699                 left = offset - leftsize;
700
701                 /* Now read in record */
702                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
703                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
704                         goto update;
705                 }
706
707                 /* If it's free, expand to include it. */
708                 if (l.magic == TDB_FREE_MAGIC) {
709                         if (remove_from_freelist(tdb, left, l.next) == -1) {
710                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
711                                 goto update;
712                         } else {
713                                 offset = left;
714                                 rec->rec_len += leftsize;
715                         }
716                 }
717         }
718
719 update:
720         if (update_tailer(tdb, offset, rec) == -1) {
721                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
722                 goto fail;
723         }
724
725         /* Now, prepend to free list */
726         rec->magic = TDB_FREE_MAGIC;
727
728         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
729             rec_write(tdb, offset, rec) == -1 ||
730             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
731                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
732                 goto fail;
733         }
734
735         /* And we're done. */
736         tdb_unlock(tdb, -1, F_WRLCK);
737         return 0;
738
739  fail:
740         tdb_unlock(tdb, -1, F_WRLCK);
741         return -1;
742 }
743
744
745 /* expand a file.  we prefer to use ftruncate, as that is what posix
746   says to use for mmap expansion */
747 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
748 {
749         char buf[1024];
750 #if HAVE_FTRUNCATE_EXTEND
751         if (ftruncate(tdb->fd, size+addition) != 0) {
752                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
753                            size+addition, strerror(errno)));
754                 return -1;
755         }
756 #else
757         char b = 0;
758
759 #ifdef HAVE_PWRITE
760         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
761 #else
762         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
763             write(tdb->fd, &b, 1) != 1) {
764 #endif
765                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
766                            size+addition, strerror(errno)));
767                 return -1;
768         }
769 #endif
770
771         /* now fill the file with something. This ensures that the file isn't sparse, which would be
772            very bad if we ran out of disk. This must be done with write, not via mmap */
773         memset(buf, 0x42, sizeof(buf));
774         while (addition) {
775                 int n = addition>sizeof(buf)?sizeof(buf):addition;
776 #ifdef HAVE_PWRITE
777                 int ret = pwrite(tdb->fd, buf, n, size);
778 #else
779                 int ret;
780                 if (lseek(tdb->fd, size, SEEK_SET) != size)
781                         return -1;
782                 ret = write(tdb->fd, buf, n);
783 #endif
784                 if (ret != n) {
785                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
786                                    n, strerror(errno)));
787                         return -1;
788                 }
789                 addition -= n;
790                 size += n;
791         }
792         return 0;
793 }
794
795
796 /* expand the database at least size bytes by expanding the underlying
797    file and doing the mmap again if necessary */
798 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
799 {
800         struct list_struct rec;
801         tdb_off offset;
802
803         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
804                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
805                 return -1;
806         }
807
808         /* must know about any previous expansions by another process */
809         tdb_oob(tdb, tdb->map_size + 1, 1);
810
811         /* always make room for at least 10 more records, and round
812            the database up to a multiple of TDB_PAGE_SIZE */
813         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
814
815         if (!(tdb->flags & TDB_INTERNAL))
816                 tdb_munmap(tdb);
817
818         /*
819          * We must ensure the file is unmapped before doing this
820          * to ensure consistency with systems like OpenBSD where
821          * writes and mmaps are not consistent.
822          */
823
824         /* expand the file itself */
825         if (!(tdb->flags & TDB_INTERNAL)) {
826                 if (expand_file(tdb, tdb->map_size, size) != 0)
827                         goto fail;
828         }
829
830         tdb->map_size += size;
831
832         if (tdb->flags & TDB_INTERNAL)
833                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
834         else {
835                 /*
836                  * We must ensure the file is remapped before adding the space
837                  * to ensure consistency with systems like OpenBSD where
838                  * writes and mmaps are not consistent.
839                  */
840
841                 /* We're ok if the mmap fails as we'll fallback to read/write */
842                 tdb_mmap(tdb);
843         }
844
845         /* form a new freelist record */
846         memset(&rec,'\0',sizeof(rec));
847         rec.rec_len = size - sizeof(rec);
848
849         /* link it into the free list */
850         offset = tdb->map_size - size;
851         if (tdb_free(tdb, offset, &rec) == -1)
852                 goto fail;
853
854         tdb_unlock(tdb, -1, F_WRLCK);
855         return 0;
856  fail:
857         tdb_unlock(tdb, -1, F_WRLCK);
858         return -1;
859 }
860
861 /* allocate some space from the free list. The offset returned points
862    to a unconnected list_struct within the database with room for at
863    least length bytes of total data
864
865    0 is returned if the space could not be allocated
866  */
867 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
868                             struct list_struct *rec)
869 {
870         tdb_off rec_ptr, last_ptr, newrec_ptr;
871         struct list_struct newrec;
872         int maxloop;
873
874         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
875                 return 0;
876
877         /* Extra bytes required for tailer */
878         length += sizeof(tdb_off);
879
880  again:
881         last_ptr = FREELIST_TOP;
882
883         /* read in the freelist top */
884         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
885                 goto fail;
886
887         /* keep looking until we find a freelist record big enough */
888         maxloop = LOOP_MAX;
889         while (rec_ptr) {
890                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
891                         goto fail;
892
893                 if (rec->rec_len >= length) {
894                         /* found it - now possibly split it up  */
895                         if (rec->rec_len > length + MIN_REC_SIZE) {
896                                 /* Length of left piece */
897                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
898
899                                 /* Right piece to go on free list */
900                                 newrec.rec_len = rec->rec_len
901                                         - (sizeof(*rec) + length);
902                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
903
904                                 /* And left record is shortened */
905                                 rec->rec_len = length;
906                         } else
907                                 newrec_ptr = 0;
908
909                         /* Remove allocated record from the free list */
910                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
911                                 goto fail;
912
913                         /* Update header: do this before we drop alloc
914                            lock, otherwise tdb_free() might try to
915                            merge with us, thinking we're free.
916                            (Thanks Jeremy Allison). */
917                         rec->magic = TDB_MAGIC;
918                         if (rec_write(tdb, rec_ptr, rec) == -1)
919                                 goto fail;
920
921                         /* Did we create new block? */
922                         if (newrec_ptr) {
923                                 /* Update allocated record tailer (we
924                                    shortened it). */
925                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
926                                         goto fail;
927
928                                 /* Free new record */
929                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
930                                         goto fail;
931                         }
932
933                         /* all done - return the new record offset */
934                         tdb_unlock(tdb, -1, F_WRLCK);
935                         return rec_ptr;
936                 }
937                 /* move to the next record */
938                 last_ptr = rec_ptr;
939                 rec_ptr = rec->next;
940
941                 maxloop--;
942                 if (maxloop == 0) {
943                         TDB_LOG((tdb, 0, "tdb_allocate: maxloop reached; corrupt database!\n"));
944                         return TDB_ERRCODE(TDB_ERR_CORRUPT, 0);
945                 }
946         }
947         /* we didn't find enough space. See if we can expand the
948            database and if we can then try again */
949         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
950                 goto again;
951  fail:
952         tdb_unlock(tdb, -1, F_WRLCK);
953         return 0;
954 }
955
956 /* initialise a new database with a specified hash size */
957 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
958 {
959         struct tdb_header *newdb;
960         int size, ret = -1;
961
962         /* We make it up in memory, then write it out if not internal */
963         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
964         if (!(newdb = calloc(size, 1)))
965                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
966
967         /* Fill in the header */
968         newdb->version = TDB_VERSION;
969         newdb->hash_size = hash_size;
970 #ifdef USE_SPINLOCKS
971         newdb->rwlocks = size;
972 #endif
973         if (tdb->flags & TDB_INTERNAL) {
974                 tdb->map_size = size;
975                 tdb->map_ptr = (char *)newdb;
976                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
977                 /* Convert the `ondisk' version if asked. */
978                 CONVERT(*newdb);
979                 return 0;
980         }
981         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
982                 goto fail;
983
984         if (ftruncate(tdb->fd, 0) == -1)
985                 goto fail;
986
987         /* This creates an endian-converted header, as if read from disk */
988         CONVERT(*newdb);
989         memcpy(&tdb->header, newdb, sizeof(tdb->header));
990         /* Don't endian-convert the magic food! */
991         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
992         if (write(tdb->fd, newdb, size) != size)
993                 ret = -1;
994         else
995                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
996
997   fail:
998         SAFE_FREE(newdb);
999         return ret;
1000 }
1001
1002 /* Returns 0 on fail.  On success, return offset of record, and fills
1003    in rec */
1004 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
1005                         struct list_struct *r)
1006 {
1007         tdb_off rec_ptr;
1008         int maxloop;
1009
1010         /* read in the hash top */
1011         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
1012                 return 0;
1013
1014         /* keep looking until we find the right record */
1015         maxloop = LOOP_MAX;
1016         while (rec_ptr) {
1017                 if (rec_read(tdb, rec_ptr, r) == -1)
1018                         return 0;
1019
1020                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
1021                         char *k;
1022                         /* a very likely hit - read the key */
1023                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
1024                                            r->key_len);
1025                         if (!k)
1026                                 return 0;
1027
1028                         if (memcmp(key.dptr, k, key.dsize) == 0) {
1029                                 SAFE_FREE(k);
1030                                 return rec_ptr;
1031                         }
1032                         SAFE_FREE(k);
1033                 }
1034                 rec_ptr = r->next;
1035
1036                 maxloop--;
1037                 if (maxloop == 0) {
1038                         TDB_LOG((tdb, 0, "tdb_find maxloop reached; corrupt database!\n"));
1039                         return TDB_ERRCODE(TDB_ERR_CORRUPT, 0);
1040                 }
1041         }
1042         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
1043 }
1044
1045 /* If they do lockkeys, check that this hash is one they locked */
1046 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
1047 {
1048         u32 i;
1049         if (!tdb->lockedkeys)
1050                 return 1;
1051         for (i = 0; i < tdb->lockedkeys[0]; i++)
1052                 if (tdb->lockedkeys[i+1] == hash)
1053                         return 1;
1054         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
1055 }
1056
1057 /* As tdb_find, but if you succeed, keep the lock */
1058 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
1059                              struct list_struct *rec)
1060 {
1061         u32 hash, rec_ptr;
1062
1063         hash = tdb_hash(&key);
1064         if (!tdb_keylocked(tdb, hash))
1065                 return 0;
1066         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1067                 return 0;
1068         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1069                 tdb_unlock(tdb, BUCKET(hash), locktype);
1070         return rec_ptr;
1071 }
1072
1073 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1074 {
1075         return tdb->ecode;
1076 }
1077
1078 static struct tdb_errname {
1079         enum TDB_ERROR ecode; const char *estring;
1080 } emap[] = { {TDB_SUCCESS, "Success"},
1081              {TDB_ERR_CORRUPT, "Corrupt database"},
1082              {TDB_ERR_IO, "IO Error"},
1083              {TDB_ERR_LOCK, "Locking error"},
1084              {TDB_ERR_OOM, "Out of memory"},
1085              {TDB_ERR_EXISTS, "Record exists"},
1086              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1087              {TDB_ERR_NOEXIST, "Record does not exist"} };
1088
1089 /* Error string for the last tdb error */
1090 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1091 {
1092         u32 i;
1093         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1094                 if (tdb->ecode == emap[i].ecode)
1095                         return emap[i].estring;
1096         return "Invalid error code";
1097 }
1098
1099 /* update an entry in place - this only works if the new data size
1100    is <= the old data size and the key exists.
1101    on failure return -1.
1102 */
1103
1104 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
1105 {
1106         struct list_struct rec;
1107         tdb_off rec_ptr;
1108
1109         /* find entry */
1110         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1111                 return -1;
1112
1113         /* must be long enough key, data and tailer */
1114         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1115                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1116                 return -1;
1117         }
1118
1119         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1120                       dbuf.dptr, dbuf.dsize) == -1)
1121                 return -1;
1122
1123         if (dbuf.dsize != rec.data_len) {
1124                 /* update size */
1125                 rec.data_len = dbuf.dsize;
1126                 return rec_write(tdb, rec_ptr, &rec);
1127         }
1128  
1129         return 0;
1130 }
1131
1132 /* find an entry in the database given a key */
1133 /* If an entry doesn't exist tdb_err will be set to
1134  * TDB_ERR_NOEXIST. If a key has no data attached
1135  * tdb_err will not be set. Both will return a
1136  * zero pptr and zero dsize.
1137  */
1138
1139 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1140 {
1141         tdb_off rec_ptr;
1142         struct list_struct rec;
1143         TDB_DATA ret;
1144
1145         /* find which hash bucket it is in */
1146         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
1147                 return tdb_null;
1148
1149         if (rec.data_len)
1150                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1151                                           rec.data_len);
1152         else
1153                 ret.dptr = NULL;
1154         ret.dsize = rec.data_len;
1155         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1156         return ret;
1157 }
1158
1159 /* check if an entry in the database exists 
1160
1161    note that 1 is returned if the key is found and 0 is returned if not found
1162    this doesn't match the conventions in the rest of this module, but is
1163    compatible with gdbm
1164 */
1165 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1166 {
1167         struct list_struct rec;
1168         
1169         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
1170                 return 0;
1171         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1172         return 1;
1173 }
1174
1175 /* record lock stops delete underneath */
1176 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1177 {
1178         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1179 }
1180 /*
1181   Write locks override our own fcntl readlocks, so check it here.
1182   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1183   an error to fail to get the lock here.
1184 */
1185  
1186 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1187 {
1188         struct tdb_traverse_lock *i;
1189         for (i = &tdb->travlocks; i; i = i->next)
1190                 if (i->off == off)
1191                         return -1;
1192         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1193 }
1194
1195 /*
1196   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1197   an error to fail to get the lock here.
1198 */
1199
1200 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1201 {
1202         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1203 }
1204 /* fcntl locks don't stack: avoid unlocking someone else's */
1205 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1206 {
1207         struct tdb_traverse_lock *i;
1208         u32 count = 0;
1209
1210         if (off == 0)
1211                 return 0;
1212         for (i = &tdb->travlocks; i; i = i->next)
1213                 if (i->off == off)
1214                         count++;
1215         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1216 }
1217
1218 /* actually delete an entry in the database given the offset */
1219 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1220 {
1221         tdb_off last_ptr, i;
1222         struct list_struct lastrec;
1223         int maxloop;
1224
1225         if (tdb->read_only) return -1;
1226
1227         if (write_lock_record(tdb, rec_ptr) == -1) {
1228                 /* Someone traversing here: mark it as dead */
1229                 rec->magic = TDB_DEAD_MAGIC;
1230                 return rec_write(tdb, rec_ptr, rec);
1231         }
1232         if (write_unlock_record(tdb, rec_ptr) != 0)
1233                 return -1;
1234
1235         /* find previous record in hash chain */
1236         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1237                 return -1;
1238
1239         maxloop = LOOP_MAX;
1240         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next) {
1241                 if (rec_read(tdb, i, &lastrec) == -1)
1242                         return -1;
1243
1244                 maxloop--;
1245                 if (maxloop == 0) {
1246                         TDB_LOG((tdb, 0, "(tdb)do_delete: maxloop reached; corrupt database!\n"));
1247                         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1248                 }
1249         }
1250
1251         /* unlink it: next ptr is at start of record. */
1252         if (last_ptr == 0)
1253                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1254         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1255                 return -1;
1256
1257         /* recover the space */
1258         if (tdb_free(tdb, rec_ptr, rec) == -1)
1259                 return -1;
1260         return 0;
1261 }
1262
1263 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1264 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1265                          struct list_struct *rec)
1266 {
1267         int want_next = (tlock->off != 0);
1268
1269         /* No traversal allows if you've called tdb_lockkeys() */
1270         if (tdb->lockedkeys)
1271                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1272
1273         /* Lock each chain from the start one. */
1274         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1275                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1276                         return -1;
1277
1278                 /* No previous record?  Start at top of chain. */
1279                 if (!tlock->off) {
1280                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1281                                      &tlock->off) == -1)
1282                                 goto fail;
1283                 } else {
1284                         /* Otherwise unlock the previous record. */
1285                         if (unlock_record(tdb, tlock->off) != 0)
1286                                 goto fail;
1287                 }
1288
1289                 if (want_next) {
1290                         /* We have offset of old record: grab next */
1291                         if (rec_read(tdb, tlock->off, rec) == -1)
1292                                 goto fail;
1293                         tlock->off = rec->next;
1294                 }
1295
1296                 /* Iterate through chain */
1297                 while( tlock->off) {
1298                         tdb_off current;
1299                         if (rec_read(tdb, tlock->off, rec) == -1)
1300                                 goto fail;
1301                         if (!TDB_DEAD(rec)) {
1302                                 /* Woohoo: we found one! */
1303                                 if (lock_record(tdb, tlock->off) != 0)
1304                                         goto fail;
1305                                 return tlock->off;
1306                         }
1307                         /* Try to clean dead ones from old traverses */
1308                         current = tlock->off;
1309                         tlock->off = rec->next;
1310                         if (!tdb->read_only && 
1311                             do_delete(tdb, current, rec) != 0)
1312                                 goto fail;
1313                 }
1314                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1315                 want_next = 0;
1316         }
1317         /* We finished iteration without finding anything */
1318         return TDB_ERRCODE(TDB_SUCCESS, 0);
1319
1320  fail:
1321         tlock->off = 0;
1322         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1323                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1324         return -1;
1325 }
1326
1327 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1328    return -1 on error or the record count traversed
1329    if fn is NULL then it is not called
1330    a non-zero return value from fn() indicates that the traversal should stop
1331   */
1332 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1333 {
1334         TDB_DATA key, dbuf;
1335         struct list_struct rec;
1336         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1337         int ret, count = 0;
1338
1339         /* This was in the initializaton, above, but the IRIX compiler
1340          * did not like it.  crh
1341          */
1342         tl.next = tdb->travlocks.next;
1343
1344         /* fcntl locks don't stack: beware traverse inside traverse */
1345         tdb->travlocks.next = &tl;
1346
1347         /* tdb_next_lock places locks on the record returned, and its chain */
1348         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1349                 count++;
1350                 /* now read the full record */
1351                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1352                                           rec.key_len + rec.data_len);
1353                 if (!key.dptr) {
1354                         ret = -1;
1355                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1356                                 goto out;
1357                         if (unlock_record(tdb, tl.off) != 0)
1358                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1359                         goto out;
1360                 }
1361                 key.dsize = rec.key_len;
1362                 dbuf.dptr = key.dptr + rec.key_len;
1363                 dbuf.dsize = rec.data_len;
1364
1365                 /* Drop chain lock, call out */
1366                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1367                         ret = -1;
1368                         goto out;
1369                 }
1370                 if (fn && fn(tdb, key, dbuf, state)) {
1371                         /* They want us to terminate traversal */
1372                         ret = count;
1373                         if (unlock_record(tdb, tl.off) != 0) {
1374                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1375                                 ret = -1;
1376                         }
1377                         tdb->travlocks.next = tl.next;
1378                         SAFE_FREE(key.dptr);
1379                         return count;
1380                 }
1381                 SAFE_FREE(key.dptr);
1382         }
1383 out:
1384         tdb->travlocks.next = tl.next;
1385         if (ret < 0)
1386                 return -1;
1387         else
1388                 return count;
1389 }
1390
1391 /* find the first entry in the database and return its key */
1392 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1393 {
1394         TDB_DATA key;
1395         struct list_struct rec;
1396
1397         /* release any old lock */
1398         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1399                 return tdb_null;
1400         tdb->travlocks.off = tdb->travlocks.hash = 0;
1401
1402         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1403                 return tdb_null;
1404         /* now read the key */
1405         key.dsize = rec.key_len;
1406         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1407         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1408                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1409         return key;
1410 }
1411
1412 /* find the next entry in the database, returning its key */
1413 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1414 {
1415         u32 oldhash;
1416         TDB_DATA key = tdb_null;
1417         struct list_struct rec;
1418         char *k = NULL;
1419
1420         /* Is locked key the old key?  If so, traverse will be reliable. */
1421         if (tdb->travlocks.off) {
1422                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1423                         return tdb_null;
1424                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1425                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1426                                             rec.key_len))
1427                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1428                         /* No, it wasn't: unlock it and start from scratch */
1429                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1430                                 return tdb_null;
1431                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1432                                 return tdb_null;
1433                         tdb->travlocks.off = 0;
1434                 }
1435
1436                 SAFE_FREE(k);
1437         }
1438
1439         if (!tdb->travlocks.off) {
1440                 /* No previous element: do normal find, and lock record */
1441                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1442                 if (!tdb->travlocks.off)
1443                         return tdb_null;
1444                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1445                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1446                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1447                         return tdb_null;
1448                 }
1449         }
1450         oldhash = tdb->travlocks.hash;
1451
1452         /* Grab next record: locks chain and returned record,
1453            unlocks old record */
1454         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1455                 key.dsize = rec.key_len;
1456                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1457                                           key.dsize);
1458                 /* Unlock the chain of this new record */
1459                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1460                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1461         }
1462         /* Unlock the chain of old record */
1463         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1464                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1465         return key;
1466 }
1467
1468 /* delete an entry in the database given a key */
1469 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1470 {
1471         tdb_off rec_ptr;
1472         struct list_struct rec;
1473         int ret;
1474
1475         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1476                 return -1;
1477         ret = do_delete(tdb, rec_ptr, &rec);
1478         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1479                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1480         return ret;
1481 }
1482
1483 /* store an element in the database, replacing any existing element
1484    with the same key 
1485
1486    return 0 on success, -1 on failure
1487 */
1488 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1489 {
1490         struct list_struct rec;
1491         u32 hash;
1492         tdb_off rec_ptr;
1493         char *p = NULL;
1494         int ret = 0;
1495
1496         /* find which hash bucket it is in */
1497         hash = tdb_hash(&key);
1498         if (!tdb_keylocked(tdb, hash))
1499                 return -1;
1500         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1501                 return -1;
1502
1503         /* check for it existing, on insert. */
1504         if (flag == TDB_INSERT) {
1505                 if (tdb_exists(tdb, key)) {
1506                         tdb->ecode = TDB_ERR_EXISTS;
1507                         goto fail;
1508                 }
1509         } else {
1510                 /* first try in-place update, on modify or replace. */
1511                 if (tdb_update(tdb, key, dbuf) == 0)
1512                         goto out;
1513                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1514                         goto fail;
1515         }
1516         /* reset the error code potentially set by the tdb_update() */
1517         tdb->ecode = TDB_SUCCESS;
1518
1519         /* delete any existing record - if it doesn't exist we don't
1520            care.  Doing this first reduces fragmentation, and avoids
1521            coalescing with `allocated' block before it's updated. */
1522         if (flag != TDB_INSERT)
1523                 tdb_delete(tdb, key);
1524
1525         /* Copy key+value *before* allocating free space in case malloc
1526            fails and we are left with a dead spot in the tdb. */
1527
1528         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1529                 tdb->ecode = TDB_ERR_OOM;
1530                 goto fail;
1531         }
1532
1533         memcpy(p, key.dptr, key.dsize);
1534         if (dbuf.dsize)
1535                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1536
1537         /* now we're into insert / modify / replace of a record which
1538          * we know could not be optimised by an in-place store (for
1539          * various reasons).  */
1540         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1541                 goto fail;
1542
1543         /* Read hash top into next ptr */
1544         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1545                 goto fail;
1546
1547         rec.key_len = key.dsize;
1548         rec.data_len = dbuf.dsize;
1549         rec.full_hash = hash;
1550         rec.magic = TDB_MAGIC;
1551
1552         /* write out and point the top of the hash chain at it */
1553         if (rec_write(tdb, rec_ptr, &rec) == -1
1554             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1555             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1556                 /* Need to tdb_unallocate() here */
1557                 goto fail;
1558         }
1559  out:
1560         SAFE_FREE(p); 
1561         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1562         return ret;
1563 fail:
1564         ret = -1;
1565         goto out;
1566 }
1567
1568 /* Attempt to append data to an entry in place - this only works if the new data size
1569    is <= the old data size and the key exists.
1570    on failure return -1. Record must be locked before calling.
1571 */
1572 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1573 {
1574         struct list_struct rec;
1575         tdb_off rec_ptr;
1576
1577         /* find entry */
1578         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1579                 return -1;
1580
1581         /* Append of 0 is always ok. */
1582         if (new_dbuf.dsize == 0)
1583                 return 0;
1584
1585         /* must be long enough for key, old data + new data and tailer */
1586         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1587                 /* No room. */
1588                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1589                 return -1;
1590         }
1591
1592         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1593                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1594                 return -1;
1595
1596         /* update size */
1597         rec.data_len += new_dbuf.dsize;
1598         return rec_write(tdb, rec_ptr, &rec);
1599 }
1600
1601 /* Append to an entry. Create if not exist. */
1602
1603 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1604 {
1605         struct list_struct rec;
1606         u32 hash;
1607         tdb_off rec_ptr;
1608         char *p = NULL;
1609         int ret = 0;
1610         size_t new_data_size = 0;
1611
1612         /* find which hash bucket it is in */
1613         hash = tdb_hash(&key);
1614         if (!tdb_keylocked(tdb, hash))
1615                 return -1;
1616         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1617                 return -1;
1618
1619         /* first try in-place. */
1620         if (tdb_append_inplace(tdb, key, new_dbuf) == 0)
1621                 goto out;
1622
1623         /* reset the error code potentially set by the tdb_append_inplace() */
1624         tdb->ecode = TDB_SUCCESS;
1625
1626         /* find entry */
1627         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1628                 if (tdb->ecode != TDB_ERR_NOEXIST)
1629                         goto fail;
1630
1631                 /* Not found - create. */
1632
1633                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1634                 goto out;
1635         }
1636
1637         new_data_size = rec.data_len + new_dbuf.dsize;
1638
1639         /* Copy key+old_value+value *before* allocating free space in case malloc
1640            fails and we are left with a dead spot in the tdb. */
1641
1642         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1643                 tdb->ecode = TDB_ERR_OOM;
1644                 goto fail;
1645         }
1646
1647         /* Copy the key in place. */
1648         memcpy(p, key.dptr, key.dsize);
1649
1650         /* Now read the old data into place. */
1651         if (rec.data_len &&
1652                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1653                         goto fail;
1654
1655         /* Finally append the new data. */
1656         if (new_dbuf.dsize)
1657                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1658
1659         /* delete any existing record - if it doesn't exist we don't
1660            care.  Doing this first reduces fragmentation, and avoids
1661            coalescing with `allocated' block before it's updated. */
1662
1663         tdb_delete(tdb, key);
1664
1665         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1666                 goto fail;
1667
1668         /* Read hash top into next ptr */
1669         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1670                 goto fail;
1671
1672         rec.key_len = key.dsize;
1673         rec.data_len = new_data_size;
1674         rec.full_hash = hash;
1675         rec.magic = TDB_MAGIC;
1676
1677         /* write out and point the top of the hash chain at it */
1678         if (rec_write(tdb, rec_ptr, &rec) == -1
1679             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1680             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1681                 /* Need to tdb_unallocate() here */
1682                 goto fail;
1683         }
1684
1685  out:
1686         SAFE_FREE(p); 
1687         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1688         return ret;
1689
1690 fail:
1691         ret = -1;
1692         goto out;
1693 }
1694
1695 static int tdb_already_open(dev_t device,
1696                             ino_t ino)
1697 {
1698         TDB_CONTEXT *i;
1699         
1700         for (i = tdbs; i; i = i->next) {
1701                 if (i->device == device && i->inode == ino) {
1702                         return 1;
1703                 }
1704         }
1705
1706         return 0;
1707 }
1708
1709 /* open the database, creating it if necessary 
1710
1711    The open_flags and mode are passed straight to the open call on the
1712    database file. A flags value of O_WRONLY is invalid. The hash size
1713    is advisory, use zero for a default value.
1714
1715    Return is NULL on error, in which case errno is also set.  Don't 
1716    try to call tdb_error or tdb_errname, just do strerror(errno).
1717
1718    @param name may be NULL for internal databases. */
1719 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1720                       int open_flags, mode_t mode)
1721 {
1722         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1723 }
1724
1725
1726 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1727                          int open_flags, mode_t mode,
1728                          tdb_log_func log_fn)
1729 {
1730         TDB_CONTEXT *tdb;
1731         struct stat st;
1732         int rev = 0, locked;
1733         unsigned char *vp;
1734         u32 vertest;
1735
1736         if (!(tdb = calloc(1, sizeof *tdb))) {
1737                 /* Can't log this */
1738                 errno = ENOMEM;
1739                 goto fail;
1740         }
1741         tdb->fd = -1;
1742         tdb->name = NULL;
1743         tdb->map_ptr = NULL;
1744         tdb->lockedkeys = NULL;
1745         tdb->flags = tdb_flags;
1746         tdb->open_flags = open_flags;
1747         tdb->log_fn = log_fn;
1748         
1749         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1750                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1751                          name));
1752                 errno = EINVAL;
1753                 goto fail;
1754         }
1755         
1756         if (hash_size == 0)
1757                 hash_size = DEFAULT_HASH_SIZE;
1758         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1759                 tdb->read_only = 1;
1760                 /* read only databases don't do locking or clear if first */
1761                 tdb->flags |= TDB_NOLOCK;
1762                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1763         }
1764
1765         /* internal databases don't mmap or lock, and start off cleared */
1766         if (tdb->flags & TDB_INTERNAL) {
1767                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1768                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1769                 if (tdb_new_database(tdb, hash_size) != 0) {
1770                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1771                         goto fail;
1772                 }
1773                 goto internal;
1774         }
1775
1776         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1777                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1778                          name, strerror(errno)));
1779                 goto fail;      /* errno set by open(2) */
1780         }
1781
1782         /* 
1783            Close file when execing another process.  
1784            Prevents SELinux access errors.
1785         */
1786         set_cloexec_flag(tdb->fd, 1);
1787
1788         /* ensure there is only one process initialising at once */
1789         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1790                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1791                          name, strerror(errno)));
1792                 goto fail;      /* errno set by tdb_brlock */
1793         }
1794
1795         /* we need to zero database if we are the only one with it open */
1796         if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1797             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1798                 open_flags |= O_CREAT;
1799                 if (ftruncate(tdb->fd, 0) == -1) {
1800                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1801                                  "failed to truncate %s: %s\n",
1802                                  name, strerror(errno)));
1803                         goto fail; /* errno set by ftruncate */
1804                 }
1805         }
1806
1807         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1808             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1809             || (tdb->header.version != TDB_VERSION
1810                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1811                 /* its not a valid database - possibly initialise it */
1812                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1813                         errno = EIO; /* ie bad format or something */
1814                         goto fail;
1815                 }
1816                 rev = (tdb->flags & TDB_CONVERT);
1817         }
1818         vp = (unsigned char *)&tdb->header.version;
1819         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1820                   (((u32)vp[2]) << 8) | (u32)vp[3];
1821         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1822         if (!rev)
1823                 tdb->flags &= ~TDB_CONVERT;
1824         else {
1825                 tdb->flags |= TDB_CONVERT;
1826                 convert(&tdb->header, sizeof(tdb->header));
1827         }
1828         if (fstat(tdb->fd, &st) == -1)
1829                 goto fail;
1830
1831         /* Is it already in the open list?  If so, fail. */
1832         if (tdb_already_open(st.st_dev, st.st_ino)) {
1833                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1834                          "%s (%d:%d,%lld) is already open in this process\n",
1835                          name, major(st.st_dev), minor(st.st_dev), (unsigned long long)st.st_ino));
1836                 errno = EBUSY;
1837                 goto fail;
1838         }
1839
1840         if (!(tdb->name = (char *)strdup(name))) {
1841                 errno = ENOMEM;
1842                 goto fail;
1843         }
1844
1845         tdb->map_size = st.st_size;
1846         tdb->device = st.st_dev;
1847         tdb->inode = st.st_ino;
1848         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1849         if (!tdb->locked) {
1850                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1851                          "failed to allocate lock structure for %s\n",
1852                          name));
1853                 errno = ENOMEM;
1854                 goto fail;
1855         }
1856         tdb_mmap(tdb);
1857         if (locked) {
1858                 if (!tdb->read_only)
1859                         if (tdb_clear_spinlocks(tdb) != 0) {
1860                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1861                                 "failed to clear spinlock\n"));
1862                                 goto fail;
1863                         }
1864                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1865                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1866                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1867                                  name, strerror(errno)));
1868                         goto fail;
1869                 }
1870         }
1871         /* leave this lock in place to indicate it's in use */
1872         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1873                 goto fail;
1874
1875  internal:
1876         /* Internal (memory-only) databases skip all the code above to
1877          * do with disk files, and resume here by releasing their
1878          * global lock and hooking into the active list. */
1879         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1880                 goto fail;
1881         tdb->next = tdbs;
1882         tdbs = tdb;
1883         return tdb;
1884
1885  fail:
1886         { int save_errno = errno;
1887
1888         if (!tdb)
1889                 return NULL;
1890         
1891         if (tdb->map_ptr) {
1892                 if (tdb->flags & TDB_INTERNAL)
1893                         SAFE_FREE(tdb->map_ptr);
1894                 else
1895                         tdb_munmap(tdb);
1896         }
1897         SAFE_FREE(tdb->name);
1898         if (tdb->fd != -1)
1899                 if (close(tdb->fd) != 0)
1900                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1901         SAFE_FREE(tdb->locked);
1902         SAFE_FREE(tdb);
1903         errno = save_errno;
1904         return NULL;
1905         }
1906 }
1907
1908 /**
1909  * Close a database.
1910  *
1911  * @returns -1 for error; 0 for success.
1912  **/
1913 int tdb_close(TDB_CONTEXT *tdb)
1914 {
1915         TDB_CONTEXT **i;
1916         int ret = 0;
1917
1918         if (tdb->map_ptr) {
1919                 if (tdb->flags & TDB_INTERNAL)
1920                         SAFE_FREE(tdb->map_ptr);
1921                 else
1922                         tdb_munmap(tdb);
1923         }
1924         SAFE_FREE(tdb->name);
1925         if (tdb->fd != -1)
1926                 ret = close(tdb->fd);
1927         SAFE_FREE(tdb->locked);
1928         SAFE_FREE(tdb->lockedkeys);
1929
1930         /* Remove from contexts list */
1931         for (i = &tdbs; *i; i = &(*i)->next) {
1932                 if (*i == tdb) {
1933                         *i = tdb->next;
1934                         break;
1935                 }
1936         }
1937
1938         memset(tdb, 0, sizeof(*tdb));
1939         SAFE_FREE(tdb);
1940
1941         return ret;
1942 }
1943
1944 /* lock/unlock entire database */
1945 int tdb_lockall(TDB_CONTEXT *tdb)
1946 {
1947         u32 i;
1948
1949         /* There are no locks on read-only dbs */
1950         if (tdb->read_only)
1951                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1952         if (tdb->lockedkeys)
1953                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1954         for (i = 0; i < tdb->header.hash_size; i++) 
1955                 if (tdb_lock(tdb, i, F_WRLCK))
1956                         break;
1957
1958         /* If error, release locks we have... */
1959         if (i < tdb->header.hash_size) {
1960                 u32 j;
1961
1962                 for ( j = 0; j < i; j++)
1963                         tdb_unlock(tdb, j, F_WRLCK);
1964                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1965         }
1966
1967         return 0;
1968 }
1969 void tdb_unlockall(TDB_CONTEXT *tdb)
1970 {
1971         u32 i;
1972         for (i=0; i < tdb->header.hash_size; i++)
1973                 tdb_unlock(tdb, i, F_WRLCK);
1974 }
1975
1976 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1977 {
1978         u32 i, j, hash;
1979
1980         /* Can't lock more keys if already locked */
1981         if (tdb->lockedkeys)
1982                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1983         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1984                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1985         /* First number in array is # keys */
1986         tdb->lockedkeys[0] = number;
1987
1988         /* Insertion sort by bucket */
1989         for (i = 0; i < number; i++) {
1990                 hash = tdb_hash(&keys[i]);
1991                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1992                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1993                 tdb->lockedkeys[j+1] = hash;
1994         }
1995         /* Finally, lock in order */
1996         for (i = 0; i < number; i++)
1997                 if (tdb_lock(tdb, i, F_WRLCK))
1998                         break;
1999
2000         /* If error, release locks we have... */
2001         if (i < number) {
2002                 for ( j = 0; j < i; j++)
2003                         tdb_unlock(tdb, j, F_WRLCK);
2004                 SAFE_FREE(tdb->lockedkeys);
2005                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
2006         }
2007         return 0;
2008 }
2009
2010 /* Unlock the keys previously locked by tdb_lockkeys() */
2011 void tdb_unlockkeys(TDB_CONTEXT *tdb)
2012 {
2013         u32 i;
2014         if (!tdb->lockedkeys)
2015                 return;
2016         for (i = 0; i < tdb->lockedkeys[0]; i++)
2017                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
2018         SAFE_FREE(tdb->lockedkeys);
2019 }
2020
2021 /* lock/unlock one hash chain. This is meant to be used to reduce
2022    contention - it cannot guarantee how many records will be locked */
2023 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
2024 {
2025         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
2026 }
2027
2028 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
2029 {
2030         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
2031 }
2032
2033 int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
2034 {
2035         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
2036 }
2037
2038 int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
2039 {
2040         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
2041 }
2042
2043
2044 /* register a loging function */
2045 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
2046 {
2047         tdb->log_fn = fn;
2048 }
2049
2050
2051 /* reopen a tdb - this is used after a fork to ensure that we have an independent
2052    seek pointer from our parent and to re-establish locks */
2053 int tdb_reopen(TDB_CONTEXT *tdb)
2054 {
2055         struct stat st;
2056
2057         if (tdb_munmap(tdb) != 0) {
2058                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
2059                 goto fail;
2060         }
2061         if (close(tdb->fd) != 0)
2062                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
2063         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
2064         if (tdb->fd == -1) {
2065                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
2066                 goto fail;
2067         }
2068         if (fstat(tdb->fd, &st) != 0) {
2069                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
2070                 goto fail;
2071         }
2072         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
2073                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
2074                 goto fail;
2075         }
2076         tdb_mmap(tdb);
2077         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
2078                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2079                 goto fail;
2080         }
2081
2082         return 0;
2083
2084 fail:
2085         tdb_close(tdb);
2086         return -1;
2087 }
2088
2089 /* reopen all tdb's */
2090 int tdb_reopen_all(void)
2091 {
2092         TDB_CONTEXT *tdb;
2093
2094         for (tdb=tdbs; tdb; tdb = tdb->next) {
2095                 if (tdb_reopen(tdb) != 0) return -1;
2096         }
2097
2098         return 0;
2099 }