chiark / gitweb /
[PATCH] 014_bk mark
[elogind.git] / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3    Samba database functions
4    Copyright (C) Andrew Tridgell              1999-2000
5    Copyright (C) Luke Kenneth Casson Leighton      2000
6    Copyright (C) Paul `Rusty' Russell              2000
7    Copyright (C) Jeremy Allison                    2000-2003
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23
24
25 /* NOTE: If you use tdbs under valgrind, and in particular if you run
26  * tdbtorture, you may get spurious "uninitialized value" warnings.  I
27  * think this is because valgrind doesn't understand that the mmap'd
28  * area may be written to by other processes.  Memory can, from the
29  * point of view of the grinded process, spontaneously become
30  * initialized.
31  *
32  * I can think of a few solutions.  [mbp 20030311]
33  *
34  * 1 - Write suppressions for Valgrind so that it doesn't complain
35  * about this.  Probably the most reasonable but people need to
36  * remember to use them.
37  *
38  * 2 - Use IO not mmap when running under valgrind.  Not so nice.
39  *
40  * 3 - Use the special valgrind macros to mark memory as valid at the
41  * right time.  Probably too hard -- the process just doesn't know.
42  */ 
43
44 /* udev defines */
45 #define STANDALONE
46 #define TDB_DEBUG
47 #define HAVE_MMAP       1
48
49
50 #ifdef STANDALONE
51 #if HAVE_CONFIG_H
52 #include <config.h>
53 #endif
54
55 #define _KLIBC_HAS_ARCH_SIG_ATOMIC_T
56 #include <stdlib.h>
57 #include <stdio.h>
58 #include <fcntl.h>
59 #include <unistd.h>
60 #include <string.h>
61 #include <fcntl.h>
62 #include <errno.h>
63 #include <sys/mman.h>
64 #include <sys/stat.h>
65 #include <signal.h>
66 #include "tdb.h"
67 #include "spinlock.h"
68 #else
69 #include "includes.h"
70 #endif
71
72 #define TDB_MAGIC_FOOD "TDB file\n"
73 #define TDB_VERSION (0x26011967 + 6)
74 #define TDB_MAGIC (0x26011999U)
75 #define TDB_FREE_MAGIC (~TDB_MAGIC)
76 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
77 #define TDB_ALIGNMENT 4
78 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
79 #define DEFAULT_HASH_SIZE 131
80 #define TDB_PAGE_SIZE 0x2000
81 #define FREELIST_TOP (sizeof(struct tdb_header))
82 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
83 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
84 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
85 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
86 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
87
88 /* NB assumes there is a local variable called "tdb" that is the
89  * current context, also takes doubly-parenthesized print-style
90  * argument. */
91 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
92
93 /* lock offsets */
94 #define GLOBAL_LOCK 0
95 #define ACTIVE_LOCK 4
96
97 #ifndef MAP_FILE
98 #define MAP_FILE 0
99 #endif
100
101 #ifndef MAP_FAILED
102 #define MAP_FAILED ((void *)-1)
103 #endif
104
105 /* free memory if the pointer is valid and zero the pointer */
106 #ifndef SAFE_FREE
107 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
108 #endif
109
110 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
111 TDB_DATA tdb_null;
112
113 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
114 static TDB_CONTEXT *tdbs = NULL;
115
116 static int tdb_munmap(TDB_CONTEXT *tdb)
117 {
118         if (tdb->flags & TDB_INTERNAL)
119                 return 0;
120
121 #ifdef HAVE_MMAP
122         if (tdb->map_ptr) {
123                 int ret = munmap(tdb->map_ptr, tdb->map_size);
124                 if (ret != 0)
125                         return ret;
126         }
127 #endif
128         tdb->map_ptr = NULL;
129         return 0;
130 }
131
132 static void tdb_mmap(TDB_CONTEXT *tdb)
133 {
134         if (tdb->flags & TDB_INTERNAL)
135                 return;
136
137 #ifdef HAVE_MMAP
138         if (!(tdb->flags & TDB_NOMMAP)) {
139                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
140                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
141                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
142
143                 /*
144                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
145                  */
146
147                 if (tdb->map_ptr == MAP_FAILED) {
148                         tdb->map_ptr = NULL;
149                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
150                                  tdb->map_size, strerror(errno)));
151                 }
152         } else {
153                 tdb->map_ptr = NULL;
154         }
155 #else
156         tdb->map_ptr = NULL;
157 #endif
158 }
159
160 /* Endian conversion: we only ever deal with 4 byte quantities */
161 static void *convert(void *buf, u32 size)
162 {
163         u32 i, *p = buf;
164         for (i = 0; i < size / 4; i++)
165                 p[i] = TDB_BYTEREV(p[i]);
166         return buf;
167 }
168 #define DOCONV() (tdb->flags & TDB_CONVERT)
169 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
170
171 /* the body of the database is made of one list_struct for the free space
172    plus a separate data list for each hash value */
173 struct list_struct {
174         tdb_off next; /* offset of the next record in the list */
175         tdb_len rec_len; /* total byte length of record */
176         tdb_len key_len; /* byte length of key */
177         tdb_len data_len; /* byte length of data */
178         u32 full_hash; /* the full 32 bit hash of the key */
179         u32 magic;   /* try to catch errors */
180         /* the following union is implied:
181                 union {
182                         char record[rec_len];
183                         struct {
184                                 char key[key_len];
185                                 char data[data_len];
186                         }
187                         u32 totalsize; (tailer)
188                 }
189         */
190 };
191
192 /***************************************************************
193  Allow a caller to set a "alarm" flag that tdb can check to abort
194  a blocking lock on SIGALRM.
195 ***************************************************************/
196
197 static sig_atomic_t *palarm_fired;
198
199 void tdb_set_lock_alarm(sig_atomic_t *palarm)
200 {
201         palarm_fired = palarm;
202 }
203
204 /* a byte range locking function - return 0 on success
205    this functions locks/unlocks 1 byte at the specified offset.
206
207    On error, errno is also set so that errors are passed back properly
208    through tdb_open(). */
209 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
210                       int rw_type, int lck_type, int probe)
211 {
212         struct flock fl;
213         int ret;
214
215         if (tdb->flags & TDB_NOLOCK)
216                 return 0;
217         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
218                 errno = EACCES;
219                 return -1;
220         }
221
222         fl.l_type = rw_type;
223         fl.l_whence = SEEK_SET;
224         fl.l_start = offset;
225         fl.l_len = 1;
226         fl.l_pid = 0;
227
228         do {
229                 ret = fcntl(tdb->fd,lck_type,&fl);
230                 if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
231                         break;
232         } while (ret == -1 && errno == EINTR);
233
234         if (ret == -1) {
235                 if (!probe && lck_type != F_SETLK) {
236                         /* Ensure error code is set for log fun to examine. */
237                         if (errno == EINTR && palarm_fired && *palarm_fired)
238                                 tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
239                         else
240                                 tdb->ecode = TDB_ERR_LOCK;
241                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
242                                  tdb->fd, offset, rw_type, lck_type));
243                 }
244                 /* Was it an alarm timeout ? */
245                 if (errno == EINTR && palarm_fired && *palarm_fired)
246                         return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
247                 /* Otherwise - generic lock error. */
248                 /* errno set by fcntl */
249                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
250         }
251         return 0;
252 }
253
254 /* lock a list in the database. list -1 is the alloc list */
255 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
256 {
257         if (list < -1 || list >= (int)tdb->header.hash_size) {
258                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
259                            list, ltype));
260                 return -1;
261         }
262         if (tdb->flags & TDB_NOLOCK)
263                 return 0;
264
265         /* Since fcntl locks don't nest, we do a lock for the first one,
266            and simply bump the count for future ones */
267         if (tdb->locked[list+1].count == 0) {
268                 if (!tdb->read_only && tdb->header.rwlocks) {
269                         if (tdb_spinlock(tdb, list, ltype)) {
270                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
271                                            list, ltype));
272                                 return -1;
273                         }
274                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
275                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
276                                            list, ltype, strerror(errno)));
277                         return -1;
278                 }
279                 tdb->locked[list+1].ltype = ltype;
280         }
281         tdb->locked[list+1].count++;
282         return 0;
283 }
284
285 /* unlock the database: returns void because it's too late for errors. */
286         /* changed to return int it may be interesting to know there
287            has been an error  --simo */
288 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
289 {
290         int ret = -1;
291
292         if (tdb->flags & TDB_NOLOCK)
293                 return 0;
294
295         /* Sanity checks */
296         if (list < -1 || list >= (int)tdb->header.hash_size) {
297                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
298                 return ret;
299         }
300
301         if (tdb->locked[list+1].count==0) {
302                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
303                 return ret;
304         }
305
306         if (tdb->locked[list+1].count == 1) {
307                 /* Down to last nested lock: unlock underneath */
308                 if (!tdb->read_only && tdb->header.rwlocks) {
309                         ret = tdb_spinunlock(tdb, list, ltype);
310                 } else {
311                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
312                 }
313         } else {
314                 ret = 0;
315         }
316         tdb->locked[list+1].count--;
317
318         if (ret)
319                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
320         return ret;
321 }
322
323 /* This is based on the hash algorithm from gdbm */
324 static u32 tdb_hash(TDB_DATA *key)
325 {
326         u32 value;      /* Used to compute the hash value.  */
327         u32   i;        /* Used to cycle through random values. */
328
329         /* Set the initial value from the key size. */
330         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
331                 value = (value + (key->dptr[i] << (i*5 % 24)));
332
333         return (1103515243 * value + 12345);  
334 }
335
336 /* check for an out of bounds access - if it is out of bounds then
337    see if the database has been expanded by someone else and expand
338    if necessary 
339    note that "len" is the minimum length needed for the db
340 */
341 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
342 {
343         struct stat st;
344         if (len <= tdb->map_size)
345                 return 0;
346         if (tdb->flags & TDB_INTERNAL) {
347                 if (!probe) {
348                         /* Ensure ecode is set for log fn. */
349                         tdb->ecode = TDB_ERR_IO;
350                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
351                                  (int)len, (int)tdb->map_size));
352                 }
353                 return TDB_ERRCODE(TDB_ERR_IO, -1);
354         }
355
356         if (fstat(tdb->fd, &st) == -1)
357                 return TDB_ERRCODE(TDB_ERR_IO, -1);
358
359         if (st.st_size < (size_t)len) {
360                 if (!probe) {
361                         /* Ensure ecode is set for log fn. */
362                         tdb->ecode = TDB_ERR_IO;
363                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
364                                  (int)len, (int)st.st_size));
365                 }
366                 return TDB_ERRCODE(TDB_ERR_IO, -1);
367         }
368
369         /* Unmap, update size, remap */
370         if (tdb_munmap(tdb) == -1)
371                 return TDB_ERRCODE(TDB_ERR_IO, -1);
372         tdb->map_size = st.st_size;
373         tdb_mmap(tdb);
374         return 0;
375 }
376
377 /* write a lump of data at a specified offset */
378 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
379 {
380         if (tdb_oob(tdb, off + len, 0) != 0)
381                 return -1;
382
383         if (tdb->map_ptr)
384                 memcpy(off + (char *)tdb->map_ptr, buf, len);
385 #ifdef HAVE_PWRITE
386         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
387 #else
388         else if (lseek(tdb->fd, off, SEEK_SET) != off
389                  || write(tdb->fd, buf, len) != (ssize_t)len) {
390 #endif
391                 /* Ensure ecode is set for log fn. */
392                 tdb->ecode = TDB_ERR_IO;
393                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
394                            off, len, strerror(errno)));
395                 return TDB_ERRCODE(TDB_ERR_IO, -1);
396         }
397         return 0;
398 }
399
400 /* read a lump of data at a specified offset, maybe convert */
401 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
402 {
403         if (tdb_oob(tdb, off + len, 0) != 0)
404                 return -1;
405
406         if (tdb->map_ptr)
407                 memcpy(buf, off + (char *)tdb->map_ptr, len);
408 #ifdef HAVE_PREAD
409         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
410 #else
411         else if (lseek(tdb->fd, off, SEEK_SET) != off
412                  || read(tdb->fd, buf, len) != (ssize_t)len) {
413 #endif
414                 /* Ensure ecode is set for log fn. */
415                 tdb->ecode = TDB_ERR_IO;
416                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
417                            off, len, strerror(errno)));
418                 return TDB_ERRCODE(TDB_ERR_IO, -1);
419         }
420         if (cv)
421                 convert(buf, len);
422         return 0;
423 }
424
425 /* read a lump of data, allocating the space for it */
426 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
427 {
428         char *buf;
429
430         if (!(buf = malloc(len))) {
431                 /* Ensure ecode is set for log fn. */
432                 tdb->ecode = TDB_ERR_OOM;
433                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
434                            len, strerror(errno)));
435                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
436         }
437         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
438                 SAFE_FREE(buf);
439                 return NULL;
440         }
441         return buf;
442 }
443
444 /* read/write a tdb_off */
445 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
446 {
447         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
448 }
449 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
450 {
451         tdb_off off = *d;
452         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
453 }
454
455 /* read/write a record */
456 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
457 {
458         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
459                 return -1;
460         if (TDB_BAD_MAGIC(rec)) {
461                 /* Ensure ecode is set for log fn. */
462                 tdb->ecode = TDB_ERR_CORRUPT;
463                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
464                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
465         }
466         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
467 }
468 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
469 {
470         struct list_struct r = *rec;
471         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
472 }
473
474 /* read a freelist record and check for simple errors */
475 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
476 {
477         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
478                 return -1;
479
480         if (rec->magic == TDB_MAGIC) {
481                 /* this happens when a app is showdown while deleting a record - we should
482                    not completely fail when this happens */
483                 TDB_LOG((tdb, 0,"rec_free_read non-free magic at offset=%d - fixing\n", 
484                          rec->magic, off));
485                 rec->magic = TDB_FREE_MAGIC;
486                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
487                         return -1;
488         }
489
490         if (rec->magic != TDB_FREE_MAGIC) {
491                 /* Ensure ecode is set for log fn. */
492                 tdb->ecode = TDB_ERR_CORRUPT;
493                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
494                            rec->magic, off));
495                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
496         }
497         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
498                 return -1;
499         return 0;
500 }
501
502 /* update a record tailer (must hold allocation lock) */
503 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
504                          const struct list_struct *rec)
505 {
506         tdb_off totalsize;
507
508         /* Offset of tailer from record header */
509         totalsize = sizeof(*rec) + rec->rec_len;
510         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
511                          &totalsize);
512 }
513
514 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
515 {
516         struct list_struct rec;
517         tdb_off tailer_ofs, tailer;
518
519         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
520                 printf("ERROR: failed to read record at %u\n", offset);
521                 return 0;
522         }
523
524         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
525                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
526
527         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
528         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
529                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
530                 return rec.next;
531         }
532
533         if (tailer != rec.rec_len + sizeof(rec)) {
534                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
535                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
536         }
537         return rec.next;
538 }
539
540 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
541 {
542         tdb_off rec_ptr, top;
543
544         top = TDB_HASH_TOP(i);
545
546         if (tdb_lock(tdb, i, F_WRLCK) != 0)
547                 return -1;
548
549         if (ofs_read(tdb, top, &rec_ptr) == -1)
550                 return tdb_unlock(tdb, i, F_WRLCK);
551
552         if (rec_ptr)
553                 printf("hash=%d\n", i);
554
555         while (rec_ptr) {
556                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
557         }
558
559         return tdb_unlock(tdb, i, F_WRLCK);
560 }
561
562 void tdb_dump_all(TDB_CONTEXT *tdb)
563 {
564         int i;
565         for (i=0;i<tdb->header.hash_size;i++) {
566                 tdb_dump_chain(tdb, i);
567         }
568         printf("freelist:\n");
569         tdb_dump_chain(tdb, -1);
570 }
571
572 int tdb_printfreelist(TDB_CONTEXT *tdb)
573 {
574         int ret;
575         long total_free = 0;
576         tdb_off offset, rec_ptr;
577         struct list_struct rec;
578
579         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
580                 return ret;
581
582         offset = FREELIST_TOP;
583
584         /* read in the freelist top */
585         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
586                 tdb_unlock(tdb, -1, F_WRLCK);
587                 return 0;
588         }
589
590         printf("freelist top=[0x%08x]\n", rec_ptr );
591         while (rec_ptr) {
592                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
593                         tdb_unlock(tdb, -1, F_WRLCK);
594                         return -1;
595                 }
596
597                 if (rec.magic != TDB_FREE_MAGIC) {
598                         printf("bad magic 0x%08x in free list\n", rec.magic);
599                         tdb_unlock(tdb, -1, F_WRLCK);
600                         return -1;
601                 }
602
603                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
604                 total_free += rec.rec_len;
605
606                 /* move to the next record */
607                 rec_ptr = rec.next;
608         }
609         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
610                (int)total_free);
611
612         return tdb_unlock(tdb, -1, F_WRLCK);
613 }
614
615 /* Remove an element from the freelist.  Must have alloc lock. */
616 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
617 {
618         tdb_off last_ptr, i;
619
620         /* read in the freelist top */
621         last_ptr = FREELIST_TOP;
622         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
623                 if (i == off) {
624                         /* We've found it! */
625                         return ofs_write(tdb, last_ptr, &next);
626                 }
627                 /* Follow chain (next offset is at start of record) */
628                 last_ptr = i;
629         }
630         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
631         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
632 }
633
634 /* Add an element into the freelist. Merge adjacent records if
635    neccessary. */
636 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
637 {
638         tdb_off right, left;
639
640         /* Allocation and tailer lock */
641         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
642                 return -1;
643
644         /* set an initial tailer, so if we fail we don't leave a bogus record */
645         if (update_tailer(tdb, offset, rec) != 0) {
646                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
647                 goto fail;
648         }
649
650         /* Look right first (I'm an Australian, dammit) */
651         right = offset + sizeof(*rec) + rec->rec_len;
652         if (right + sizeof(*rec) <= tdb->map_size) {
653                 struct list_struct r;
654
655                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
656                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
657                         goto left;
658                 }
659
660                 /* If it's free, expand to include it. */
661                 if (r.magic == TDB_FREE_MAGIC) {
662                         if (remove_from_freelist(tdb, right, r.next) == -1) {
663                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
664                                 goto left;
665                         }
666                         rec->rec_len += sizeof(r) + r.rec_len;
667                 }
668         }
669
670 left:
671         /* Look left */
672         left = offset - sizeof(tdb_off);
673         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
674                 struct list_struct l;
675                 tdb_off leftsize;
676
677                 /* Read in tailer and jump back to header */
678                 if (ofs_read(tdb, left, &leftsize) == -1) {
679                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
680                         goto update;
681                 }
682                 left = offset - leftsize;
683
684                 /* Now read in record */
685                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
686                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
687                         goto update;
688                 }
689
690                 /* If it's free, expand to include it. */
691                 if (l.magic == TDB_FREE_MAGIC) {
692                         if (remove_from_freelist(tdb, left, l.next) == -1) {
693                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
694                                 goto update;
695                         } else {
696                                 offset = left;
697                                 rec->rec_len += leftsize;
698                         }
699                 }
700         }
701
702 update:
703         if (update_tailer(tdb, offset, rec) == -1) {
704                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
705                 goto fail;
706         }
707
708         /* Now, prepend to free list */
709         rec->magic = TDB_FREE_MAGIC;
710
711         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
712             rec_write(tdb, offset, rec) == -1 ||
713             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
714                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
715                 goto fail;
716         }
717
718         /* And we're done. */
719         tdb_unlock(tdb, -1, F_WRLCK);
720         return 0;
721
722  fail:
723         tdb_unlock(tdb, -1, F_WRLCK);
724         return -1;
725 }
726
727
728 /* expand a file.  we prefer to use ftruncate, as that is what posix
729   says to use for mmap expansion */
730 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
731 {
732         char buf[1024];
733 #if HAVE_FTRUNCATE_EXTEND
734         if (ftruncate(tdb->fd, size+addition) != 0) {
735                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
736                            size+addition, strerror(errno)));
737                 return -1;
738         }
739 #else
740         char b = 0;
741
742 #ifdef HAVE_PWRITE
743         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
744 #else
745         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
746             write(tdb->fd, &b, 1) != 1) {
747 #endif
748                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
749                            size+addition, strerror(errno)));
750                 return -1;
751         }
752 #endif
753
754         /* now fill the file with something. This ensures that the file isn't sparse, which would be
755            very bad if we ran out of disk. This must be done with write, not via mmap */
756         memset(buf, 0x42, sizeof(buf));
757         while (addition) {
758                 int n = addition>sizeof(buf)?sizeof(buf):addition;
759 #ifdef HAVE_PWRITE
760                 int ret = pwrite(tdb->fd, buf, n, size);
761 #else
762                 int ret;
763                 if (lseek(tdb->fd, size, SEEK_SET) != size)
764                         return -1;
765                 ret = write(tdb->fd, buf, n);
766 #endif
767                 if (ret != n) {
768                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
769                                    n, strerror(errno)));
770                         return -1;
771                 }
772                 addition -= n;
773                 size += n;
774         }
775         return 0;
776 }
777
778
779 /* expand the database at least size bytes by expanding the underlying
780    file and doing the mmap again if necessary */
781 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
782 {
783         struct list_struct rec;
784         tdb_off offset;
785
786         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
787                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
788                 return -1;
789         }
790
791         /* must know about any previous expansions by another process */
792         tdb_oob(tdb, tdb->map_size + 1, 1);
793
794         /* always make room for at least 10 more records, and round
795            the database up to a multiple of TDB_PAGE_SIZE */
796         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
797
798         if (!(tdb->flags & TDB_INTERNAL))
799                 tdb_munmap(tdb);
800
801         /*
802          * We must ensure the file is unmapped before doing this
803          * to ensure consistency with systems like OpenBSD where
804          * writes and mmaps are not consistent.
805          */
806
807         /* expand the file itself */
808         if (!(tdb->flags & TDB_INTERNAL)) {
809                 if (expand_file(tdb, tdb->map_size, size) != 0)
810                         goto fail;
811         }
812
813         tdb->map_size += size;
814
815         if (tdb->flags & TDB_INTERNAL)
816                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
817         else {
818                 /*
819                  * We must ensure the file is remapped before adding the space
820                  * to ensure consistency with systems like OpenBSD where
821                  * writes and mmaps are not consistent.
822                  */
823
824                 /* We're ok if the mmap fails as we'll fallback to read/write */
825                 tdb_mmap(tdb);
826         }
827
828         /* form a new freelist record */
829         memset(&rec,'\0',sizeof(rec));
830         rec.rec_len = size - sizeof(rec);
831
832         /* link it into the free list */
833         offset = tdb->map_size - size;
834         if (tdb_free(tdb, offset, &rec) == -1)
835                 goto fail;
836
837         tdb_unlock(tdb, -1, F_WRLCK);
838         return 0;
839  fail:
840         tdb_unlock(tdb, -1, F_WRLCK);
841         return -1;
842 }
843
844 /* allocate some space from the free list. The offset returned points
845    to a unconnected list_struct within the database with room for at
846    least length bytes of total data
847
848    0 is returned if the space could not be allocated
849  */
850 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
851                             struct list_struct *rec)
852 {
853         tdb_off rec_ptr, last_ptr, newrec_ptr;
854         struct list_struct newrec;
855
856         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
857                 return 0;
858
859         /* Extra bytes required for tailer */
860         length += sizeof(tdb_off);
861
862  again:
863         last_ptr = FREELIST_TOP;
864
865         /* read in the freelist top */
866         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
867                 goto fail;
868
869         /* keep looking until we find a freelist record big enough */
870         while (rec_ptr) {
871                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
872                         goto fail;
873
874                 if (rec->rec_len >= length) {
875                         /* found it - now possibly split it up  */
876                         if (rec->rec_len > length + MIN_REC_SIZE) {
877                                 /* Length of left piece */
878                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
879
880                                 /* Right piece to go on free list */
881                                 newrec.rec_len = rec->rec_len
882                                         - (sizeof(*rec) + length);
883                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
884
885                                 /* And left record is shortened */
886                                 rec->rec_len = length;
887                         } else
888                                 newrec_ptr = 0;
889
890                         /* Remove allocated record from the free list */
891                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
892                                 goto fail;
893
894                         /* Update header: do this before we drop alloc
895                            lock, otherwise tdb_free() might try to
896                            merge with us, thinking we're free.
897                            (Thanks Jeremy Allison). */
898                         rec->magic = TDB_MAGIC;
899                         if (rec_write(tdb, rec_ptr, rec) == -1)
900                                 goto fail;
901
902                         /* Did we create new block? */
903                         if (newrec_ptr) {
904                                 /* Update allocated record tailer (we
905                                    shortened it). */
906                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
907                                         goto fail;
908
909                                 /* Free new record */
910                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
911                                         goto fail;
912                         }
913
914                         /* all done - return the new record offset */
915                         tdb_unlock(tdb, -1, F_WRLCK);
916                         return rec_ptr;
917                 }
918                 /* move to the next record */
919                 last_ptr = rec_ptr;
920                 rec_ptr = rec->next;
921         }
922         /* we didn't find enough space. See if we can expand the
923            database and if we can then try again */
924         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
925                 goto again;
926  fail:
927         tdb_unlock(tdb, -1, F_WRLCK);
928         return 0;
929 }
930
931 /* initialise a new database with a specified hash size */
932 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
933 {
934         struct tdb_header *newdb;
935         int size, ret = -1;
936
937         /* We make it up in memory, then write it out if not internal */
938         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
939         if (!(newdb = calloc(size, 1)))
940                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
941
942         /* Fill in the header */
943         newdb->version = TDB_VERSION;
944         newdb->hash_size = hash_size;
945 #ifdef USE_SPINLOCKS
946         newdb->rwlocks = size;
947 #endif
948         if (tdb->flags & TDB_INTERNAL) {
949                 tdb->map_size = size;
950                 tdb->map_ptr = (char *)newdb;
951                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
952                 /* Convert the `ondisk' version if asked. */
953                 CONVERT(*newdb);
954                 return 0;
955         }
956         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
957                 goto fail;
958
959         if (ftruncate(tdb->fd, 0) == -1)
960                 goto fail;
961
962         /* This creates an endian-converted header, as if read from disk */
963         CONVERT(*newdb);
964         memcpy(&tdb->header, newdb, sizeof(tdb->header));
965         /* Don't endian-convert the magic food! */
966         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
967         if (write(tdb->fd, newdb, size) != size)
968                 ret = -1;
969         else
970                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
971
972   fail:
973         SAFE_FREE(newdb);
974         return ret;
975 }
976
977 /* Returns 0 on fail.  On success, return offset of record, and fills
978    in rec */
979 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
980                         struct list_struct *r)
981 {
982         tdb_off rec_ptr;
983         
984         /* read in the hash top */
985         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
986                 return 0;
987
988         /* keep looking until we find the right record */
989         while (rec_ptr) {
990                 if (rec_read(tdb, rec_ptr, r) == -1)
991                         return 0;
992
993                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
994                         char *k;
995                         /* a very likely hit - read the key */
996                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
997                                            r->key_len);
998                         if (!k)
999                                 return 0;
1000
1001                         if (memcmp(key.dptr, k, key.dsize) == 0) {
1002                                 SAFE_FREE(k);
1003                                 return rec_ptr;
1004                         }
1005                         SAFE_FREE(k);
1006                 }
1007                 rec_ptr = r->next;
1008         }
1009         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
1010 }
1011
1012 /* If they do lockkeys, check that this hash is one they locked */
1013 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
1014 {
1015         u32 i;
1016         if (!tdb->lockedkeys)
1017                 return 1;
1018         for (i = 0; i < tdb->lockedkeys[0]; i++)
1019                 if (tdb->lockedkeys[i+1] == hash)
1020                         return 1;
1021         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
1022 }
1023
1024 /* As tdb_find, but if you succeed, keep the lock */
1025 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
1026                              struct list_struct *rec)
1027 {
1028         u32 hash, rec_ptr;
1029
1030         hash = tdb_hash(&key);
1031         if (!tdb_keylocked(tdb, hash))
1032                 return 0;
1033         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1034                 return 0;
1035         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1036                 tdb_unlock(tdb, BUCKET(hash), locktype);
1037         return rec_ptr;
1038 }
1039
1040 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1041 {
1042         return tdb->ecode;
1043 }
1044
1045 static struct tdb_errname {
1046         enum TDB_ERROR ecode; const char *estring;
1047 } emap[] = { {TDB_SUCCESS, "Success"},
1048              {TDB_ERR_CORRUPT, "Corrupt database"},
1049              {TDB_ERR_IO, "IO Error"},
1050              {TDB_ERR_LOCK, "Locking error"},
1051              {TDB_ERR_OOM, "Out of memory"},
1052              {TDB_ERR_EXISTS, "Record exists"},
1053              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1054              {TDB_ERR_NOEXIST, "Record does not exist"} };
1055
1056 /* Error string for the last tdb error */
1057 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1058 {
1059         u32 i;
1060         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1061                 if (tdb->ecode == emap[i].ecode)
1062                         return emap[i].estring;
1063         return "Invalid error code";
1064 }
1065
1066 /* update an entry in place - this only works if the new data size
1067    is <= the old data size and the key exists.
1068    on failure return -1.
1069 */
1070
1071 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
1072 {
1073         struct list_struct rec;
1074         tdb_off rec_ptr;
1075
1076         /* find entry */
1077         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1078                 return -1;
1079
1080         /* must be long enough key, data and tailer */
1081         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1082                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1083                 return -1;
1084         }
1085
1086         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1087                       dbuf.dptr, dbuf.dsize) == -1)
1088                 return -1;
1089
1090         if (dbuf.dsize != rec.data_len) {
1091                 /* update size */
1092                 rec.data_len = dbuf.dsize;
1093                 return rec_write(tdb, rec_ptr, &rec);
1094         }
1095  
1096         return 0;
1097 }
1098
1099 /* find an entry in the database given a key */
1100 /* If an entry doesn't exist tdb_err will be set to
1101  * TDB_ERR_NOEXIST. If a key has no data attached
1102  * tdb_err will not be set. Both will return a
1103  * zero pptr and zero dsize.
1104  */
1105
1106 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1107 {
1108         tdb_off rec_ptr;
1109         struct list_struct rec;
1110         TDB_DATA ret;
1111
1112         /* find which hash bucket it is in */
1113         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
1114                 return tdb_null;
1115
1116         if (rec.data_len)
1117                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1118                                           rec.data_len);
1119         else
1120                 ret.dptr = NULL;
1121         ret.dsize = rec.data_len;
1122         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1123         return ret;
1124 }
1125
1126 /* check if an entry in the database exists 
1127
1128    note that 1 is returned if the key is found and 0 is returned if not found
1129    this doesn't match the conventions in the rest of this module, but is
1130    compatible with gdbm
1131 */
1132 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1133 {
1134         struct list_struct rec;
1135         
1136         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
1137                 return 0;
1138         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1139         return 1;
1140 }
1141
1142 /* record lock stops delete underneath */
1143 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1144 {
1145         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1146 }
1147 /*
1148   Write locks override our own fcntl readlocks, so check it here.
1149   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1150   an error to fail to get the lock here.
1151 */
1152  
1153 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1154 {
1155         struct tdb_traverse_lock *i;
1156         for (i = &tdb->travlocks; i; i = i->next)
1157                 if (i->off == off)
1158                         return -1;
1159         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1160 }
1161
1162 /*
1163   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1164   an error to fail to get the lock here.
1165 */
1166
1167 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1168 {
1169         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1170 }
1171 /* fcntl locks don't stack: avoid unlocking someone else's */
1172 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1173 {
1174         struct tdb_traverse_lock *i;
1175         u32 count = 0;
1176
1177         if (off == 0)
1178                 return 0;
1179         for (i = &tdb->travlocks; i; i = i->next)
1180                 if (i->off == off)
1181                         count++;
1182         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1183 }
1184
1185 /* actually delete an entry in the database given the offset */
1186 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1187 {
1188         tdb_off last_ptr, i;
1189         struct list_struct lastrec;
1190
1191         if (tdb->read_only) return -1;
1192
1193         if (write_lock_record(tdb, rec_ptr) == -1) {
1194                 /* Someone traversing here: mark it as dead */
1195                 rec->magic = TDB_DEAD_MAGIC;
1196                 return rec_write(tdb, rec_ptr, rec);
1197         }
1198         if (write_unlock_record(tdb, rec_ptr) != 0)
1199                 return -1;
1200
1201         /* find previous record in hash chain */
1202         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1203                 return -1;
1204         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1205                 if (rec_read(tdb, i, &lastrec) == -1)
1206                         return -1;
1207
1208         /* unlink it: next ptr is at start of record. */
1209         if (last_ptr == 0)
1210                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1211         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1212                 return -1;
1213
1214         /* recover the space */
1215         if (tdb_free(tdb, rec_ptr, rec) == -1)
1216                 return -1;
1217         return 0;
1218 }
1219
1220 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1221 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1222                          struct list_struct *rec)
1223 {
1224         int want_next = (tlock->off != 0);
1225
1226         /* No traversal allows if you've called tdb_lockkeys() */
1227         if (tdb->lockedkeys)
1228                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1229
1230         /* Lock each chain from the start one. */
1231         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1232                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1233                         return -1;
1234
1235                 /* No previous record?  Start at top of chain. */
1236                 if (!tlock->off) {
1237                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1238                                      &tlock->off) == -1)
1239                                 goto fail;
1240                 } else {
1241                         /* Otherwise unlock the previous record. */
1242                         if (unlock_record(tdb, tlock->off) != 0)
1243                                 goto fail;
1244                 }
1245
1246                 if (want_next) {
1247                         /* We have offset of old record: grab next */
1248                         if (rec_read(tdb, tlock->off, rec) == -1)
1249                                 goto fail;
1250                         tlock->off = rec->next;
1251                 }
1252
1253                 /* Iterate through chain */
1254                 while( tlock->off) {
1255                         tdb_off current;
1256                         if (rec_read(tdb, tlock->off, rec) == -1)
1257                                 goto fail;
1258                         if (!TDB_DEAD(rec)) {
1259                                 /* Woohoo: we found one! */
1260                                 if (lock_record(tdb, tlock->off) != 0)
1261                                         goto fail;
1262                                 return tlock->off;
1263                         }
1264                         /* Try to clean dead ones from old traverses */
1265                         current = tlock->off;
1266                         tlock->off = rec->next;
1267                         if (!tdb->read_only && 
1268                             do_delete(tdb, current, rec) != 0)
1269                                 goto fail;
1270                 }
1271                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1272                 want_next = 0;
1273         }
1274         /* We finished iteration without finding anything */
1275         return TDB_ERRCODE(TDB_SUCCESS, 0);
1276
1277  fail:
1278         tlock->off = 0;
1279         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1280                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1281         return -1;
1282 }
1283
1284 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1285    return -1 on error or the record count traversed
1286    if fn is NULL then it is not called
1287    a non-zero return value from fn() indicates that the traversal should stop
1288   */
1289 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1290 {
1291         TDB_DATA key, dbuf;
1292         struct list_struct rec;
1293         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1294         int ret, count = 0;
1295
1296         /* This was in the initializaton, above, but the IRIX compiler
1297          * did not like it.  crh
1298          */
1299         tl.next = tdb->travlocks.next;
1300
1301         /* fcntl locks don't stack: beware traverse inside traverse */
1302         tdb->travlocks.next = &tl;
1303
1304         /* tdb_next_lock places locks on the record returned, and its chain */
1305         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1306                 count++;
1307                 /* now read the full record */
1308                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1309                                           rec.key_len + rec.data_len);
1310                 if (!key.dptr) {
1311                         ret = -1;
1312                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1313                                 goto out;
1314                         if (unlock_record(tdb, tl.off) != 0)
1315                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1316                         goto out;
1317                 }
1318                 key.dsize = rec.key_len;
1319                 dbuf.dptr = key.dptr + rec.key_len;
1320                 dbuf.dsize = rec.data_len;
1321
1322                 /* Drop chain lock, call out */
1323                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1324                         ret = -1;
1325                         goto out;
1326                 }
1327                 if (fn && fn(tdb, key, dbuf, state)) {
1328                         /* They want us to terminate traversal */
1329                         ret = count;
1330                         if (unlock_record(tdb, tl.off) != 0) {
1331                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1332                                 ret = -1;
1333                         }
1334                         tdb->travlocks.next = tl.next;
1335                         SAFE_FREE(key.dptr);
1336                         return count;
1337                 }
1338                 SAFE_FREE(key.dptr);
1339         }
1340 out:
1341         tdb->travlocks.next = tl.next;
1342         if (ret < 0)
1343                 return -1;
1344         else
1345                 return count;
1346 }
1347
1348 /* find the first entry in the database and return its key */
1349 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1350 {
1351         TDB_DATA key;
1352         struct list_struct rec;
1353
1354         /* release any old lock */
1355         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1356                 return tdb_null;
1357         tdb->travlocks.off = tdb->travlocks.hash = 0;
1358
1359         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1360                 return tdb_null;
1361         /* now read the key */
1362         key.dsize = rec.key_len;
1363         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1364         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1365                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1366         return key;
1367 }
1368
1369 /* find the next entry in the database, returning its key */
1370 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1371 {
1372         u32 oldhash;
1373         TDB_DATA key = tdb_null;
1374         struct list_struct rec;
1375         char *k = NULL;
1376
1377         /* Is locked key the old key?  If so, traverse will be reliable. */
1378         if (tdb->travlocks.off) {
1379                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1380                         return tdb_null;
1381                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1382                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1383                                             rec.key_len))
1384                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1385                         /* No, it wasn't: unlock it and start from scratch */
1386                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1387                                 return tdb_null;
1388                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1389                                 return tdb_null;
1390                         tdb->travlocks.off = 0;
1391                 }
1392
1393                 SAFE_FREE(k);
1394         }
1395
1396         if (!tdb->travlocks.off) {
1397                 /* No previous element: do normal find, and lock record */
1398                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1399                 if (!tdb->travlocks.off)
1400                         return tdb_null;
1401                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1402                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1403                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1404                         return tdb_null;
1405                 }
1406         }
1407         oldhash = tdb->travlocks.hash;
1408
1409         /* Grab next record: locks chain and returned record,
1410            unlocks old record */
1411         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1412                 key.dsize = rec.key_len;
1413                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1414                                           key.dsize);
1415                 /* Unlock the chain of this new record */
1416                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1417                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1418         }
1419         /* Unlock the chain of old record */
1420         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1421                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1422         return key;
1423 }
1424
1425 /* delete an entry in the database given a key */
1426 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1427 {
1428         tdb_off rec_ptr;
1429         struct list_struct rec;
1430         int ret;
1431
1432         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1433                 return -1;
1434         ret = do_delete(tdb, rec_ptr, &rec);
1435         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1436                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1437         return ret;
1438 }
1439
1440 /* store an element in the database, replacing any existing element
1441    with the same key 
1442
1443    return 0 on success, -1 on failure
1444 */
1445 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1446 {
1447         struct list_struct rec;
1448         u32 hash;
1449         tdb_off rec_ptr;
1450         char *p = NULL;
1451         int ret = 0;
1452
1453         /* find which hash bucket it is in */
1454         hash = tdb_hash(&key);
1455         if (!tdb_keylocked(tdb, hash))
1456                 return -1;
1457         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1458                 return -1;
1459
1460         /* check for it existing, on insert. */
1461         if (flag == TDB_INSERT) {
1462                 if (tdb_exists(tdb, key)) {
1463                         tdb->ecode = TDB_ERR_EXISTS;
1464                         goto fail;
1465                 }
1466         } else {
1467                 /* first try in-place update, on modify or replace. */
1468                 if (tdb_update(tdb, key, dbuf) == 0)
1469                         goto out;
1470                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1471                         goto fail;
1472         }
1473         /* reset the error code potentially set by the tdb_update() */
1474         tdb->ecode = TDB_SUCCESS;
1475
1476         /* delete any existing record - if it doesn't exist we don't
1477            care.  Doing this first reduces fragmentation, and avoids
1478            coalescing with `allocated' block before it's updated. */
1479         if (flag != TDB_INSERT)
1480                 tdb_delete(tdb, key);
1481
1482         /* Copy key+value *before* allocating free space in case malloc
1483            fails and we are left with a dead spot in the tdb. */
1484
1485         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1486                 tdb->ecode = TDB_ERR_OOM;
1487                 goto fail;
1488         }
1489
1490         memcpy(p, key.dptr, key.dsize);
1491         if (dbuf.dsize)
1492                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1493
1494         /* now we're into insert / modify / replace of a record which
1495          * we know could not be optimised by an in-place store (for
1496          * various reasons).  */
1497         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1498                 goto fail;
1499
1500         /* Read hash top into next ptr */
1501         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1502                 goto fail;
1503
1504         rec.key_len = key.dsize;
1505         rec.data_len = dbuf.dsize;
1506         rec.full_hash = hash;
1507         rec.magic = TDB_MAGIC;
1508
1509         /* write out and point the top of the hash chain at it */
1510         if (rec_write(tdb, rec_ptr, &rec) == -1
1511             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1512             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1513                 /* Need to tdb_unallocate() here */
1514                 goto fail;
1515         }
1516  out:
1517         SAFE_FREE(p); 
1518         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1519         return ret;
1520 fail:
1521         ret = -1;
1522         goto out;
1523 }
1524
1525 /* Attempt to append data to an entry in place - this only works if the new data size
1526    is <= the old data size and the key exists.
1527    on failure return -1. Record must be locked before calling.
1528 */
1529 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1530 {
1531         struct list_struct rec;
1532         tdb_off rec_ptr;
1533
1534         /* find entry */
1535         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1536                 return -1;
1537
1538         /* Append of 0 is always ok. */
1539         if (new_dbuf.dsize == 0)
1540                 return 0;
1541
1542         /* must be long enough for key, old data + new data and tailer */
1543         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1544                 /* No room. */
1545                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1546                 return -1;
1547         }
1548
1549         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1550                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1551                 return -1;
1552
1553         /* update size */
1554         rec.data_len += new_dbuf.dsize;
1555         return rec_write(tdb, rec_ptr, &rec);
1556 }
1557
1558 /* Append to an entry. Create if not exist. */
1559
1560 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1561 {
1562         struct list_struct rec;
1563         u32 hash;
1564         tdb_off rec_ptr;
1565         char *p = NULL;
1566         int ret = 0;
1567         size_t new_data_size = 0;
1568
1569         /* find which hash bucket it is in */
1570         hash = tdb_hash(&key);
1571         if (!tdb_keylocked(tdb, hash))
1572                 return -1;
1573         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1574                 return -1;
1575
1576         /* first try in-place. */
1577         if (tdb_append_inplace(tdb, key, new_dbuf) == 0)
1578                 goto out;
1579
1580         /* reset the error code potentially set by the tdb_append_inplace() */
1581         tdb->ecode = TDB_SUCCESS;
1582
1583         /* find entry */
1584         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1585                 if (tdb->ecode != TDB_ERR_NOEXIST)
1586                         goto fail;
1587
1588                 /* Not found - create. */
1589
1590                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1591                 goto out;
1592         }
1593
1594         new_data_size = rec.data_len + new_dbuf.dsize;
1595
1596         /* Copy key+old_value+value *before* allocating free space in case malloc
1597            fails and we are left with a dead spot in the tdb. */
1598
1599         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1600                 tdb->ecode = TDB_ERR_OOM;
1601                 goto fail;
1602         }
1603
1604         /* Copy the key in place. */
1605         memcpy(p, key.dptr, key.dsize);
1606
1607         /* Now read the old data into place. */
1608         if (rec.data_len &&
1609                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1610                         goto fail;
1611
1612         /* Finally append the new data. */
1613         if (new_dbuf.dsize)
1614                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1615
1616         /* delete any existing record - if it doesn't exist we don't
1617            care.  Doing this first reduces fragmentation, and avoids
1618            coalescing with `allocated' block before it's updated. */
1619
1620         tdb_delete(tdb, key);
1621
1622         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1623                 goto fail;
1624
1625         /* Read hash top into next ptr */
1626         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1627                 goto fail;
1628
1629         rec.key_len = key.dsize;
1630         rec.data_len = new_data_size;
1631         rec.full_hash = hash;
1632         rec.magic = TDB_MAGIC;
1633
1634         /* write out and point the top of the hash chain at it */
1635         if (rec_write(tdb, rec_ptr, &rec) == -1
1636             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1637             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1638                 /* Need to tdb_unallocate() here */
1639                 goto fail;
1640         }
1641
1642  out:
1643         SAFE_FREE(p); 
1644         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1645         return ret;
1646
1647 fail:
1648         ret = -1;
1649         goto out;
1650 }
1651
1652 static int tdb_already_open(dev_t device,
1653                             ino_t ino)
1654 {
1655         TDB_CONTEXT *i;
1656         
1657         for (i = tdbs; i; i = i->next) {
1658                 if (i->device == device && i->inode == ino) {
1659                         return 1;
1660                 }
1661         }
1662
1663         return 0;
1664 }
1665
1666 /* open the database, creating it if necessary 
1667
1668    The open_flags and mode are passed straight to the open call on the
1669    database file. A flags value of O_WRONLY is invalid. The hash size
1670    is advisory, use zero for a default value.
1671
1672    Return is NULL on error, in which case errno is also set.  Don't 
1673    try to call tdb_error or tdb_errname, just do strerror(errno).
1674
1675    @param name may be NULL for internal databases. */
1676 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1677                       int open_flags, mode_t mode)
1678 {
1679         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1680 }
1681
1682
1683 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1684                          int open_flags, mode_t mode,
1685                          tdb_log_func log_fn)
1686 {
1687         TDB_CONTEXT *tdb;
1688         struct stat st;
1689         int rev = 0, locked;
1690         unsigned char *vp;
1691         u32 vertest;
1692
1693         if (!(tdb = calloc(1, sizeof *tdb))) {
1694                 /* Can't log this */
1695                 errno = ENOMEM;
1696                 goto fail;
1697         }
1698         tdb->fd = -1;
1699         tdb->name = NULL;
1700         tdb->map_ptr = NULL;
1701         tdb->lockedkeys = NULL;
1702         tdb->flags = tdb_flags;
1703         tdb->open_flags = open_flags;
1704         tdb->log_fn = log_fn;
1705         
1706         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1707                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1708                          name));
1709                 errno = EINVAL;
1710                 goto fail;
1711         }
1712         
1713         if (hash_size == 0)
1714                 hash_size = DEFAULT_HASH_SIZE;
1715         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1716                 tdb->read_only = 1;
1717                 /* read only databases don't do locking or clear if first */
1718                 tdb->flags |= TDB_NOLOCK;
1719                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1720         }
1721
1722         /* internal databases don't mmap or lock, and start off cleared */
1723         if (tdb->flags & TDB_INTERNAL) {
1724                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1725                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1726                 if (tdb_new_database(tdb, hash_size) != 0) {
1727                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1728                         goto fail;
1729                 }
1730                 goto internal;
1731         }
1732
1733         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1734                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1735                          name, strerror(errno)));
1736                 goto fail;      /* errno set by open(2) */
1737         }
1738
1739         /* ensure there is only one process initialising at once */
1740         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1741                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1742                          name, strerror(errno)));
1743                 goto fail;      /* errno set by tdb_brlock */
1744         }
1745
1746         /* we need to zero database if we are the only one with it open */
1747         if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1748             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1749                 open_flags |= O_CREAT;
1750                 if (ftruncate(tdb->fd, 0) == -1) {
1751                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1752                                  "failed to truncate %s: %s\n",
1753                                  name, strerror(errno)));
1754                         goto fail; /* errno set by ftruncate */
1755                 }
1756         }
1757
1758         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1759             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1760             || (tdb->header.version != TDB_VERSION
1761                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1762                 /* its not a valid database - possibly initialise it */
1763                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1764                         errno = EIO; /* ie bad format or something */
1765                         goto fail;
1766                 }
1767                 rev = (tdb->flags & TDB_CONVERT);
1768         }
1769         vp = (unsigned char *)&tdb->header.version;
1770         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1771                   (((u32)vp[2]) << 8) | (u32)vp[3];
1772         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1773         if (!rev)
1774                 tdb->flags &= ~TDB_CONVERT;
1775         else {
1776                 tdb->flags |= TDB_CONVERT;
1777                 convert(&tdb->header, sizeof(tdb->header));
1778         }
1779         if (fstat(tdb->fd, &st) == -1)
1780                 goto fail;
1781
1782         /* Is it already in the open list?  If so, fail. */
1783         if (tdb_already_open(st.st_dev, st.st_ino)) {
1784                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1785                          "%s (%d,%d) is already open in this process\n",
1786                          name, st.st_dev, st.st_ino));
1787                 errno = EBUSY;
1788                 goto fail;
1789         }
1790
1791         if (!(tdb->name = (char *)strdup(name))) {
1792                 errno = ENOMEM;
1793                 goto fail;
1794         }
1795
1796         tdb->map_size = st.st_size;
1797         tdb->device = st.st_dev;
1798         tdb->inode = st.st_ino;
1799         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1800         if (!tdb->locked) {
1801                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1802                          "failed to allocate lock structure for %s\n",
1803                          name));
1804                 errno = ENOMEM;
1805                 goto fail;
1806         }
1807         tdb_mmap(tdb);
1808         if (locked) {
1809                 if (!tdb->read_only)
1810                         if (tdb_clear_spinlocks(tdb) != 0) {
1811                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1812                                 "failed to clear spinlock\n"));
1813                                 goto fail;
1814                         }
1815                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1816                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1817                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1818                                  name, strerror(errno)));
1819                         goto fail;
1820                 }
1821         }
1822         /* leave this lock in place to indicate it's in use */
1823         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1824                 goto fail;
1825
1826  internal:
1827         /* Internal (memory-only) databases skip all the code above to
1828          * do with disk files, and resume here by releasing their
1829          * global lock and hooking into the active list. */
1830         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1831                 goto fail;
1832         tdb->next = tdbs;
1833         tdbs = tdb;
1834         return tdb;
1835
1836  fail:
1837         { int save_errno = errno;
1838
1839         if (!tdb)
1840                 return NULL;
1841         
1842         if (tdb->map_ptr) {
1843                 if (tdb->flags & TDB_INTERNAL)
1844                         SAFE_FREE(tdb->map_ptr);
1845                 else
1846                         tdb_munmap(tdb);
1847         }
1848         SAFE_FREE(tdb->name);
1849         if (tdb->fd != -1)
1850                 if (close(tdb->fd) != 0)
1851                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1852         SAFE_FREE(tdb->locked);
1853         SAFE_FREE(tdb);
1854         errno = save_errno;
1855         return NULL;
1856         }
1857 }
1858
1859 /**
1860  * Close a database.
1861  *
1862  * @returns -1 for error; 0 for success.
1863  **/
1864 int tdb_close(TDB_CONTEXT *tdb)
1865 {
1866         TDB_CONTEXT **i;
1867         int ret = 0;
1868
1869         if (tdb->map_ptr) {
1870                 if (tdb->flags & TDB_INTERNAL)
1871                         SAFE_FREE(tdb->map_ptr);
1872                 else
1873                         tdb_munmap(tdb);
1874         }
1875         SAFE_FREE(tdb->name);
1876         if (tdb->fd != -1)
1877                 ret = close(tdb->fd);
1878         SAFE_FREE(tdb->locked);
1879         SAFE_FREE(tdb->lockedkeys);
1880
1881         /* Remove from contexts list */
1882         for (i = &tdbs; *i; i = &(*i)->next) {
1883                 if (*i == tdb) {
1884                         *i = tdb->next;
1885                         break;
1886                 }
1887         }
1888
1889         memset(tdb, 0, sizeof(*tdb));
1890         SAFE_FREE(tdb);
1891
1892         return ret;
1893 }
1894
1895 /* lock/unlock entire database */
1896 int tdb_lockall(TDB_CONTEXT *tdb)
1897 {
1898         u32 i;
1899
1900         /* There are no locks on read-only dbs */
1901         if (tdb->read_only)
1902                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1903         if (tdb->lockedkeys)
1904                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1905         for (i = 0; i < tdb->header.hash_size; i++) 
1906                 if (tdb_lock(tdb, i, F_WRLCK))
1907                         break;
1908
1909         /* If error, release locks we have... */
1910         if (i < tdb->header.hash_size) {
1911                 u32 j;
1912
1913                 for ( j = 0; j < i; j++)
1914                         tdb_unlock(tdb, j, F_WRLCK);
1915                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1916         }
1917
1918         return 0;
1919 }
1920 void tdb_unlockall(TDB_CONTEXT *tdb)
1921 {
1922         u32 i;
1923         for (i=0; i < tdb->header.hash_size; i++)
1924                 tdb_unlock(tdb, i, F_WRLCK);
1925 }
1926
1927 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1928 {
1929         u32 i, j, hash;
1930
1931         /* Can't lock more keys if already locked */
1932         if (tdb->lockedkeys)
1933                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1934         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1935                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1936         /* First number in array is # keys */
1937         tdb->lockedkeys[0] = number;
1938
1939         /* Insertion sort by bucket */
1940         for (i = 0; i < number; i++) {
1941                 hash = tdb_hash(&keys[i]);
1942                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1943                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1944                 tdb->lockedkeys[j+1] = hash;
1945         }
1946         /* Finally, lock in order */
1947         for (i = 0; i < number; i++)
1948                 if (tdb_lock(tdb, i, F_WRLCK))
1949                         break;
1950
1951         /* If error, release locks we have... */
1952         if (i < number) {
1953                 for ( j = 0; j < i; j++)
1954                         tdb_unlock(tdb, j, F_WRLCK);
1955                 SAFE_FREE(tdb->lockedkeys);
1956                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1957         }
1958         return 0;
1959 }
1960
1961 /* Unlock the keys previously locked by tdb_lockkeys() */
1962 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1963 {
1964         u32 i;
1965         if (!tdb->lockedkeys)
1966                 return;
1967         for (i = 0; i < tdb->lockedkeys[0]; i++)
1968                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1969         SAFE_FREE(tdb->lockedkeys);
1970 }
1971
1972 /* lock/unlock one hash chain. This is meant to be used to reduce
1973    contention - it cannot guarantee how many records will be locked */
1974 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1975 {
1976         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1977 }
1978
1979 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1980 {
1981         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1982 }
1983
1984 int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1985 {
1986         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
1987 }
1988
1989 int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1990 {
1991         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
1992 }
1993
1994
1995 /* register a loging function */
1996 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1997 {
1998         tdb->log_fn = fn;
1999 }
2000
2001
2002 /* reopen a tdb - this is used after a fork to ensure that we have an independent
2003    seek pointer from our parent and to re-establish locks */
2004 int tdb_reopen(TDB_CONTEXT *tdb)
2005 {
2006         struct stat st;
2007
2008         if (tdb_munmap(tdb) != 0) {
2009                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
2010                 goto fail;
2011         }
2012         if (close(tdb->fd) != 0)
2013                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
2014         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
2015         if (tdb->fd == -1) {
2016                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
2017                 goto fail;
2018         }
2019         if (fstat(tdb->fd, &st) != 0) {
2020                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
2021                 goto fail;
2022         }
2023         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
2024                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
2025                 goto fail;
2026         }
2027         tdb_mmap(tdb);
2028         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
2029                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2030                 goto fail;
2031         }
2032
2033         return 0;
2034
2035 fail:
2036         tdb_close(tdb);
2037         return -1;
2038 }
2039
2040 /* reopen all tdb's */
2041 int tdb_reopen_all(void)
2042 {
2043         TDB_CONTEXT *tdb;
2044
2045         for (tdb=tdbs; tdb; tdb = tdb->next) {
2046                 if (tdb_reopen(tdb) != 0) return -1;
2047         }
2048
2049         return 0;
2050 }