chiark / gitweb /
[PATCH] klibc makefile fixes
[elogind.git] / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3    Samba database functions
4    Copyright (C) Andrew Tridgell              1999-2000
5    Copyright (C) Luke Kenneth Casson Leighton      2000
6    Copyright (C) Paul `Rusty' Russell              2000
7    Copyright (C) Jeremy Allison                    2000-2003
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23
24
25 /* NOTE: If you use tdbs under valgrind, and in particular if you run
26  * tdbtorture, you may get spurious "uninitialized value" warnings.  I
27  * think this is because valgrind doesn't understand that the mmap'd
28  * area may be written to by other processes.  Memory can, from the
29  * point of view of the grinded process, spontaneously become
30  * initialized.
31  *
32  * I can think of a few solutions.  [mbp 20030311]
33  *
34  * 1 - Write suppressions for Valgrind so that it doesn't complain
35  * about this.  Probably the most reasonable but people need to
36  * remember to use them.
37  *
38  * 2 - Use IO not mmap when running under valgrind.  Not so nice.
39  *
40  * 3 - Use the special valgrind macros to mark memory as valid at the
41  * right time.  Probably too hard -- the process just doesn't know.
42  */ 
43
44 /* udev defines */
45 #define STANDALONE
46 #define TDB_DEBUG
47 #define HAVE_MMAP       1
48
49
50 #ifdef STANDALONE
51 #if HAVE_CONFIG_H
52 #include <config.h>
53 #endif
54
55 #include <stdlib.h>
56 #include <stdio.h>
57 #include <fcntl.h>
58 #include <unistd.h>
59 #include <string.h>
60 #include <fcntl.h>
61 #include <errno.h>
62 #include <sys/mman.h>
63 #include <sys/stat.h>
64 #include <signal.h>
65 #include "tdb.h"
66 #include "spinlock.h"
67 #else
68 #include "includes.h"
69 #endif
70
71 #define TDB_MAGIC_FOOD "TDB file\n"
72 #define TDB_VERSION (0x26011967 + 6)
73 #define TDB_MAGIC (0x26011999U)
74 #define TDB_FREE_MAGIC (~TDB_MAGIC)
75 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
76 #define TDB_ALIGNMENT 4
77 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
78 #define DEFAULT_HASH_SIZE 131
79 #define TDB_PAGE_SIZE 0x2000
80 #define FREELIST_TOP (sizeof(struct tdb_header))
81 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
82 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
83 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
84 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
85 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
86
87 /* NB assumes there is a local variable called "tdb" that is the
88  * current context, also takes doubly-parenthesized print-style
89  * argument. */
90 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
91
92 /* lock offsets */
93 #define GLOBAL_LOCK 0
94 #define ACTIVE_LOCK 4
95
96 #ifndef MAP_FILE
97 #define MAP_FILE 0
98 #endif
99
100 #ifndef MAP_FAILED
101 #define MAP_FAILED ((void *)-1)
102 #endif
103
104 /* free memory if the pointer is valid and zero the pointer */
105 #ifndef SAFE_FREE
106 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
107 #endif
108
109 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
110 TDB_DATA tdb_null;
111
112 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
113 static TDB_CONTEXT *tdbs = NULL;
114
115 static int tdb_munmap(TDB_CONTEXT *tdb)
116 {
117         if (tdb->flags & TDB_INTERNAL)
118                 return 0;
119
120 #ifdef HAVE_MMAP
121         if (tdb->map_ptr) {
122                 int ret = munmap(tdb->map_ptr, tdb->map_size);
123                 if (ret != 0)
124                         return ret;
125         }
126 #endif
127         tdb->map_ptr = NULL;
128         return 0;
129 }
130
131 static void tdb_mmap(TDB_CONTEXT *tdb)
132 {
133         if (tdb->flags & TDB_INTERNAL)
134                 return;
135
136 #ifdef HAVE_MMAP
137         if (!(tdb->flags & TDB_NOMMAP)) {
138                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
139                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
140                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
141
142                 /*
143                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
144                  */
145
146                 if (tdb->map_ptr == MAP_FAILED) {
147                         tdb->map_ptr = NULL;
148                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
149                                  tdb->map_size, strerror(errno)));
150                 }
151         } else {
152                 tdb->map_ptr = NULL;
153         }
154 #else
155         tdb->map_ptr = NULL;
156 #endif
157 }
158
159 /* Endian conversion: we only ever deal with 4 byte quantities */
160 static void *convert(void *buf, u32 size)
161 {
162         u32 i, *p = buf;
163         for (i = 0; i < size / 4; i++)
164                 p[i] = TDB_BYTEREV(p[i]);
165         return buf;
166 }
167 #define DOCONV() (tdb->flags & TDB_CONVERT)
168 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
169
170 /* the body of the database is made of one list_struct for the free space
171    plus a separate data list for each hash value */
172 struct list_struct {
173         tdb_off next; /* offset of the next record in the list */
174         tdb_len rec_len; /* total byte length of record */
175         tdb_len key_len; /* byte length of key */
176         tdb_len data_len; /* byte length of data */
177         u32 full_hash; /* the full 32 bit hash of the key */
178         u32 magic;   /* try to catch errors */
179         /* the following union is implied:
180                 union {
181                         char record[rec_len];
182                         struct {
183                                 char key[key_len];
184                                 char data[data_len];
185                         }
186                         u32 totalsize; (tailer)
187                 }
188         */
189 };
190
191 /***************************************************************
192  Allow a caller to set a "alarm" flag that tdb can check to abort
193  a blocking lock on SIGALRM.
194 ***************************************************************/
195
196 static sig_atomic_t *palarm_fired;
197
198 void tdb_set_lock_alarm(sig_atomic_t *palarm)
199 {
200         palarm_fired = palarm;
201 }
202
203 /* a byte range locking function - return 0 on success
204    this functions locks/unlocks 1 byte at the specified offset.
205
206    On error, errno is also set so that errors are passed back properly
207    through tdb_open(). */
208 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
209                       int rw_type, int lck_type, int probe)
210 {
211         struct flock fl;
212         int ret;
213
214         if (tdb->flags & TDB_NOLOCK)
215                 return 0;
216         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
217                 errno = EACCES;
218                 return -1;
219         }
220
221         fl.l_type = rw_type;
222         fl.l_whence = SEEK_SET;
223         fl.l_start = offset;
224         fl.l_len = 1;
225         fl.l_pid = 0;
226
227         do {
228                 ret = fcntl(tdb->fd,lck_type,&fl);
229                 if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
230                         break;
231         } while (ret == -1 && errno == EINTR);
232
233         if (ret == -1) {
234                 if (!probe && lck_type != F_SETLK) {
235                         /* Ensure error code is set for log fun to examine. */
236                         if (errno == EINTR && palarm_fired && *palarm_fired)
237                                 tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
238                         else
239                                 tdb->ecode = TDB_ERR_LOCK;
240                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
241                                  tdb->fd, offset, rw_type, lck_type));
242                 }
243                 /* Was it an alarm timeout ? */
244                 if (errno == EINTR && palarm_fired && *palarm_fired)
245                         return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
246                 /* Otherwise - generic lock error. */
247                 /* errno set by fcntl */
248                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
249         }
250         return 0;
251 }
252
253 /* lock a list in the database. list -1 is the alloc list */
254 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
255 {
256         if (list < -1 || list >= (int)tdb->header.hash_size) {
257                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
258                            list, ltype));
259                 return -1;
260         }
261         if (tdb->flags & TDB_NOLOCK)
262                 return 0;
263
264         /* Since fcntl locks don't nest, we do a lock for the first one,
265            and simply bump the count for future ones */
266         if (tdb->locked[list+1].count == 0) {
267                 if (!tdb->read_only && tdb->header.rwlocks) {
268                         if (tdb_spinlock(tdb, list, ltype)) {
269                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
270                                            list, ltype));
271                                 return -1;
272                         }
273                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
274                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
275                                            list, ltype, strerror(errno)));
276                         return -1;
277                 }
278                 tdb->locked[list+1].ltype = ltype;
279         }
280         tdb->locked[list+1].count++;
281         return 0;
282 }
283
284 /* unlock the database: returns void because it's too late for errors. */
285         /* changed to return int it may be interesting to know there
286            has been an error  --simo */
287 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
288 {
289         int ret = -1;
290
291         if (tdb->flags & TDB_NOLOCK)
292                 return 0;
293
294         /* Sanity checks */
295         if (list < -1 || list >= (int)tdb->header.hash_size) {
296                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
297                 return ret;
298         }
299
300         if (tdb->locked[list+1].count==0) {
301                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
302                 return ret;
303         }
304
305         if (tdb->locked[list+1].count == 1) {
306                 /* Down to last nested lock: unlock underneath */
307                 if (!tdb->read_only && tdb->header.rwlocks) {
308                         ret = tdb_spinunlock(tdb, list, ltype);
309                 } else {
310                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
311                 }
312         } else {
313                 ret = 0;
314         }
315         tdb->locked[list+1].count--;
316
317         if (ret)
318                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
319         return ret;
320 }
321
322 /* This is based on the hash algorithm from gdbm */
323 static u32 tdb_hash(TDB_DATA *key)
324 {
325         u32 value;      /* Used to compute the hash value.  */
326         u32   i;        /* Used to cycle through random values. */
327
328         /* Set the initial value from the key size. */
329         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
330                 value = (value + (key->dptr[i] << (i*5 % 24)));
331
332         return (1103515243 * value + 12345);  
333 }
334
335 /* check for an out of bounds access - if it is out of bounds then
336    see if the database has been expanded by someone else and expand
337    if necessary 
338    note that "len" is the minimum length needed for the db
339 */
340 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
341 {
342         struct stat st;
343         if (len <= tdb->map_size)
344                 return 0;
345         if (tdb->flags & TDB_INTERNAL) {
346                 if (!probe) {
347                         /* Ensure ecode is set for log fn. */
348                         tdb->ecode = TDB_ERR_IO;
349                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
350                                  (int)len, (int)tdb->map_size));
351                 }
352                 return TDB_ERRCODE(TDB_ERR_IO, -1);
353         }
354
355         if (fstat(tdb->fd, &st) == -1)
356                 return TDB_ERRCODE(TDB_ERR_IO, -1);
357
358         if (st.st_size < (size_t)len) {
359                 if (!probe) {
360                         /* Ensure ecode is set for log fn. */
361                         tdb->ecode = TDB_ERR_IO;
362                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
363                                  (int)len, (int)st.st_size));
364                 }
365                 return TDB_ERRCODE(TDB_ERR_IO, -1);
366         }
367
368         /* Unmap, update size, remap */
369         if (tdb_munmap(tdb) == -1)
370                 return TDB_ERRCODE(TDB_ERR_IO, -1);
371         tdb->map_size = st.st_size;
372         tdb_mmap(tdb);
373         return 0;
374 }
375
376 /* write a lump of data at a specified offset */
377 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
378 {
379         if (tdb_oob(tdb, off + len, 0) != 0)
380                 return -1;
381
382         if (tdb->map_ptr)
383                 memcpy(off + (char *)tdb->map_ptr, buf, len);
384 #ifdef HAVE_PWRITE
385         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
386 #else
387         else if (lseek(tdb->fd, off, SEEK_SET) != off
388                  || write(tdb->fd, buf, len) != (ssize_t)len) {
389 #endif
390                 /* Ensure ecode is set for log fn. */
391                 tdb->ecode = TDB_ERR_IO;
392                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
393                            off, len, strerror(errno)));
394                 return TDB_ERRCODE(TDB_ERR_IO, -1);
395         }
396         return 0;
397 }
398
399 /* read a lump of data at a specified offset, maybe convert */
400 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
401 {
402         if (tdb_oob(tdb, off + len, 0) != 0)
403                 return -1;
404
405         if (tdb->map_ptr)
406                 memcpy(buf, off + (char *)tdb->map_ptr, len);
407 #ifdef HAVE_PREAD
408         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
409 #else
410         else if (lseek(tdb->fd, off, SEEK_SET) != off
411                  || read(tdb->fd, buf, len) != (ssize_t)len) {
412 #endif
413                 /* Ensure ecode is set for log fn. */
414                 tdb->ecode = TDB_ERR_IO;
415                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
416                            off, len, strerror(errno)));
417                 return TDB_ERRCODE(TDB_ERR_IO, -1);
418         }
419         if (cv)
420                 convert(buf, len);
421         return 0;
422 }
423
424 /* read a lump of data, allocating the space for it */
425 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
426 {
427         char *buf;
428
429         if (!(buf = malloc(len))) {
430                 /* Ensure ecode is set for log fn. */
431                 tdb->ecode = TDB_ERR_OOM;
432                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
433                            len, strerror(errno)));
434                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
435         }
436         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
437                 SAFE_FREE(buf);
438                 return NULL;
439         }
440         return buf;
441 }
442
443 /* read/write a tdb_off */
444 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
445 {
446         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
447 }
448 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
449 {
450         tdb_off off = *d;
451         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
452 }
453
454 /* read/write a record */
455 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
456 {
457         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
458                 return -1;
459         if (TDB_BAD_MAGIC(rec)) {
460                 /* Ensure ecode is set for log fn. */
461                 tdb->ecode = TDB_ERR_CORRUPT;
462                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
463                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
464         }
465         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
466 }
467 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
468 {
469         struct list_struct r = *rec;
470         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
471 }
472
473 /* read a freelist record and check for simple errors */
474 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
475 {
476         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
477                 return -1;
478
479         if (rec->magic == TDB_MAGIC) {
480                 /* this happens when a app is showdown while deleting a record - we should
481                    not completely fail when this happens */
482                 TDB_LOG((tdb, 0,"rec_free_read non-free magic at offset=%d - fixing\n", 
483                          rec->magic, off));
484                 rec->magic = TDB_FREE_MAGIC;
485                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
486                         return -1;
487         }
488
489         if (rec->magic != TDB_FREE_MAGIC) {
490                 /* Ensure ecode is set for log fn. */
491                 tdb->ecode = TDB_ERR_CORRUPT;
492                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
493                            rec->magic, off));
494                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
495         }
496         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
497                 return -1;
498         return 0;
499 }
500
501 /* update a record tailer (must hold allocation lock) */
502 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
503                          const struct list_struct *rec)
504 {
505         tdb_off totalsize;
506
507         /* Offset of tailer from record header */
508         totalsize = sizeof(*rec) + rec->rec_len;
509         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
510                          &totalsize);
511 }
512
513 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
514 {
515         struct list_struct rec;
516         tdb_off tailer_ofs, tailer;
517
518         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
519                 printf("ERROR: failed to read record at %u\n", offset);
520                 return 0;
521         }
522
523         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
524                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
525
526         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
527         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
528                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
529                 return rec.next;
530         }
531
532         if (tailer != rec.rec_len + sizeof(rec)) {
533                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
534                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
535         }
536         return rec.next;
537 }
538
539 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
540 {
541         tdb_off rec_ptr, top;
542
543         top = TDB_HASH_TOP(i);
544
545         if (tdb_lock(tdb, i, F_WRLCK) != 0)
546                 return -1;
547
548         if (ofs_read(tdb, top, &rec_ptr) == -1)
549                 return tdb_unlock(tdb, i, F_WRLCK);
550
551         if (rec_ptr)
552                 printf("hash=%d\n", i);
553
554         while (rec_ptr) {
555                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
556         }
557
558         return tdb_unlock(tdb, i, F_WRLCK);
559 }
560
561 void tdb_dump_all(TDB_CONTEXT *tdb)
562 {
563         int i;
564         for (i=0;i<tdb->header.hash_size;i++) {
565                 tdb_dump_chain(tdb, i);
566         }
567         printf("freelist:\n");
568         tdb_dump_chain(tdb, -1);
569 }
570
571 int tdb_printfreelist(TDB_CONTEXT *tdb)
572 {
573         int ret;
574         long total_free = 0;
575         tdb_off offset, rec_ptr;
576         struct list_struct rec;
577
578         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
579                 return ret;
580
581         offset = FREELIST_TOP;
582
583         /* read in the freelist top */
584         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
585                 tdb_unlock(tdb, -1, F_WRLCK);
586                 return 0;
587         }
588
589         printf("freelist top=[0x%08x]\n", rec_ptr );
590         while (rec_ptr) {
591                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
592                         tdb_unlock(tdb, -1, F_WRLCK);
593                         return -1;
594                 }
595
596                 if (rec.magic != TDB_FREE_MAGIC) {
597                         printf("bad magic 0x%08x in free list\n", rec.magic);
598                         tdb_unlock(tdb, -1, F_WRLCK);
599                         return -1;
600                 }
601
602                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
603                 total_free += rec.rec_len;
604
605                 /* move to the next record */
606                 rec_ptr = rec.next;
607         }
608         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
609                (int)total_free);
610
611         return tdb_unlock(tdb, -1, F_WRLCK);
612 }
613
614 /* Remove an element from the freelist.  Must have alloc lock. */
615 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
616 {
617         tdb_off last_ptr, i;
618
619         /* read in the freelist top */
620         last_ptr = FREELIST_TOP;
621         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
622                 if (i == off) {
623                         /* We've found it! */
624                         return ofs_write(tdb, last_ptr, &next);
625                 }
626                 /* Follow chain (next offset is at start of record) */
627                 last_ptr = i;
628         }
629         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
630         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
631 }
632
633 /* Add an element into the freelist. Merge adjacent records if
634    neccessary. */
635 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
636 {
637         tdb_off right, left;
638
639         /* Allocation and tailer lock */
640         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
641                 return -1;
642
643         /* set an initial tailer, so if we fail we don't leave a bogus record */
644         if (update_tailer(tdb, offset, rec) != 0) {
645                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
646                 goto fail;
647         }
648
649         /* Look right first (I'm an Australian, dammit) */
650         right = offset + sizeof(*rec) + rec->rec_len;
651         if (right + sizeof(*rec) <= tdb->map_size) {
652                 struct list_struct r;
653
654                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
655                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
656                         goto left;
657                 }
658
659                 /* If it's free, expand to include it. */
660                 if (r.magic == TDB_FREE_MAGIC) {
661                         if (remove_from_freelist(tdb, right, r.next) == -1) {
662                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
663                                 goto left;
664                         }
665                         rec->rec_len += sizeof(r) + r.rec_len;
666                 }
667         }
668
669 left:
670         /* Look left */
671         left = offset - sizeof(tdb_off);
672         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
673                 struct list_struct l;
674                 tdb_off leftsize;
675
676                 /* Read in tailer and jump back to header */
677                 if (ofs_read(tdb, left, &leftsize) == -1) {
678                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
679                         goto update;
680                 }
681                 left = offset - leftsize;
682
683                 /* Now read in record */
684                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
685                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
686                         goto update;
687                 }
688
689                 /* If it's free, expand to include it. */
690                 if (l.magic == TDB_FREE_MAGIC) {
691                         if (remove_from_freelist(tdb, left, l.next) == -1) {
692                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
693                                 goto update;
694                         } else {
695                                 offset = left;
696                                 rec->rec_len += leftsize;
697                         }
698                 }
699         }
700
701 update:
702         if (update_tailer(tdb, offset, rec) == -1) {
703                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
704                 goto fail;
705         }
706
707         /* Now, prepend to free list */
708         rec->magic = TDB_FREE_MAGIC;
709
710         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
711             rec_write(tdb, offset, rec) == -1 ||
712             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
713                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
714                 goto fail;
715         }
716
717         /* And we're done. */
718         tdb_unlock(tdb, -1, F_WRLCK);
719         return 0;
720
721  fail:
722         tdb_unlock(tdb, -1, F_WRLCK);
723         return -1;
724 }
725
726
727 /* expand a file.  we prefer to use ftruncate, as that is what posix
728   says to use for mmap expansion */
729 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
730 {
731         char buf[1024];
732 #if HAVE_FTRUNCATE_EXTEND
733         if (ftruncate(tdb->fd, size+addition) != 0) {
734                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
735                            size+addition, strerror(errno)));
736                 return -1;
737         }
738 #else
739         char b = 0;
740
741 #ifdef HAVE_PWRITE
742         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
743 #else
744         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
745             write(tdb->fd, &b, 1) != 1) {
746 #endif
747                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
748                            size+addition, strerror(errno)));
749                 return -1;
750         }
751 #endif
752
753         /* now fill the file with something. This ensures that the file isn't sparse, which would be
754            very bad if we ran out of disk. This must be done with write, not via mmap */
755         memset(buf, 0x42, sizeof(buf));
756         while (addition) {
757                 int n = addition>sizeof(buf)?sizeof(buf):addition;
758 #ifdef HAVE_PWRITE
759                 int ret = pwrite(tdb->fd, buf, n, size);
760 #else
761                 int ret;
762                 if (lseek(tdb->fd, size, SEEK_SET) != size)
763                         return -1;
764                 ret = write(tdb->fd, buf, n);
765 #endif
766                 if (ret != n) {
767                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
768                                    n, strerror(errno)));
769                         return -1;
770                 }
771                 addition -= n;
772                 size += n;
773         }
774         return 0;
775 }
776
777
778 /* expand the database at least size bytes by expanding the underlying
779    file and doing the mmap again if necessary */
780 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
781 {
782         struct list_struct rec;
783         tdb_off offset;
784
785         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
786                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
787                 return -1;
788         }
789
790         /* must know about any previous expansions by another process */
791         tdb_oob(tdb, tdb->map_size + 1, 1);
792
793         /* always make room for at least 10 more records, and round
794            the database up to a multiple of TDB_PAGE_SIZE */
795         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
796
797         if (!(tdb->flags & TDB_INTERNAL))
798                 tdb_munmap(tdb);
799
800         /*
801          * We must ensure the file is unmapped before doing this
802          * to ensure consistency with systems like OpenBSD where
803          * writes and mmaps are not consistent.
804          */
805
806         /* expand the file itself */
807         if (!(tdb->flags & TDB_INTERNAL)) {
808                 if (expand_file(tdb, tdb->map_size, size) != 0)
809                         goto fail;
810         }
811
812         tdb->map_size += size;
813
814         if (tdb->flags & TDB_INTERNAL)
815                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
816         else {
817                 /*
818                  * We must ensure the file is remapped before adding the space
819                  * to ensure consistency with systems like OpenBSD where
820                  * writes and mmaps are not consistent.
821                  */
822
823                 /* We're ok if the mmap fails as we'll fallback to read/write */
824                 tdb_mmap(tdb);
825         }
826
827         /* form a new freelist record */
828         memset(&rec,'\0',sizeof(rec));
829         rec.rec_len = size - sizeof(rec);
830
831         /* link it into the free list */
832         offset = tdb->map_size - size;
833         if (tdb_free(tdb, offset, &rec) == -1)
834                 goto fail;
835
836         tdb_unlock(tdb, -1, F_WRLCK);
837         return 0;
838  fail:
839         tdb_unlock(tdb, -1, F_WRLCK);
840         return -1;
841 }
842
843 /* allocate some space from the free list. The offset returned points
844    to a unconnected list_struct within the database with room for at
845    least length bytes of total data
846
847    0 is returned if the space could not be allocated
848  */
849 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
850                             struct list_struct *rec)
851 {
852         tdb_off rec_ptr, last_ptr, newrec_ptr;
853         struct list_struct newrec;
854
855         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
856                 return 0;
857
858         /* Extra bytes required for tailer */
859         length += sizeof(tdb_off);
860
861  again:
862         last_ptr = FREELIST_TOP;
863
864         /* read in the freelist top */
865         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
866                 goto fail;
867
868         /* keep looking until we find a freelist record big enough */
869         while (rec_ptr) {
870                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
871                         goto fail;
872
873                 if (rec->rec_len >= length) {
874                         /* found it - now possibly split it up  */
875                         if (rec->rec_len > length + MIN_REC_SIZE) {
876                                 /* Length of left piece */
877                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
878
879                                 /* Right piece to go on free list */
880                                 newrec.rec_len = rec->rec_len
881                                         - (sizeof(*rec) + length);
882                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
883
884                                 /* And left record is shortened */
885                                 rec->rec_len = length;
886                         } else
887                                 newrec_ptr = 0;
888
889                         /* Remove allocated record from the free list */
890                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
891                                 goto fail;
892
893                         /* Update header: do this before we drop alloc
894                            lock, otherwise tdb_free() might try to
895                            merge with us, thinking we're free.
896                            (Thanks Jeremy Allison). */
897                         rec->magic = TDB_MAGIC;
898                         if (rec_write(tdb, rec_ptr, rec) == -1)
899                                 goto fail;
900
901                         /* Did we create new block? */
902                         if (newrec_ptr) {
903                                 /* Update allocated record tailer (we
904                                    shortened it). */
905                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
906                                         goto fail;
907
908                                 /* Free new record */
909                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
910                                         goto fail;
911                         }
912
913                         /* all done - return the new record offset */
914                         tdb_unlock(tdb, -1, F_WRLCK);
915                         return rec_ptr;
916                 }
917                 /* move to the next record */
918                 last_ptr = rec_ptr;
919                 rec_ptr = rec->next;
920         }
921         /* we didn't find enough space. See if we can expand the
922            database and if we can then try again */
923         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
924                 goto again;
925  fail:
926         tdb_unlock(tdb, -1, F_WRLCK);
927         return 0;
928 }
929
930 /* initialise a new database with a specified hash size */
931 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
932 {
933         struct tdb_header *newdb;
934         int size, ret = -1;
935
936         /* We make it up in memory, then write it out if not internal */
937         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
938         if (!(newdb = calloc(size, 1)))
939                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
940
941         /* Fill in the header */
942         newdb->version = TDB_VERSION;
943         newdb->hash_size = hash_size;
944 #ifdef USE_SPINLOCKS
945         newdb->rwlocks = size;
946 #endif
947         if (tdb->flags & TDB_INTERNAL) {
948                 tdb->map_size = size;
949                 tdb->map_ptr = (char *)newdb;
950                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
951                 /* Convert the `ondisk' version if asked. */
952                 CONVERT(*newdb);
953                 return 0;
954         }
955         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
956                 goto fail;
957
958         if (ftruncate(tdb->fd, 0) == -1)
959                 goto fail;
960
961         /* This creates an endian-converted header, as if read from disk */
962         CONVERT(*newdb);
963         memcpy(&tdb->header, newdb, sizeof(tdb->header));
964         /* Don't endian-convert the magic food! */
965         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
966         if (write(tdb->fd, newdb, size) != size)
967                 ret = -1;
968         else
969                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
970
971   fail:
972         SAFE_FREE(newdb);
973         return ret;
974 }
975
976 /* Returns 0 on fail.  On success, return offset of record, and fills
977    in rec */
978 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
979                         struct list_struct *r)
980 {
981         tdb_off rec_ptr;
982         
983         /* read in the hash top */
984         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
985                 return 0;
986
987         /* keep looking until we find the right record */
988         while (rec_ptr) {
989                 if (rec_read(tdb, rec_ptr, r) == -1)
990                         return 0;
991
992                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
993                         char *k;
994                         /* a very likely hit - read the key */
995                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
996                                            r->key_len);
997                         if (!k)
998                                 return 0;
999
1000                         if (memcmp(key.dptr, k, key.dsize) == 0) {
1001                                 SAFE_FREE(k);
1002                                 return rec_ptr;
1003                         }
1004                         SAFE_FREE(k);
1005                 }
1006                 rec_ptr = r->next;
1007         }
1008         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
1009 }
1010
1011 /* If they do lockkeys, check that this hash is one they locked */
1012 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
1013 {
1014         u32 i;
1015         if (!tdb->lockedkeys)
1016                 return 1;
1017         for (i = 0; i < tdb->lockedkeys[0]; i++)
1018                 if (tdb->lockedkeys[i+1] == hash)
1019                         return 1;
1020         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
1021 }
1022
1023 /* As tdb_find, but if you succeed, keep the lock */
1024 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
1025                              struct list_struct *rec)
1026 {
1027         u32 hash, rec_ptr;
1028
1029         hash = tdb_hash(&key);
1030         if (!tdb_keylocked(tdb, hash))
1031                 return 0;
1032         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1033                 return 0;
1034         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1035                 tdb_unlock(tdb, BUCKET(hash), locktype);
1036         return rec_ptr;
1037 }
1038
1039 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1040 {
1041         return tdb->ecode;
1042 }
1043
1044 static struct tdb_errname {
1045         enum TDB_ERROR ecode; const char *estring;
1046 } emap[] = { {TDB_SUCCESS, "Success"},
1047              {TDB_ERR_CORRUPT, "Corrupt database"},
1048              {TDB_ERR_IO, "IO Error"},
1049              {TDB_ERR_LOCK, "Locking error"},
1050              {TDB_ERR_OOM, "Out of memory"},
1051              {TDB_ERR_EXISTS, "Record exists"},
1052              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1053              {TDB_ERR_NOEXIST, "Record does not exist"} };
1054
1055 /* Error string for the last tdb error */
1056 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1057 {
1058         u32 i;
1059         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1060                 if (tdb->ecode == emap[i].ecode)
1061                         return emap[i].estring;
1062         return "Invalid error code";
1063 }
1064
1065 /* update an entry in place - this only works if the new data size
1066    is <= the old data size and the key exists.
1067    on failure return -1.
1068 */
1069
1070 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
1071 {
1072         struct list_struct rec;
1073         tdb_off rec_ptr;
1074
1075         /* find entry */
1076         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1077                 return -1;
1078
1079         /* must be long enough key, data and tailer */
1080         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1081                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1082                 return -1;
1083         }
1084
1085         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1086                       dbuf.dptr, dbuf.dsize) == -1)
1087                 return -1;
1088
1089         if (dbuf.dsize != rec.data_len) {
1090                 /* update size */
1091                 rec.data_len = dbuf.dsize;
1092                 return rec_write(tdb, rec_ptr, &rec);
1093         }
1094  
1095         return 0;
1096 }
1097
1098 /* find an entry in the database given a key */
1099 /* If an entry doesn't exist tdb_err will be set to
1100  * TDB_ERR_NOEXIST. If a key has no data attached
1101  * tdb_err will not be set. Both will return a
1102  * zero pptr and zero dsize.
1103  */
1104
1105 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1106 {
1107         tdb_off rec_ptr;
1108         struct list_struct rec;
1109         TDB_DATA ret;
1110
1111         /* find which hash bucket it is in */
1112         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
1113                 return tdb_null;
1114
1115         if (rec.data_len)
1116                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1117                                           rec.data_len);
1118         else
1119                 ret.dptr = NULL;
1120         ret.dsize = rec.data_len;
1121         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1122         return ret;
1123 }
1124
1125 /* check if an entry in the database exists 
1126
1127    note that 1 is returned if the key is found and 0 is returned if not found
1128    this doesn't match the conventions in the rest of this module, but is
1129    compatible with gdbm
1130 */
1131 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1132 {
1133         struct list_struct rec;
1134         
1135         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
1136                 return 0;
1137         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1138         return 1;
1139 }
1140
1141 /* record lock stops delete underneath */
1142 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1143 {
1144         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1145 }
1146 /*
1147   Write locks override our own fcntl readlocks, so check it here.
1148   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1149   an error to fail to get the lock here.
1150 */
1151  
1152 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1153 {
1154         struct tdb_traverse_lock *i;
1155         for (i = &tdb->travlocks; i; i = i->next)
1156                 if (i->off == off)
1157                         return -1;
1158         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1159 }
1160
1161 /*
1162   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1163   an error to fail to get the lock here.
1164 */
1165
1166 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1167 {
1168         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1169 }
1170 /* fcntl locks don't stack: avoid unlocking someone else's */
1171 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1172 {
1173         struct tdb_traverse_lock *i;
1174         u32 count = 0;
1175
1176         if (off == 0)
1177                 return 0;
1178         for (i = &tdb->travlocks; i; i = i->next)
1179                 if (i->off == off)
1180                         count++;
1181         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1182 }
1183
1184 /* actually delete an entry in the database given the offset */
1185 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1186 {
1187         tdb_off last_ptr, i;
1188         struct list_struct lastrec;
1189
1190         if (tdb->read_only) return -1;
1191
1192         if (write_lock_record(tdb, rec_ptr) == -1) {
1193                 /* Someone traversing here: mark it as dead */
1194                 rec->magic = TDB_DEAD_MAGIC;
1195                 return rec_write(tdb, rec_ptr, rec);
1196         }
1197         if (write_unlock_record(tdb, rec_ptr) != 0)
1198                 return -1;
1199
1200         /* find previous record in hash chain */
1201         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1202                 return -1;
1203         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1204                 if (rec_read(tdb, i, &lastrec) == -1)
1205                         return -1;
1206
1207         /* unlink it: next ptr is at start of record. */
1208         if (last_ptr == 0)
1209                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1210         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1211                 return -1;
1212
1213         /* recover the space */
1214         if (tdb_free(tdb, rec_ptr, rec) == -1)
1215                 return -1;
1216         return 0;
1217 }
1218
1219 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1220 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1221                          struct list_struct *rec)
1222 {
1223         int want_next = (tlock->off != 0);
1224
1225         /* No traversal allows if you've called tdb_lockkeys() */
1226         if (tdb->lockedkeys)
1227                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1228
1229         /* Lock each chain from the start one. */
1230         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1231                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1232                         return -1;
1233
1234                 /* No previous record?  Start at top of chain. */
1235                 if (!tlock->off) {
1236                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1237                                      &tlock->off) == -1)
1238                                 goto fail;
1239                 } else {
1240                         /* Otherwise unlock the previous record. */
1241                         if (unlock_record(tdb, tlock->off) != 0)
1242                                 goto fail;
1243                 }
1244
1245                 if (want_next) {
1246                         /* We have offset of old record: grab next */
1247                         if (rec_read(tdb, tlock->off, rec) == -1)
1248                                 goto fail;
1249                         tlock->off = rec->next;
1250                 }
1251
1252                 /* Iterate through chain */
1253                 while( tlock->off) {
1254                         tdb_off current;
1255                         if (rec_read(tdb, tlock->off, rec) == -1)
1256                                 goto fail;
1257                         if (!TDB_DEAD(rec)) {
1258                                 /* Woohoo: we found one! */
1259                                 if (lock_record(tdb, tlock->off) != 0)
1260                                         goto fail;
1261                                 return tlock->off;
1262                         }
1263                         /* Try to clean dead ones from old traverses */
1264                         current = tlock->off;
1265                         tlock->off = rec->next;
1266                         if (!tdb->read_only && 
1267                             do_delete(tdb, current, rec) != 0)
1268                                 goto fail;
1269                 }
1270                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1271                 want_next = 0;
1272         }
1273         /* We finished iteration without finding anything */
1274         return TDB_ERRCODE(TDB_SUCCESS, 0);
1275
1276  fail:
1277         tlock->off = 0;
1278         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1279                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1280         return -1;
1281 }
1282
1283 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1284    return -1 on error or the record count traversed
1285    if fn is NULL then it is not called
1286    a non-zero return value from fn() indicates that the traversal should stop
1287   */
1288 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1289 {
1290         TDB_DATA key, dbuf;
1291         struct list_struct rec;
1292         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1293         int ret, count = 0;
1294
1295         /* This was in the initializaton, above, but the IRIX compiler
1296          * did not like it.  crh
1297          */
1298         tl.next = tdb->travlocks.next;
1299
1300         /* fcntl locks don't stack: beware traverse inside traverse */
1301         tdb->travlocks.next = &tl;
1302
1303         /* tdb_next_lock places locks on the record returned, and its chain */
1304         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1305                 count++;
1306                 /* now read the full record */
1307                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1308                                           rec.key_len + rec.data_len);
1309                 if (!key.dptr) {
1310                         ret = -1;
1311                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1312                                 goto out;
1313                         if (unlock_record(tdb, tl.off) != 0)
1314                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1315                         goto out;
1316                 }
1317                 key.dsize = rec.key_len;
1318                 dbuf.dptr = key.dptr + rec.key_len;
1319                 dbuf.dsize = rec.data_len;
1320
1321                 /* Drop chain lock, call out */
1322                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1323                         ret = -1;
1324                         goto out;
1325                 }
1326                 if (fn && fn(tdb, key, dbuf, state)) {
1327                         /* They want us to terminate traversal */
1328                         ret = count;
1329                         if (unlock_record(tdb, tl.off) != 0) {
1330                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1331                                 ret = -1;
1332                         }
1333                         tdb->travlocks.next = tl.next;
1334                         SAFE_FREE(key.dptr);
1335                         return count;
1336                 }
1337                 SAFE_FREE(key.dptr);
1338         }
1339 out:
1340         tdb->travlocks.next = tl.next;
1341         if (ret < 0)
1342                 return -1;
1343         else
1344                 return count;
1345 }
1346
1347 /* find the first entry in the database and return its key */
1348 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1349 {
1350         TDB_DATA key;
1351         struct list_struct rec;
1352
1353         /* release any old lock */
1354         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1355                 return tdb_null;
1356         tdb->travlocks.off = tdb->travlocks.hash = 0;
1357
1358         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1359                 return tdb_null;
1360         /* now read the key */
1361         key.dsize = rec.key_len;
1362         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1363         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1364                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1365         return key;
1366 }
1367
1368 /* find the next entry in the database, returning its key */
1369 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1370 {
1371         u32 oldhash;
1372         TDB_DATA key = tdb_null;
1373         struct list_struct rec;
1374         char *k = NULL;
1375
1376         /* Is locked key the old key?  If so, traverse will be reliable. */
1377         if (tdb->travlocks.off) {
1378                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1379                         return tdb_null;
1380                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1381                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1382                                             rec.key_len))
1383                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1384                         /* No, it wasn't: unlock it and start from scratch */
1385                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1386                                 return tdb_null;
1387                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1388                                 return tdb_null;
1389                         tdb->travlocks.off = 0;
1390                 }
1391
1392                 SAFE_FREE(k);
1393         }
1394
1395         if (!tdb->travlocks.off) {
1396                 /* No previous element: do normal find, and lock record */
1397                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1398                 if (!tdb->travlocks.off)
1399                         return tdb_null;
1400                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1401                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1402                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1403                         return tdb_null;
1404                 }
1405         }
1406         oldhash = tdb->travlocks.hash;
1407
1408         /* Grab next record: locks chain and returned record,
1409            unlocks old record */
1410         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1411                 key.dsize = rec.key_len;
1412                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1413                                           key.dsize);
1414                 /* Unlock the chain of this new record */
1415                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1416                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1417         }
1418         /* Unlock the chain of old record */
1419         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1420                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1421         return key;
1422 }
1423
1424 /* delete an entry in the database given a key */
1425 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1426 {
1427         tdb_off rec_ptr;
1428         struct list_struct rec;
1429         int ret;
1430
1431         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1432                 return -1;
1433         ret = do_delete(tdb, rec_ptr, &rec);
1434         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1435                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1436         return ret;
1437 }
1438
1439 /* store an element in the database, replacing any existing element
1440    with the same key 
1441
1442    return 0 on success, -1 on failure
1443 */
1444 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1445 {
1446         struct list_struct rec;
1447         u32 hash;
1448         tdb_off rec_ptr;
1449         char *p = NULL;
1450         int ret = 0;
1451
1452         /* find which hash bucket it is in */
1453         hash = tdb_hash(&key);
1454         if (!tdb_keylocked(tdb, hash))
1455                 return -1;
1456         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1457                 return -1;
1458
1459         /* check for it existing, on insert. */
1460         if (flag == TDB_INSERT) {
1461                 if (tdb_exists(tdb, key)) {
1462                         tdb->ecode = TDB_ERR_EXISTS;
1463                         goto fail;
1464                 }
1465         } else {
1466                 /* first try in-place update, on modify or replace. */
1467                 if (tdb_update(tdb, key, dbuf) == 0)
1468                         goto out;
1469                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1470                         goto fail;
1471         }
1472         /* reset the error code potentially set by the tdb_update() */
1473         tdb->ecode = TDB_SUCCESS;
1474
1475         /* delete any existing record - if it doesn't exist we don't
1476            care.  Doing this first reduces fragmentation, and avoids
1477            coalescing with `allocated' block before it's updated. */
1478         if (flag != TDB_INSERT)
1479                 tdb_delete(tdb, key);
1480
1481         /* Copy key+value *before* allocating free space in case malloc
1482            fails and we are left with a dead spot in the tdb. */
1483
1484         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1485                 tdb->ecode = TDB_ERR_OOM;
1486                 goto fail;
1487         }
1488
1489         memcpy(p, key.dptr, key.dsize);
1490         if (dbuf.dsize)
1491                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1492
1493         /* now we're into insert / modify / replace of a record which
1494          * we know could not be optimised by an in-place store (for
1495          * various reasons).  */
1496         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1497                 goto fail;
1498
1499         /* Read hash top into next ptr */
1500         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1501                 goto fail;
1502
1503         rec.key_len = key.dsize;
1504         rec.data_len = dbuf.dsize;
1505         rec.full_hash = hash;
1506         rec.magic = TDB_MAGIC;
1507
1508         /* write out and point the top of the hash chain at it */
1509         if (rec_write(tdb, rec_ptr, &rec) == -1
1510             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1511             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1512                 /* Need to tdb_unallocate() here */
1513                 goto fail;
1514         }
1515  out:
1516         SAFE_FREE(p); 
1517         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1518         return ret;
1519 fail:
1520         ret = -1;
1521         goto out;
1522 }
1523
1524 /* Attempt to append data to an entry in place - this only works if the new data size
1525    is <= the old data size and the key exists.
1526    on failure return -1. Record must be locked before calling.
1527 */
1528 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1529 {
1530         struct list_struct rec;
1531         tdb_off rec_ptr;
1532
1533         /* find entry */
1534         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1535                 return -1;
1536
1537         /* Append of 0 is always ok. */
1538         if (new_dbuf.dsize == 0)
1539                 return 0;
1540
1541         /* must be long enough for key, old data + new data and tailer */
1542         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1543                 /* No room. */
1544                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1545                 return -1;
1546         }
1547
1548         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1549                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1550                 return -1;
1551
1552         /* update size */
1553         rec.data_len += new_dbuf.dsize;
1554         return rec_write(tdb, rec_ptr, &rec);
1555 }
1556
1557 /* Append to an entry. Create if not exist. */
1558
1559 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1560 {
1561         struct list_struct rec;
1562         u32 hash;
1563         tdb_off rec_ptr;
1564         char *p = NULL;
1565         int ret = 0;
1566         size_t new_data_size = 0;
1567
1568         /* find which hash bucket it is in */
1569         hash = tdb_hash(&key);
1570         if (!tdb_keylocked(tdb, hash))
1571                 return -1;
1572         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1573                 return -1;
1574
1575         /* first try in-place. */
1576         if (tdb_append_inplace(tdb, key, new_dbuf) == 0)
1577                 goto out;
1578
1579         /* reset the error code potentially set by the tdb_append_inplace() */
1580         tdb->ecode = TDB_SUCCESS;
1581
1582         /* find entry */
1583         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1584                 if (tdb->ecode != TDB_ERR_NOEXIST)
1585                         goto fail;
1586
1587                 /* Not found - create. */
1588
1589                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1590                 goto out;
1591         }
1592
1593         new_data_size = rec.data_len + new_dbuf.dsize;
1594
1595         /* Copy key+old_value+value *before* allocating free space in case malloc
1596            fails and we are left with a dead spot in the tdb. */
1597
1598         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1599                 tdb->ecode = TDB_ERR_OOM;
1600                 goto fail;
1601         }
1602
1603         /* Copy the key in place. */
1604         memcpy(p, key.dptr, key.dsize);
1605
1606         /* Now read the old data into place. */
1607         if (rec.data_len &&
1608                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1609                         goto fail;
1610
1611         /* Finally append the new data. */
1612         if (new_dbuf.dsize)
1613                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1614
1615         /* delete any existing record - if it doesn't exist we don't
1616            care.  Doing this first reduces fragmentation, and avoids
1617            coalescing with `allocated' block before it's updated. */
1618
1619         tdb_delete(tdb, key);
1620
1621         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1622                 goto fail;
1623
1624         /* Read hash top into next ptr */
1625         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1626                 goto fail;
1627
1628         rec.key_len = key.dsize;
1629         rec.data_len = new_data_size;
1630         rec.full_hash = hash;
1631         rec.magic = TDB_MAGIC;
1632
1633         /* write out and point the top of the hash chain at it */
1634         if (rec_write(tdb, rec_ptr, &rec) == -1
1635             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1636             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1637                 /* Need to tdb_unallocate() here */
1638                 goto fail;
1639         }
1640
1641  out:
1642         SAFE_FREE(p); 
1643         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1644         return ret;
1645
1646 fail:
1647         ret = -1;
1648         goto out;
1649 }
1650
1651 static int tdb_already_open(dev_t device,
1652                             ino_t ino)
1653 {
1654         TDB_CONTEXT *i;
1655         
1656         for (i = tdbs; i; i = i->next) {
1657                 if (i->device == device && i->inode == ino) {
1658                         return 1;
1659                 }
1660         }
1661
1662         return 0;
1663 }
1664
1665 /* open the database, creating it if necessary 
1666
1667    The open_flags and mode are passed straight to the open call on the
1668    database file. A flags value of O_WRONLY is invalid. The hash size
1669    is advisory, use zero for a default value.
1670
1671    Return is NULL on error, in which case errno is also set.  Don't 
1672    try to call tdb_error or tdb_errname, just do strerror(errno).
1673
1674    @param name may be NULL for internal databases. */
1675 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1676                       int open_flags, mode_t mode)
1677 {
1678         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1679 }
1680
1681
1682 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1683                          int open_flags, mode_t mode,
1684                          tdb_log_func log_fn)
1685 {
1686         TDB_CONTEXT *tdb;
1687         struct stat st;
1688         int rev = 0, locked;
1689         unsigned char *vp;
1690         u32 vertest;
1691
1692         if (!(tdb = calloc(1, sizeof *tdb))) {
1693                 /* Can't log this */
1694                 errno = ENOMEM;
1695                 goto fail;
1696         }
1697         tdb->fd = -1;
1698         tdb->name = NULL;
1699         tdb->map_ptr = NULL;
1700         tdb->lockedkeys = NULL;
1701         tdb->flags = tdb_flags;
1702         tdb->open_flags = open_flags;
1703         tdb->log_fn = log_fn;
1704         
1705         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1706                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1707                          name));
1708                 errno = EINVAL;
1709                 goto fail;
1710         }
1711         
1712         if (hash_size == 0)
1713                 hash_size = DEFAULT_HASH_SIZE;
1714         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1715                 tdb->read_only = 1;
1716                 /* read only databases don't do locking or clear if first */
1717                 tdb->flags |= TDB_NOLOCK;
1718                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1719         }
1720
1721         /* internal databases don't mmap or lock, and start off cleared */
1722         if (tdb->flags & TDB_INTERNAL) {
1723                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1724                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1725                 if (tdb_new_database(tdb, hash_size) != 0) {
1726                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1727                         goto fail;
1728                 }
1729                 goto internal;
1730         }
1731
1732         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1733                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1734                          name, strerror(errno)));
1735                 goto fail;      /* errno set by open(2) */
1736         }
1737
1738         /* ensure there is only one process initialising at once */
1739         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1740                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1741                          name, strerror(errno)));
1742                 goto fail;      /* errno set by tdb_brlock */
1743         }
1744
1745         /* we need to zero database if we are the only one with it open */
1746         if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1747             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1748                 open_flags |= O_CREAT;
1749                 if (ftruncate(tdb->fd, 0) == -1) {
1750                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1751                                  "failed to truncate %s: %s\n",
1752                                  name, strerror(errno)));
1753                         goto fail; /* errno set by ftruncate */
1754                 }
1755         }
1756
1757         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1758             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1759             || (tdb->header.version != TDB_VERSION
1760                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1761                 /* its not a valid database - possibly initialise it */
1762                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1763                         errno = EIO; /* ie bad format or something */
1764                         goto fail;
1765                 }
1766                 rev = (tdb->flags & TDB_CONVERT);
1767         }
1768         vp = (unsigned char *)&tdb->header.version;
1769         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1770                   (((u32)vp[2]) << 8) | (u32)vp[3];
1771         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1772         if (!rev)
1773                 tdb->flags &= ~TDB_CONVERT;
1774         else {
1775                 tdb->flags |= TDB_CONVERT;
1776                 convert(&tdb->header, sizeof(tdb->header));
1777         }
1778         if (fstat(tdb->fd, &st) == -1)
1779                 goto fail;
1780
1781         /* Is it already in the open list?  If so, fail. */
1782         if (tdb_already_open(st.st_dev, st.st_ino)) {
1783                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1784                          "%s (%d,%d) is already open in this process\n",
1785                          name, st.st_dev, st.st_ino));
1786                 errno = EBUSY;
1787                 goto fail;
1788         }
1789
1790         if (!(tdb->name = (char *)strdup(name))) {
1791                 errno = ENOMEM;
1792                 goto fail;
1793         }
1794
1795         tdb->map_size = st.st_size;
1796         tdb->device = st.st_dev;
1797         tdb->inode = st.st_ino;
1798         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1799         if (!tdb->locked) {
1800                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1801                          "failed to allocate lock structure for %s\n",
1802                          name));
1803                 errno = ENOMEM;
1804                 goto fail;
1805         }
1806         tdb_mmap(tdb);
1807         if (locked) {
1808                 if (!tdb->read_only)
1809                         if (tdb_clear_spinlocks(tdb) != 0) {
1810                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1811                                 "failed to clear spinlock\n"));
1812                                 goto fail;
1813                         }
1814                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1815                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1816                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1817                                  name, strerror(errno)));
1818                         goto fail;
1819                 }
1820         }
1821         /* leave this lock in place to indicate it's in use */
1822         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1823                 goto fail;
1824
1825  internal:
1826         /* Internal (memory-only) databases skip all the code above to
1827          * do with disk files, and resume here by releasing their
1828          * global lock and hooking into the active list. */
1829         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1830                 goto fail;
1831         tdb->next = tdbs;
1832         tdbs = tdb;
1833         return tdb;
1834
1835  fail:
1836         { int save_errno = errno;
1837
1838         if (!tdb)
1839                 return NULL;
1840         
1841         if (tdb->map_ptr) {
1842                 if (tdb->flags & TDB_INTERNAL)
1843                         SAFE_FREE(tdb->map_ptr);
1844                 else
1845                         tdb_munmap(tdb);
1846         }
1847         SAFE_FREE(tdb->name);
1848         if (tdb->fd != -1)
1849                 if (close(tdb->fd) != 0)
1850                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1851         SAFE_FREE(tdb->locked);
1852         SAFE_FREE(tdb);
1853         errno = save_errno;
1854         return NULL;
1855         }
1856 }
1857
1858 /**
1859  * Close a database.
1860  *
1861  * @returns -1 for error; 0 for success.
1862  **/
1863 int tdb_close(TDB_CONTEXT *tdb)
1864 {
1865         TDB_CONTEXT **i;
1866         int ret = 0;
1867
1868         if (tdb->map_ptr) {
1869                 if (tdb->flags & TDB_INTERNAL)
1870                         SAFE_FREE(tdb->map_ptr);
1871                 else
1872                         tdb_munmap(tdb);
1873         }
1874         SAFE_FREE(tdb->name);
1875         if (tdb->fd != -1)
1876                 ret = close(tdb->fd);
1877         SAFE_FREE(tdb->locked);
1878         SAFE_FREE(tdb->lockedkeys);
1879
1880         /* Remove from contexts list */
1881         for (i = &tdbs; *i; i = &(*i)->next) {
1882                 if (*i == tdb) {
1883                         *i = tdb->next;
1884                         break;
1885                 }
1886         }
1887
1888         memset(tdb, 0, sizeof(*tdb));
1889         SAFE_FREE(tdb);
1890
1891         return ret;
1892 }
1893
1894 /* lock/unlock entire database */
1895 int tdb_lockall(TDB_CONTEXT *tdb)
1896 {
1897         u32 i;
1898
1899         /* There are no locks on read-only dbs */
1900         if (tdb->read_only)
1901                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1902         if (tdb->lockedkeys)
1903                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1904         for (i = 0; i < tdb->header.hash_size; i++) 
1905                 if (tdb_lock(tdb, i, F_WRLCK))
1906                         break;
1907
1908         /* If error, release locks we have... */
1909         if (i < tdb->header.hash_size) {
1910                 u32 j;
1911
1912                 for ( j = 0; j < i; j++)
1913                         tdb_unlock(tdb, j, F_WRLCK);
1914                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1915         }
1916
1917         return 0;
1918 }
1919 void tdb_unlockall(TDB_CONTEXT *tdb)
1920 {
1921         u32 i;
1922         for (i=0; i < tdb->header.hash_size; i++)
1923                 tdb_unlock(tdb, i, F_WRLCK);
1924 }
1925
1926 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1927 {
1928         u32 i, j, hash;
1929
1930         /* Can't lock more keys if already locked */
1931         if (tdb->lockedkeys)
1932                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1933         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1934                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1935         /* First number in array is # keys */
1936         tdb->lockedkeys[0] = number;
1937
1938         /* Insertion sort by bucket */
1939         for (i = 0; i < number; i++) {
1940                 hash = tdb_hash(&keys[i]);
1941                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1942                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1943                 tdb->lockedkeys[j+1] = hash;
1944         }
1945         /* Finally, lock in order */
1946         for (i = 0; i < number; i++)
1947                 if (tdb_lock(tdb, i, F_WRLCK))
1948                         break;
1949
1950         /* If error, release locks we have... */
1951         if (i < number) {
1952                 for ( j = 0; j < i; j++)
1953                         tdb_unlock(tdb, j, F_WRLCK);
1954                 SAFE_FREE(tdb->lockedkeys);
1955                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1956         }
1957         return 0;
1958 }
1959
1960 /* Unlock the keys previously locked by tdb_lockkeys() */
1961 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1962 {
1963         u32 i;
1964         if (!tdb->lockedkeys)
1965                 return;
1966         for (i = 0; i < tdb->lockedkeys[0]; i++)
1967                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1968         SAFE_FREE(tdb->lockedkeys);
1969 }
1970
1971 /* lock/unlock one hash chain. This is meant to be used to reduce
1972    contention - it cannot guarantee how many records will be locked */
1973 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1974 {
1975         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1976 }
1977
1978 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1979 {
1980         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1981 }
1982
1983 int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1984 {
1985         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
1986 }
1987
1988 int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1989 {
1990         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
1991 }
1992
1993
1994 /* register a loging function */
1995 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1996 {
1997         tdb->log_fn = fn;
1998 }
1999
2000
2001 /* reopen a tdb - this is used after a fork to ensure that we have an independent
2002    seek pointer from our parent and to re-establish locks */
2003 int tdb_reopen(TDB_CONTEXT *tdb)
2004 {
2005         struct stat st;
2006
2007         if (tdb_munmap(tdb) != 0) {
2008                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
2009                 goto fail;
2010         }
2011         if (close(tdb->fd) != 0)
2012                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
2013         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
2014         if (tdb->fd == -1) {
2015                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
2016                 goto fail;
2017         }
2018         if (fstat(tdb->fd, &st) != 0) {
2019                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
2020                 goto fail;
2021         }
2022         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
2023                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
2024                 goto fail;
2025         }
2026         tdb_mmap(tdb);
2027         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
2028                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2029                 goto fail;
2030         }
2031
2032         return 0;
2033
2034 fail:
2035         tdb_close(tdb);
2036         return -1;
2037 }
2038
2039 /* reopen all tdb's */
2040 int tdb_reopen_all(void)
2041 {
2042         TDB_CONTEXT *tdb;
2043
2044         for (tdb=tdbs; tdb; tdb = tdb->next) {
2045                 if (tdb_reopen(tdb) != 0) return -1;
2046         }
2047
2048         return 0;
2049 }