tdb.c revision 106ad96daee151064aa44a857f82ba10c8258b40
1/*
2URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb
3Rev: 22080
4Last Changed: 2007-04-03 05:08:18 -0400
5*/
6 /*
7   trivial database library - standalone version
8
9   Copyright (C) Andrew Tridgell              1999-2005
10   Copyright (C) Jeremy Allison               2000-2006
11   Copyright (C) Paul `Rusty' Russell         2000
12
13     ** NOTE! The following LGPL license applies to the tdb
14     ** library. This does NOT imply that all of Samba is released
15     ** under the LGPL
16
17   This library is free software; you can redistribute it and/or
18   modify it under the terms of the GNU Lesser General Public
19   License as published by the Free Software Foundation; either
20   version 2 of the License, or (at your option) any later version.
21
22   This library is distributed in the hope that it will be useful,
23   but WITHOUT ANY WARRANTY; without even the implied warranty of
24   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
25   Lesser General Public License for more details.
26
27   You should have received a copy of the GNU Lesser General Public
28   License along with this library; if not, write to the Free Software
29   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
30*/
31
32#ifdef CONFIG_STAND_ALONE
33#define HAVE_MMAP
34#define HAVE_STRDUP
35#define HAVE_SYS_MMAN_H
36#define HAVE_UTIME_H
37#define HAVE_UTIME
38#endif
39#define _XOPEN_SOURCE 500
40
41#include <unistd.h>
42#include <stdio.h>
43#include <stdlib.h>
44#include <stdarg.h>
45#include <stddef.h>
46#include <errno.h>
47#include <string.h>
48#include <sys/select.h>
49#include <sys/time.h>
50#include <sys/types.h>
51#include <time.h>
52#ifdef HAVE_UTIME_H
53#include <utime.h>
54#endif
55#include <sys/stat.h>
56#include <sys/file.h>
57#include <fcntl.h>
58
59#ifdef HAVE_SYS_MMAN_H
60#include <sys/mman.h>
61#endif
62
63#ifndef MAP_FILE
64#define MAP_FILE 0
65#endif
66
67#ifndef MAP_FAILED
68#define MAP_FAILED ((void *)-1)
69#endif
70
71#ifndef HAVE_STRDUP
72#define strdup rep_strdup
73static char *rep_strdup(const char *s)
74{
75	char *ret;
76	size_t length;
77
78	if (!s)
79		return NULL;
80
81	length = strlen(s);
82
83	ret = malloc(length + 1);
84	if (ret) {
85		strncpy(ret, s, length);
86		ret[length] = '\0';
87	}
88	return ret;
89}
90#endif
91
92#ifndef PRINTF_ATTRIBUTE
93#if (__GNUC__ > 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ >= 1))
94/** Use gcc attribute to check printf fns.  a1 is the 1-based index of
95 * the parameter containing the format, and a2 the index of the first
96 * argument. Note that some gcc 2.x versions don't handle this
97 * properly **/
98#define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
99#else
100#define PRINTF_ATTRIBUTE(a1, a2)
101#endif
102#endif
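
/* Illustrative sketch, not part of the original source: a hypothetical
 * logging helper annotated with PRINTF_ATTRIBUTE, where the format string
 * is parameter 2 and the variable arguments start at parameter 3. */
#if 0
static void example_log(int level, const char *fmt, ...) PRINTF_ATTRIBUTE(2, 3);
#endif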
103
104#include "tdb.h"
105
106#ifndef u32
107#define u32 unsigned
108#endif
109
110#ifndef HAVE_GETPAGESIZE
111#define getpagesize() 0x2000
112#endif
113
114typedef u32 tdb_len_t;
115typedef u32 tdb_off_t;
116
117#ifndef offsetof
118#define offsetof(t,f) ((unsigned int)&((t *)0)->f)
119#endif
120
121#define TDB_MAGIC_FOOD "TDB file\n"
122#define TDB_VERSION (0x26011967 + 6)
123#define TDB_MAGIC (0x26011999U)
124#define TDB_FREE_MAGIC (~TDB_MAGIC)
125#define TDB_DEAD_MAGIC (0xFEE1DEAD)
126#define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
127#define TDB_ALIGNMENT 4
128#define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
129#define DEFAULT_HASH_SIZE 131
130#define FREELIST_TOP (sizeof(struct tdb_header))
131#define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
132#define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
133#define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
134#define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
135#define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
136#define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
137#define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
138#define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
139#define TDB_SEQNUM_OFS    offsetof(struct tdb_header, sequence_number)
140#define TDB_PAD_BYTE 0x42
141#define TDB_PAD_U32  0x42424242
142
143/* NB: assumes there is a local variable called "tdb" that is the
144 * current context; also takes a doubly-parenthesised printf-style
145 * argument list. */
146#define TDB_LOG(x) tdb->log.log_fn x
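
/* Illustrative usage sketch (the message text is made up): the macro only
 * strips the outer parentheses, so callers pass the whole argument list
 * inside a second pair of parentheses, e.g.
 *
 *	TDB_LOG((tdb, TDB_DEBUG_ERROR, "open failed: %s\n", strerror(errno)));
 *
 * which expands to tdb->log.log_fn(tdb, TDB_DEBUG_ERROR, ...). */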
147
148/* lock offsets */
149#define GLOBAL_LOCK      0
150#define ACTIVE_LOCK      4
151#define TRANSACTION_LOCK 8
152
153/* free memory if the pointer is valid and zero the pointer */
154#ifndef SAFE_FREE
155#define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
156#endif
157
158#define BUCKET(hash) ((hash) % tdb->header.hash_size)
159
160#define DOCONV() (tdb->flags & TDB_CONVERT)
161#define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
162
163
164/* the body of the database is made up of a freelist of list_struct
165   records plus a separate data list for each hash value */
166struct list_struct {
167	tdb_off_t next; /* offset of the next record in the list */
168	tdb_len_t rec_len; /* total byte length of record */
169	tdb_len_t key_len; /* byte length of key */
170	tdb_len_t data_len; /* byte length of data */
171	u32 full_hash; /* the full 32 bit hash of the key */
172	u32 magic;   /* try to catch errors */
173	/* the following union is implied:
174		union {
175			char record[rec_len];
176			struct {
177				char key[key_len];
178				char data[data_len];
179			}
180			u32 totalsize; (tailer)
181		}
182	*/
183};
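
/* Illustrative sketch (not part of the original source): for a live record
 * the key bytes start right after the fixed header above and the data bytes
 * follow the key, so both offsets can be derived from the record offset and
 * the header alone. The helper name is hypothetical. */
#if 0
static void example_record_offsets(tdb_off_t rec_ptr,
				   const struct list_struct *rec,
				   tdb_off_t *key_off, tdb_off_t *data_off)
{
	*key_off  = rec_ptr + sizeof(struct list_struct);
	*data_off = *key_off + rec->key_len;
}
#endif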
184
185
186/* this is stored at the front of every database */
187struct tdb_header {
188	char magic_food[32]; /* for /etc/magic */
189	u32 version; /* version of the code */
190	u32 hash_size; /* number of hash entries */
191	tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
192	tdb_off_t recovery_start; /* offset of transaction recovery region */
193	tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
194	tdb_off_t reserved[29];
195};
196
197struct tdb_lock_type {
198	int list;
199	u32 count;
200	u32 ltype;
201};
202
203struct tdb_traverse_lock {
204	struct tdb_traverse_lock *next;
205	u32 off;
206	u32 hash;
207	int lock_rw;
208};
209
210
211struct tdb_methods {
212	int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
213	int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
214	void (*next_hash_chain)(struct tdb_context *, u32 *);
215	int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
216	int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
217	int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
218};
219
220struct tdb_context {
221	char *name; /* the name of the database */
222	void *map_ptr; /* where it is currently mapped */
223	int fd; /* open file descriptor for the database */
224	tdb_len_t map_size; /* how much space has been mapped */
225	int read_only; /* opened read-only */
226	int traverse_read; /* read-only traversal */
227	struct tdb_lock_type global_lock;
228	int num_lockrecs;
229	struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
230	enum TDB_ERROR ecode; /* error code for last tdb error */
231	struct tdb_header header; /* a cached copy of the header */
232	u32 flags; /* the flags passed to tdb_open */
233	struct tdb_traverse_lock travlocks; /* current traversal locks */
234	struct tdb_context *next; /* all tdbs to avoid multiple opens */
235	dev_t device;	/* uniquely identifies this tdb */
236	ino_t inode;	/* uniquely identifies this tdb */
237	struct tdb_logging_context log;
238	unsigned int (*hash_fn)(TDB_DATA *key);
239	int open_flags; /* flags used in the open - needed by reopen */
240	unsigned int num_locks; /* number of chain locks held */
241	const struct tdb_methods *methods;
242	struct tdb_transaction *transaction;
243	int page_size;
244	int max_dead_records;
245};
246
247
248/*
249  internal prototypes
250*/
251static int tdb_munmap(struct tdb_context *tdb);
252static void tdb_mmap(struct tdb_context *tdb);
253static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
254static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
255static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
256static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
257static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
258static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
259static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
260static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
261static void *tdb_convert(void *buf, u32 size);
262static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
263static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
264static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
265static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
266static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
267static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
268static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
269static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
270static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
271static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
272static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
273		   tdb_off_t offset, tdb_len_t len,
274		   int (*parser)(TDB_DATA key, TDB_DATA data,
275				 void *private_data),
276		   void *private_data);
277static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
278			   struct list_struct *rec);
279static void tdb_io_init(struct tdb_context *tdb);
280static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
281
282
283/* file: error.c */
284
285enum TDB_ERROR tdb_error(struct tdb_context *tdb)
286{
287	return tdb->ecode;
288}
289
290static struct tdb_errname {
291	enum TDB_ERROR ecode; const char *estring;
292} emap[] = { {TDB_SUCCESS, "Success"},
293	     {TDB_ERR_CORRUPT, "Corrupt database"},
294	     {TDB_ERR_IO, "IO Error"},
295	     {TDB_ERR_LOCK, "Locking error"},
296	     {TDB_ERR_OOM, "Out of memory"},
297	     {TDB_ERR_EXISTS, "Record exists"},
298	     {TDB_ERR_NOLOCK, "Lock exists on other keys"},
299	     {TDB_ERR_EINVAL, "Invalid parameter"},
300	     {TDB_ERR_NOEXIST, "Record does not exist"},
301	     {TDB_ERR_RDONLY, "write not permitted"} };
302
303/* Error string for the last tdb error */
304const char *tdb_errorstr(struct tdb_context *tdb)
305{
306	u32 i;
307	for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
308		if (tdb->ecode == emap[i].ecode)
309			return emap[i].estring;
310	return "Invalid error code";
311}
312
313/* file: lock.c */
314
315/* a byte range locking function - return 0 on success
316   this function locks/unlocks "len" bytes starting at the specified offset.
317
318   On error, errno is also set so that errors are passed back properly
319   through tdb_open().
320
321   note that a len of zero means lock to end of file
322*/
323int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
324	       int rw_type, int lck_type, int probe, size_t len)
325{
326	struct flock fl;
327	int ret;
328
329	if (tdb->flags & TDB_NOLOCK) {
330		return 0;
331	}
332
333	if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
334		tdb->ecode = TDB_ERR_RDONLY;
335		return -1;
336	}
337
338	fl.l_type = rw_type;
339	fl.l_whence = SEEK_SET;
340	fl.l_start = offset;
341	fl.l_len = len;
342	fl.l_pid = 0;
343
344	do {
345		ret = fcntl(tdb->fd,lck_type,&fl);
346	} while (ret == -1 && errno == EINTR);
347
348	if (ret == -1) {
349		/* Generic lock error. errno set by fcntl.
350		 * EAGAIN is an expected return from non-blocking
351		 * locks. */
352		if (!probe && lck_type != F_SETLK) {
353			/* Ensure error code is set for log fun to examine. */
354			tdb->ecode = TDB_ERR_LOCK;
355			TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
356				 tdb->fd, offset, rw_type, lck_type, (int)len));
357		}
358		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
359	}
360	return 0;
361}
362
363
364/*
365  upgrade a read lock to a write lock. This needs to be handled in a
366  special way as some OSes (such as solaris) have too conservative
367  deadlock detection and claim a deadlock when progress can be
368  made. For those OSes we may loop for a while.
369*/
370int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
371{
372	int count = 1000;
373	while (count--) {
374		struct timeval tv;
375		if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
376			return 0;
377		}
378		if (errno != EDEADLK) {
379			break;
380		}
381		/* sleep for as short a time as we can - more portable than usleep() */
382		tv.tv_sec = 0;
383		tv.tv_usec = 1;
384		select(0, NULL, NULL, NULL, &tv);
385	}
386	TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
387	return -1;
388}
389
390
391/* lock a list in the database. list -1 is the alloc list */
392int tdb_lock(struct tdb_context *tdb, int list, int ltype)
393{
394	struct tdb_lock_type *new_lck;
395	int i;
396
397	/* a global lock allows us to avoid per chain locks */
398	if (tdb->global_lock.count &&
399	    (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
400		return 0;
401	}
402
403	if (tdb->global_lock.count) {
404		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
405	}
406
407	if (list < -1 || list >= (int)tdb->header.hash_size) {
408		TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
409			   list, ltype));
410		return -1;
411	}
412	if (tdb->flags & TDB_NOLOCK)
413		return 0;
414
415	for (i=0; i<tdb->num_lockrecs; i++) {
416		if (tdb->lockrecs[i].list == list) {
417			if (tdb->lockrecs[i].count == 0) {
418				/*
419				 * Can't happen, see tdb_unlock(). It should
420				 * be an assert.
421				 */
422				TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
423					 "lck->count == 0 for list %d", list));
424			}
425			/*
426			 * Just increment the in-memory struct, posix locks
427			 * don't stack.
428			 */
429			tdb->lockrecs[i].count++;
430			return 0;
431		}
432	}
433
434	new_lck = (struct tdb_lock_type *)realloc(
435		tdb->lockrecs,
436		sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
437	if (new_lck == NULL) {
438		errno = ENOMEM;
439		return -1;
440	}
441	tdb->lockrecs = new_lck;
442
443	/* Since fcntl locks don't nest, we do a lock for the first one,
444	   and simply bump the count for future ones */
445	if (tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW,
446				     0, 1)) {
447		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
448			 "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
449		return -1;
450	}
451
452	tdb->num_locks++;
453
454	tdb->lockrecs[tdb->num_lockrecs].list = list;
455	tdb->lockrecs[tdb->num_lockrecs].count = 1;
456	tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
457	tdb->num_lockrecs += 1;
458
459	return 0;
460}
461
462/* unlock the database. This used to return void ("too late for errors"),
463   but now returns int, since it may be interesting to know that an
464   error has occurred  --simo */
465int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
466{
467	int ret = -1;
468	int i;
469	struct tdb_lock_type *lck = NULL;
470
471	/* a global lock allows us to avoid per chain locks */
472	if (tdb->global_lock.count &&
473	    (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
474		return 0;
475	}
476
477	if (tdb->global_lock.count) {
478		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
479	}
480
481	if (tdb->flags & TDB_NOLOCK)
482		return 0;
483
484	/* Sanity checks */
485	if (list < -1 || list >= (int)tdb->header.hash_size) {
486		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
487		return ret;
488	}
489
490	for (i=0; i<tdb->num_lockrecs; i++) {
491		if (tdb->lockrecs[i].list == list) {
492			lck = &tdb->lockrecs[i];
493			break;
494		}
495	}
496
497	if ((lck == NULL) || (lck->count == 0)) {
498		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
499		return -1;
500	}
501
502	if (lck->count > 1) {
503		lck->count--;
504		return 0;
505	}
506
507	/*
508	 * This lock has count==1 left, so we need to unlock it in the
509	 * kernel. We don't bother with decrementing the in-memory array
510	 * element, we're about to overwrite it with the last array element
511	 * anyway.
512	 */
513
514	ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
515				       F_SETLKW, 0, 1);
516	tdb->num_locks--;
517
518	/*
519	 * Shrink the array by overwriting the element just unlocked with the
520	 * last array element.
521	 */
522
523	if (tdb->num_lockrecs > 1) {
524		*lck = tdb->lockrecs[tdb->num_lockrecs-1];
525	}
526	tdb->num_lockrecs -= 1;
527
528	/*
529	 * We don't bother with realloc when the array shrinks, but if we have
530	 * a completely idle tdb we should get rid of the locked array.
531	 */
532
533	if (tdb->num_lockrecs == 0) {
534		SAFE_FREE(tdb->lockrecs);
535	}
536
537	if (ret)
538		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
539	return ret;
540}
541
542
543
544/* lock/unlock entire database */
545static int _tdb_lockall(struct tdb_context *tdb, int ltype)
546{
547	/* There are no locks on read-only dbs */
548	if (tdb->read_only || tdb->traverse_read)
549		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
550
551	if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
552		tdb->global_lock.count++;
553		return 0;
554	}
555
556	if (tdb->global_lock.count) {
557		/* a global lock of a different type exists */
558		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
559	}
560
561	if (tdb->num_locks != 0) {
562		/* can't combine global and chain locks */
563		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
564	}
565
566	if (tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, F_SETLKW,
567				     0, 4*tdb->header.hash_size)) {
568		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
569		return -1;
570	}
571
572	tdb->global_lock.count = 1;
573	tdb->global_lock.ltype = ltype;
574
575	return 0;
576}
577
578/* unlock entire db */
579static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
580{
581	/* There are no locks on read-only dbs */
582	if (tdb->read_only || tdb->traverse_read) {
583		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
584	}
585
586	if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
587		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
588	}
589
590	if (tdb->global_lock.count > 1) {
591		tdb->global_lock.count--;
592		return 0;
593	}
594
595	if (tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
596				     0, 4*tdb->header.hash_size)) {
597		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
598		return -1;
599	}
600
601	tdb->global_lock.count = 0;
602	tdb->global_lock.ltype = 0;
603
604	return 0;
605}
606
607/* lock entire database with write lock */
608int tdb_lockall(struct tdb_context *tdb)
609{
610	return _tdb_lockall(tdb, F_WRLCK);
611}
612
613/* unlock entire database with write lock */
614int tdb_unlockall(struct tdb_context *tdb)
615{
616	return _tdb_unlockall(tdb, F_WRLCK);
617}
618
619/* lock entire database with read lock */
620int tdb_lockall_read(struct tdb_context *tdb)
621{
622	return _tdb_lockall(tdb, F_RDLCK);
623}
624
625/* unlock entire database with read lock */
626int tdb_unlockall_read(struct tdb_context *tdb)
627{
628	return _tdb_unlockall(tdb, F_RDLCK);
629}
630
631/* lock/unlock one hash chain. This is meant to be used to reduce
632   contention - it cannot guarantee how many records will be locked */
633int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
634{
635	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
636}
637
638int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
639{
640	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
641}
642
643int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
644{
645	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
646}
647
648int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
649{
650	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
651}
652
653
654
655/* record lock stops delete underneath */
656int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
657{
658	return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
659}
660
661/*
662  Write locks override our own fcntl readlocks, so check it here.
663  Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
664  an error to fail to get the lock here.
665*/
666int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
667{
668	struct tdb_traverse_lock *i;
669	for (i = &tdb->travlocks; i; i = i->next)
670		if (i->off == off)
671			return -1;
672	return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
673}
674
675/*
676  Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
677  an error to fail to get the lock here.
678*/
679int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
680{
681	return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
682}
683
684/* fcntl locks don't stack: avoid unlocking someone else's */
685int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
686{
687	struct tdb_traverse_lock *i;
688	u32 count = 0;
689
690	if (off == 0)
691		return 0;
692	for (i = &tdb->travlocks; i; i = i->next)
693		if (i->off == off)
694			count++;
695	return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
696}
697
698/* file: io.c */
699
700/* check for an out of bounds access - if it is out of bounds then
701   see if the database has been expanded by someone else and remap
702   if necessary
703   note that "len" is the minimum length needed for the db
704*/
705static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
706{
707	struct stat st;
708	if (len <= tdb->map_size)
709		return 0;
710	if (tdb->flags & TDB_INTERNAL) {
711		if (!probe) {
712			/* Ensure ecode is set for log fn. */
713			tdb->ecode = TDB_ERR_IO;
714			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
715				 (int)len, (int)tdb->map_size));
716		}
717		return TDB_ERRCODE(TDB_ERR_IO, -1);
718	}
719
720	if (fstat(tdb->fd, &st) == -1) {
721		return TDB_ERRCODE(TDB_ERR_IO, -1);
722	}
723
724	if (st.st_size < (size_t)len) {
725		if (!probe) {
726			/* Ensure ecode is set for log fn. */
727			tdb->ecode = TDB_ERR_IO;
728			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
729				 (int)len, (int)st.st_size));
730		}
731		return TDB_ERRCODE(TDB_ERR_IO, -1);
732	}
733
734	/* Unmap, update size, remap */
735	if (tdb_munmap(tdb) == -1)
736		return TDB_ERRCODE(TDB_ERR_IO, -1);
737	tdb->map_size = st.st_size;
738	tdb_mmap(tdb);
739	return 0;
740}
741
742/* write a lump of data at a specified offset */
743static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
744		     const void *buf, tdb_len_t len)
745{
746	if (len == 0) {
747		return 0;
748	}
749
750	if (tdb->read_only || tdb->traverse_read) {
751		tdb->ecode = TDB_ERR_RDONLY;
752		return -1;
753	}
754
755	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
756		return -1;
757
758	if (tdb->map_ptr) {
759		memcpy(off + (char *)tdb->map_ptr, buf, len);
760	} else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
761		/* Ensure ecode is set for log fn. */
762		tdb->ecode = TDB_ERR_IO;
763		TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
764			   off, len, strerror(errno)));
765		return TDB_ERRCODE(TDB_ERR_IO, -1);
766	}
767	return 0;
768}
769
770/* Endian conversion: we only ever deal with 4 byte quantities */
771void *tdb_convert(void *buf, u32 size)
772{
773	u32 i, *p = (u32 *)buf;
774	for (i = 0; i < size / 4; i++)
775		p[i] = TDB_BYTEREV(p[i]);
776	return buf;
777}
778
779
780/* read a lump of data at a specified offset, maybe convert */
781static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
782		    tdb_len_t len, int cv)
783{
784	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
785		return -1;
786	}
787
788	if (tdb->map_ptr) {
789		memcpy(buf, off + (char *)tdb->map_ptr, len);
790	} else {
791		ssize_t ret = pread(tdb->fd, buf, len, off);
792		if (ret != (ssize_t)len) {
793			/* Ensure ecode is set for log fn. */
794			tdb->ecode = TDB_ERR_IO;
795			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
796				 "len=%d ret=%d (%s) map_size=%d\n",
797				 (int)off, (int)len, (int)ret, strerror(errno),
798				 (int)tdb->map_size));
799			return TDB_ERRCODE(TDB_ERR_IO, -1);
800		}
801	}
802	if (cv) {
803		tdb_convert(buf, len);
804	}
805	return 0;
806}
807
808
809
810/*
811  do an unlocked scan of the hash table heads to find the next non-zero head. The value
812  will then be confirmed with the lock held
813*/
814static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
815{
816	u32 h = *chain;
817	if (tdb->map_ptr) {
818		for (;h < tdb->header.hash_size;h++) {
819			if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
820				break;
821			}
822		}
823	} else {
824		u32 off=0;
825		for (;h < tdb->header.hash_size;h++) {
826			if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
827				break;
828			}
829		}
830	}
831	(*chain) = h;
832}
833
834
835int tdb_munmap(struct tdb_context *tdb)
836{
837	if (tdb->flags & TDB_INTERNAL)
838		return 0;
839
840#ifdef HAVE_MMAP
841	if (tdb->map_ptr) {
842		int ret = munmap(tdb->map_ptr, tdb->map_size);
843		if (ret != 0)
844			return ret;
845	}
846#endif
847	tdb->map_ptr = NULL;
848	return 0;
849}
850
851void tdb_mmap(struct tdb_context *tdb)
852{
853	if (tdb->flags & TDB_INTERNAL)
854		return;
855
856#ifdef HAVE_MMAP
857	if (!(tdb->flags & TDB_NOMMAP)) {
858		tdb->map_ptr = mmap(NULL, tdb->map_size,
859				    PROT_READ|(tdb->read_only? 0:PROT_WRITE),
860				    MAP_SHARED|MAP_FILE, tdb->fd, 0);
861
862		/*
863		 * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
864		 */
865
866		if (tdb->map_ptr == MAP_FAILED) {
867			tdb->map_ptr = NULL;
868			TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
869				 tdb->map_size, strerror(errno)));
870		}
871	} else {
872		tdb->map_ptr = NULL;
873	}
874#else
875	tdb->map_ptr = NULL;
876#endif
877}
878
879/* expand a file.  we prefer to use ftruncate, as that is what posix
880  says to use for mmap expansion */
881static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
882{
883	char buf[1024];
884
885	if (tdb->read_only || tdb->traverse_read) {
886		tdb->ecode = TDB_ERR_RDONLY;
887		return -1;
888	}
889
890	if (ftruncate(tdb->fd, size+addition) == -1) {
891		char b = 0;
892		if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
893			TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
894				 size+addition, strerror(errno)));
895			return -1;
896		}
897	}
898
899	/* now fill the file with something. This ensures that the
900	   file isn't sparse, which would be very bad if we ran out of
901	   disk. This must be done with write, not via mmap */
902	memset(buf, TDB_PAD_BYTE, sizeof(buf));
903	while (addition) {
904		int n = addition>sizeof(buf)?sizeof(buf):addition;
905		int ret = pwrite(tdb->fd, buf, n, size);
906		if (ret != n) {
907			TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
908				   n, strerror(errno)));
909			return -1;
910		}
911		addition -= n;
912		size += n;
913	}
914	return 0;
915}
916
917
918/* expand the database at least size bytes by expanding the underlying
919   file and doing the mmap again if necessary */
920int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
921{
922	struct list_struct rec;
923	tdb_off_t offset;
924
925	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
926		TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
927		return -1;
928	}
929
930	/* must know about any previous expansions by another process */
931	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
932
933	/* always make room for at least 10 more records, and round
934           the database up to a multiple of the page size */
935	size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
936
937	if (!(tdb->flags & TDB_INTERNAL))
938		tdb_munmap(tdb);
939
940	/*
941	 * We must ensure the file is unmapped before doing this
942	 * to ensure consistency with systems like OpenBSD where
943	 * writes and mmaps are not consistent.
944	 */
945
946	/* expand the file itself */
947	if (!(tdb->flags & TDB_INTERNAL)) {
948		if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
949			goto fail;
950	}
951
952	tdb->map_size += size;
953
954	if (tdb->flags & TDB_INTERNAL) {
955		char *new_map_ptr = (char *)realloc(tdb->map_ptr,
956						    tdb->map_size);
957		if (!new_map_ptr) {
958			tdb->map_size -= size;
959			goto fail;
960		}
961		tdb->map_ptr = new_map_ptr;
962	} else {
963		/*
964		 * We must ensure the file is remapped before adding the space
965		 * to ensure consistency with systems like OpenBSD where
966		 * writes and mmaps are not consistent.
967		 */
968
969		/* We're ok if the mmap fails as we'll fallback to read/write */
970		tdb_mmap(tdb);
971	}
972
973	/* form a new freelist record */
974	memset(&rec,'\0',sizeof(rec));
975	rec.rec_len = size - sizeof(rec);
976
977	/* link it into the free list */
978	offset = tdb->map_size - size;
979	if (tdb_free(tdb, offset, &rec) == -1)
980		goto fail;
981
982	tdb_unlock(tdb, -1, F_WRLCK);
983	return 0;
984 fail:
985	tdb_unlock(tdb, -1, F_WRLCK);
986	return -1;
987}
988
989/* read/write a tdb_off_t */
990int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
991{
992	return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
993}
994
995int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
996{
997	tdb_off_t off = *d;
998	return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
999}
1000
1001
1002/* read a lump of data, allocating the space for it */
1003unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
1004{
1005	unsigned char *buf;
1006
1007	/* some systems don't like zero length malloc */
1008	if (len == 0) {
1009		len = 1;
1010	}
1011
1012	if (!(buf = (unsigned char *)malloc(len))) {
1013		/* Ensure ecode is set for log fn. */
1014		tdb->ecode = TDB_ERR_OOM;
1015		TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
1016			   len, strerror(errno)));
1017		return TDB_ERRCODE(TDB_ERR_OOM, buf);
1018	}
1019	if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
1020		SAFE_FREE(buf);
1021		return NULL;
1022	}
1023	return buf;
1024}
1025
1026/* Give a piece of tdb data to a parser */
1027
1028int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
1029		   tdb_off_t offset, tdb_len_t len,
1030		   int (*parser)(TDB_DATA key, TDB_DATA data,
1031				 void *private_data),
1032		   void *private_data)
1033{
1034	TDB_DATA data;
1035	int result;
1036
1037	data.dsize = len;
1038
1039	if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
1040		/*
1041		 * Optimize by avoiding the malloc/memcpy/free, point the
1042		 * parser directly at the mmap area.
1043		 */
1044		if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
1045			return -1;
1046		}
1047		data.dptr = offset + (unsigned char *)tdb->map_ptr;
1048		return parser(key, data, private_data);
1049	}
1050
1051	if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
1052		return -1;
1053	}
1054
1055	result = parser(key, data, private_data);
1056	free(data.dptr);
1057	return result;
1058}
1059
1060/* read/write a record */
1061int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1062{
1063	if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
1064		return -1;
1065	if (TDB_BAD_MAGIC(rec)) {
1066		/* Ensure ecode is set for log fn. */
1067		tdb->ecode = TDB_ERR_CORRUPT;
1068		TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
1069		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1070	}
1071	return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
1072}
1073
1074int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1075{
1076	struct list_struct r = *rec;
1077	return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
1078}
1079
1080static const struct tdb_methods io_methods = {
1081	tdb_read,
1082	tdb_write,
1083	tdb_next_hash_chain,
1084	tdb_oob,
1085	tdb_expand_file,
1086	tdb_brlock
1087};
1088
1089/*
1090  initialise the default methods table
1091*/
1092void tdb_io_init(struct tdb_context *tdb)
1093{
1094	tdb->methods = &io_methods;
1095}
1096
1097/* file: transaction.c */
1098
1099/*
1100  transaction design:
1101
1102  - only allow a single transaction at a time per database. This makes
1103    using the transaction API simpler, as otherwise the caller would
1104    have to cope with temporary failures in transactions that conflict
1105    with other current transactions
1106
1107  - keep the transaction recovery information in the same file as the
1108    database, using a special 'transaction recovery' record pointed at
1109    by the header. This removes the need for extra journal files as
1110    used by some other databases
1111
1112  - dynamically allocate the transaction recovery record, re-using it
1113    for subsequent transactions. If a larger record is needed then
1114    tdb_free() the old record to place it on the normal tdb freelist
1115    before allocating the new record
1116
1117  - during transactions, keep a linked list of all writes that have
1118    been performed by intercepting all tdb_write() calls. The hooked
1119    transaction versions of tdb_read() and tdb_write() check this
1120    linked list and try to use the elements of the list in preference
1121    to the real database.
1122
1123  - don't allow any locks to be held when a transaction starts,
1124    otherwise we can end up with deadlock (plus lack of lock nesting
1125    in posix locks would mean the lock is lost)
1126
1127  - if the caller gains a lock during the transaction but doesn't
1128    release it then fail the commit
1129
1130  - allow for nested calls to tdb_transaction_start(), re-using the
1131    existing transaction record. If the inner transaction is cancelled
1132    then a subsequent commit will fail
1133
1134  - keep a mirrored copy of the tdb hash chain heads to allow for the
1135    fast hash heads scan on traverse, updating the mirrored copy in
1136    the transaction version of tdb_write
1137
1138  - allow callers to mix transaction and non-transaction use of tdb,
1139    although once a transaction is started then an exclusive lock is
1140    gained until the transaction is committed or cancelled
1141
1142  - the commit strategy involves first saving away all modified data
1143    into a linearised buffer in the transaction recovery area, then
1144    marking the transaction recovery area with a magic value to
1145    indicate a valid recovery record. In total 4 fsync/msync calls are
1146    needed per commit to prevent race conditions. It might be possible
1147    to reduce this to 3 or even 2 with some more work.
1148
1149  - check for a valid recovery record on open of the tdb, while the
1150    global lock is held. Automatically recover from the transaction
1151    recovery area if needed, then continue with the open as
1152    usual. This allows for smooth crash recovery with no administrator
1153    intervention.
1154
1155  - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
1156    still available, but no transaction recovery area is used and no
1157    fsync/msync calls are made.
1158
1159*/
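
/* Illustrative usage sketch (not part of the original source): the typical
 * calling pattern for the transaction API described above. The key/value
 * arguments and the single tdb_store() step are hypothetical. */
#if 0
static int example_transactional_store(struct tdb_context *tdb,
					TDB_DATA key, TDB_DATA value)
{
	if (tdb_transaction_start(tdb) == -1) {
		return -1;
	}
	if (tdb_store(tdb, key, value, TDB_REPLACE) == -1) {
		tdb_transaction_cancel(tdb);
		return -1;
	}
	/* commit either applies everything or, after a crash, the recovery
	   record restores the pre-transaction state on the next open */
	return tdb_transaction_commit(tdb);
}
#endif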
1160
1161struct tdb_transaction_el {
1162	struct tdb_transaction_el *next, *prev;
1163	tdb_off_t offset;
1164	tdb_len_t length;
1165	unsigned char *data;
1166};
1167
1168/*
1169  hold the context of any current transaction
1170*/
1171struct tdb_transaction {
1172	/* we keep a mirrored copy of the tdb hash heads here so
1173	   tdb_next_hash_chain() can operate efficiently */
1174	u32 *hash_heads;
1175
1176	/* the original io methods - used to do IOs to the real db */
1177	const struct tdb_methods *io_methods;
1178
1179	/* the list of transaction elements. We use a doubly linked
1180	   list with a last pointer to allow us to keep the list
1181	   ordered, with first element at the front of the list. It
1182	   needs to be doubly linked as the read/write traversals need
1183	   to be backwards, while the commit needs to be forwards */
1184	struct tdb_transaction_el *elements, *elements_last;
1185
1186	/* non-zero when an internal transaction error has
1187	   occurred. All write operations will then fail until the
1188	   transaction is ended */
1189	int transaction_error;
1190
1191	/* when inside a transaction we need to keep track of any
1192	   nested tdb_transaction_start() calls, as these are allowed,
1193	   but don't create a new transaction */
1194	int nesting;
1195
1196	/* old file size before transaction */
1197	tdb_len_t old_map_size;
1198};
1199
1200
1201/*
1202  read while in a transaction. We need to check first if the data is in our list
1203  of transaction elements, then if not do a real read
1204*/
1205static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
1206			    tdb_len_t len, int cv)
1207{
1208	struct tdb_transaction_el *el;
1209
1210	/* we need to walk the list backwards to get the most recent data */
1211	for (el=tdb->transaction->elements_last;el;el=el->prev) {
1212		tdb_len_t partial;
1213
1214		if (off+len <= el->offset) {
1215			continue;
1216		}
1217		if (off >= el->offset + el->length) {
1218			continue;
1219		}
1220
1221		/* an overlapping read - needs to be split into up to
1222		   2 reads and a memcpy */
1223		if (off < el->offset) {
1224			partial = el->offset - off;
1225			if (transaction_read(tdb, off, buf, partial, cv) != 0) {
1226				goto fail;
1227			}
1228			len -= partial;
1229			off += partial;
1230			buf = (void *)(partial + (char *)buf);
1231		}
1232		if (off + len <= el->offset + el->length) {
1233			partial = len;
1234		} else {
1235			partial = el->offset + el->length - off;
1236		}
1237		memcpy(buf, el->data + (off - el->offset), partial);
1238		if (cv) {
1239			tdb_convert(buf, len);
1240		}
1241		len -= partial;
1242		off += partial;
1243		buf = (void *)(partial + (char *)buf);
1244
1245		if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
1246			goto fail;
1247		}
1248
1249		return 0;
1250	}
1251
1252	/* it's not in the transaction elements - do a real read */
1253	return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
1254
1255fail:
1256	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
1257	tdb->ecode = TDB_ERR_IO;
1258	tdb->transaction->transaction_error = 1;
1259	return -1;
1260}
1261
1262
1263/*
1264  write while in a transaction
1265*/
1266static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
1267			     const void *buf, tdb_len_t len)
1268{
1269	struct tdb_transaction_el *el, *best_el=NULL;
1270
1271	if (len == 0) {
1272		return 0;
1273	}
1274
1275	/* if the write is to a hash head, then update the transaction
1276	   hash heads */
1277	if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
1278	    off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
1279		u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
1280		memcpy(&tdb->transaction->hash_heads[chain], buf, len);
1281	}
1282
1283	/* first see if we can replace an existing entry */
1284	for (el=tdb->transaction->elements_last;el;el=el->prev) {
1285		tdb_len_t partial;
1286
1287		if (best_el == NULL && off == el->offset+el->length) {
1288			best_el = el;
1289		}
1290
1291		if (off+len <= el->offset) {
1292			continue;
1293		}
1294		if (off >= el->offset + el->length) {
1295			continue;
1296		}
1297
1298		/* an overlapping write - needs to be split into up to
1299		   2 writes and a memcpy */
1300		if (off < el->offset) {
1301			partial = el->offset - off;
1302			if (transaction_write(tdb, off, buf, partial) != 0) {
1303				goto fail;
1304			}
1305			len -= partial;
1306			off += partial;
1307			buf = (const void *)(partial + (const char *)buf);
1308		}
1309		if (off + len <= el->offset + el->length) {
1310			partial = len;
1311		} else {
1312			partial = el->offset + el->length - off;
1313		}
1314		memcpy(el->data + (off - el->offset), buf, partial);
1315		len -= partial;
1316		off += partial;
1317		buf = (const void *)(partial + (const char *)buf);
1318
1319		if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
1320			goto fail;
1321		}
1322
1323		return 0;
1324	}
1325
1326	/* see if we can append the new entry to an existing entry */
1327	if (best_el && best_el->offset + best_el->length == off &&
1328	    (off+len < tdb->transaction->old_map_size ||
1329	     off > tdb->transaction->old_map_size)) {
1330		unsigned char *data = best_el->data;
1331		el = best_el;
1332		el->data = (unsigned char *)realloc(el->data,
1333						    el->length + len);
1334		if (el->data == NULL) {
1335			tdb->ecode = TDB_ERR_OOM;
1336			tdb->transaction->transaction_error = 1;
1337			el->data = data;
1338			return -1;
1339		}
1340		if (buf) {
1341			memcpy(el->data + el->length, buf, len);
1342		} else {
1343			memset(el->data + el->length, TDB_PAD_BYTE, len);
1344		}
1345		el->length += len;
1346		return 0;
1347	}
1348
1349	/* add a new entry at the end of the list */
1350	el = (struct tdb_transaction_el *)malloc(sizeof(*el));
1351	if (el == NULL) {
1352		tdb->ecode = TDB_ERR_OOM;
1353		tdb->transaction->transaction_error = 1;
1354		return -1;
1355	}
1356	el->next = NULL;
1357	el->prev = tdb->transaction->elements_last;
1358	el->offset = off;
1359	el->length = len;
1360	el->data = (unsigned char *)malloc(len);
1361	if (el->data == NULL) {
1362		free(el);
1363		tdb->ecode = TDB_ERR_OOM;
1364		tdb->transaction->transaction_error = 1;
1365		return -1;
1366	}
1367	if (buf) {
1368		memcpy(el->data, buf, len);
1369	} else {
1370		memset(el->data, TDB_PAD_BYTE, len);
1371	}
1372	if (el->prev) {
1373		el->prev->next = el;
1374	} else {
1375		tdb->transaction->elements = el;
1376	}
1377	tdb->transaction->elements_last = el;
1378	return 0;
1379
1380fail:
1381	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
1382	tdb->ecode = TDB_ERR_IO;
1383	tdb->transaction->transaction_error = 1;
1384	return -1;
1385}
1386
1387/*
1388  accelerated hash chain head search, using the cached hash heads
1389*/
1390static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
1391{
1392	u32 h = *chain;
1393	for (;h < tdb->header.hash_size;h++) {
1394		/* the +1 takes account of the freelist */
1395		if (0 != tdb->transaction->hash_heads[h+1]) {
1396			break;
1397		}
1398	}
1399	(*chain) = h;
1400}
1401
1402/*
1403  out of bounds check during a transaction
1404*/
1405static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
1406{
1407	if (len <= tdb->map_size) {
1408		return 0;
1409	}
1410	return TDB_ERRCODE(TDB_ERR_IO, -1);
1411}
1412
1413/*
1414  transaction version of tdb_expand().
1415*/
1416static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
1417				   tdb_off_t addition)
1418{
1419	/* add a write to the transaction elements, so subsequent
1420	   reads see the zero data */
1421	if (transaction_write(tdb, size, NULL, addition) != 0) {
1422		return -1;
1423	}
1424
1425	return 0;
1426}
1427
1428/*
1429  brlock during a transaction - ignore them
1430*/
1431static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
1432			      int rw_type, int lck_type, int probe, size_t len)
1433{
1434	return 0;
1435}
1436
1437static const struct tdb_methods transaction_methods = {
1438	transaction_read,
1439	transaction_write,
1440	transaction_next_hash_chain,
1441	transaction_oob,
1442	transaction_expand_file,
1443	transaction_brlock
1444};
1445
1446
1447/*
1448  start a tdb transaction. No token is returned, as only a single
1449  transaction is allowed to be pending per tdb_context
1450*/
1451int tdb_transaction_start(struct tdb_context *tdb)
1452{
1453	/* some sanity checks */
1454	if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
1455		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
1456		tdb->ecode = TDB_ERR_EINVAL;
1457		return -1;
1458	}
1459
1460	/* cope with nested tdb_transaction_start() calls */
1461	if (tdb->transaction != NULL) {
1462		tdb->transaction->nesting++;
1463		TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
1464			 tdb->transaction->nesting));
1465		return 0;
1466	}
1467
1468	if (tdb->num_locks != 0 || tdb->global_lock.count) {
1469		/* the caller must not have any locks when starting a
1470		   transaction as otherwise we'll be screwed by lack
1471		   of nested locks in posix */
1472		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
1473		tdb->ecode = TDB_ERR_LOCK;
1474		return -1;
1475	}
1476
1477	if (tdb->travlocks.next != NULL) {
1478		/* you cannot use transactions inside a traverse (although you can use
1479		   traverse inside a transaction) as otherwise you can end up with
1480		   deadlock */
1481		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
1482		tdb->ecode = TDB_ERR_LOCK;
1483		return -1;
1484	}
1485
1486	tdb->transaction = (struct tdb_transaction *)
1487		calloc(sizeof(struct tdb_transaction), 1);
1488	if (tdb->transaction == NULL) {
1489		tdb->ecode = TDB_ERR_OOM;
1490		return -1;
1491	}
1492
1493	/* get the transaction write lock. This is a blocking lock. As
1494	   discussed with Volker, there are a number of ways we could
1495	   make this async, which we will probably do in the future */
1496	if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
1497		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
1498		tdb->ecode = TDB_ERR_LOCK;
1499		SAFE_FREE(tdb->transaction);
1500		return -1;
1501	}
1502
1503	/* get a read lock from the freelist to the end of file. This
1504	   is upgraded to a write lock during the commit */
1505	if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
1506		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
1507		tdb->ecode = TDB_ERR_LOCK;
1508		goto fail;
1509	}
1510
1511	/* setup a copy of the hash table heads so the hash scan in
1512	   traverse can be fast */
1513	tdb->transaction->hash_heads = (u32 *)
1514		calloc(tdb->header.hash_size+1, sizeof(u32));
1515	if (tdb->transaction->hash_heads == NULL) {
1516		tdb->ecode = TDB_ERR_OOM;
1517		goto fail;
1518	}
1519	if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1520				   TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
1521		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
1522		tdb->ecode = TDB_ERR_IO;
1523		goto fail;
1524	}
1525
1526	/* make sure we know about any file expansions already done by
1527	   anyone else */
1528	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1529	tdb->transaction->old_map_size = tdb->map_size;
1530
1531	/* finally hook the io methods, replacing them with
1532	   transaction specific methods */
1533	tdb->transaction->io_methods = tdb->methods;
1534	tdb->methods = &transaction_methods;
1535
1536	/* by calling this transaction write here, we ensure that we don't grow the
1537	   transaction linked list due to hash table updates */
1538	if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1539			      TDB_HASHTABLE_SIZE(tdb)) != 0) {
1540		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
1541		tdb->ecode = TDB_ERR_IO;
1542		goto fail;
1543	}
1544
1545	return 0;
1546
1547fail:
1548	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1549	tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1550	SAFE_FREE(tdb->transaction->hash_heads);
1551	SAFE_FREE(tdb->transaction);
1552	return -1;
1553}
1554
1555
1556/*
1557  cancel the current transaction
1558*/
1559int tdb_transaction_cancel(struct tdb_context *tdb)
1560{
1561	if (tdb->transaction == NULL) {
1562		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
1563		return -1;
1564	}
1565
1566	if (tdb->transaction->nesting != 0) {
1567		tdb->transaction->transaction_error = 1;
1568		tdb->transaction->nesting--;
1569		return 0;
1570	}
1571
1572	tdb->map_size = tdb->transaction->old_map_size;
1573
1574	/* free all the transaction elements */
1575	while (tdb->transaction->elements) {
1576		struct tdb_transaction_el *el = tdb->transaction->elements;
1577		tdb->transaction->elements = el->next;
1578		free(el->data);
1579		free(el);
1580	}
1581
1582	/* remove any global lock created during the transaction */
1583	if (tdb->global_lock.count != 0) {
1584		tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
1585		tdb->global_lock.count = 0;
1586	}
1587
1588	/* remove any locks created during the transaction */
1589	if (tdb->num_locks != 0) {
1590		int i;
1591		for (i=0;i<tdb->num_lockrecs;i++) {
1592			tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
1593				   F_UNLCK,F_SETLKW, 0, 1);
1594		}
1595		tdb->num_locks = 0;
1596	}
1597
1598	/* restore the normal io methods */
1599	tdb->methods = tdb->transaction->io_methods;
1600
1601	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1602	tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1603	SAFE_FREE(tdb->transaction->hash_heads);
1604	SAFE_FREE(tdb->transaction);
1605
1606	return 0;
1607}
1608
1609/*
1610  sync to disk
1611*/
1612static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
1613{
1614	if (fsync(tdb->fd) != 0) {
1615		tdb->ecode = TDB_ERR_IO;
1616		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
1617		return -1;
1618	}
1619#ifdef MS_SYNC
1620	if (tdb->map_ptr) {
1621		tdb_off_t moffset = offset & ~(tdb->page_size-1);
1622		if (msync(moffset + (char *)tdb->map_ptr,
1623			  length + (offset - moffset), MS_SYNC) != 0) {
1624			tdb->ecode = TDB_ERR_IO;
1625			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
1626				 strerror(errno)));
1627			return -1;
1628		}
1629	}
1630#endif
1631	return 0;
1632}
1633
1634
1635/*
1636  work out how much space the linearised recovery data will consume
1637*/
1638static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
1639{
1640	struct tdb_transaction_el *el;
1641	tdb_len_t recovery_size = 0;
1642
1643	recovery_size = sizeof(u32);
1644	for (el=tdb->transaction->elements;el;el=el->next) {
1645		if (el->offset >= tdb->transaction->old_map_size) {
1646			continue;
1647		}
1648		recovery_size += 2*sizeof(tdb_off_t) + el->length;
1649	}
1650
1651	return recovery_size;
1652}
1653
1654/*
1655  allocate the recovery area, or use an existing recovery area if it is
1656  large enough
1657*/
1658static int tdb_recovery_allocate(struct tdb_context *tdb,
1659				 tdb_len_t *recovery_size,
1660				 tdb_off_t *recovery_offset,
1661				 tdb_len_t *recovery_max_size)
1662{
1663	struct list_struct rec;
1664	const struct tdb_methods *methods = tdb->transaction->io_methods;
1665	tdb_off_t recovery_head;
1666
1667	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1668		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
1669		return -1;
1670	}
1671
1672	rec.rec_len = 0;
1673
1674	if (recovery_head != 0 &&
1675	    methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
1676		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
1677		return -1;
1678	}
1679
1680	*recovery_size = tdb_recovery_size(tdb);
1681
1682	if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
1683		/* it fits in the existing area */
1684		*recovery_max_size = rec.rec_len;
1685		*recovery_offset = recovery_head;
1686		return 0;
1687	}
1688
1689	/* we need to free up the old recovery area, then allocate a
1690	   new one at the end of the file. Note that we cannot use
1691	   tdb_allocate() to allocate the new one as that might return
1692   us an area that is currently in use (as of the start of
1693	   the transaction) */
1694	if (recovery_head != 0) {
1695		if (tdb_free(tdb, recovery_head, &rec) == -1) {
1696			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
1697			return -1;
1698		}
1699	}
1700
1701	/* the tdb_free() call might have increased the recovery size */
1702	*recovery_size = tdb_recovery_size(tdb);
1703
1704	/* round up to a multiple of page size */
1705	*recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
1706	*recovery_offset = tdb->map_size;
1707	recovery_head = *recovery_offset;
1708
1709	if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1710				     (tdb->map_size - tdb->transaction->old_map_size) +
1711				     sizeof(rec) + *recovery_max_size) == -1) {
1712		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
1713		return -1;
1714	}
1715
1716	/* remap the file (if using mmap) */
1717	methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1718
1719	/* we have to reset the old map size so that we don't try to expand the file
1720	   again in the transaction commit, which would destroy the recovery area */
1721	tdb->transaction->old_map_size = tdb->map_size;
1722
1723	/* write the recovery header offset and sync - we can sync without a race here
1724	   as the magic ptr in the recovery record has not been set */
1725	CONVERT(recovery_head);
1726	if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
1727			       &recovery_head, sizeof(tdb_off_t)) == -1) {
1728		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
1729		return -1;
1730	}
1731
1732	return 0;
1733}
1734
1735
1736/*
1737  setup the recovery data that will be used on a crash during commit
1738*/
1739static int transaction_setup_recovery(struct tdb_context *tdb,
1740				      tdb_off_t *magic_offset)
1741{
1742	struct tdb_transaction_el *el;
1743	tdb_len_t recovery_size;
1744	unsigned char *data, *p;
1745	const struct tdb_methods *methods = tdb->transaction->io_methods;
1746	struct list_struct *rec;
1747	tdb_off_t recovery_offset, recovery_max_size;
1748	tdb_off_t old_map_size = tdb->transaction->old_map_size;
1749	u32 magic, tailer;
1750
1751	/*
1752	  check that the recovery area has enough space
1753	*/
1754	if (tdb_recovery_allocate(tdb, &recovery_size,
1755				  &recovery_offset, &recovery_max_size) == -1) {
1756		return -1;
1757	}
1758
1759	data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
1760	if (data == NULL) {
1761		tdb->ecode = TDB_ERR_OOM;
1762		return -1;
1763	}
1764
1765	rec = (struct list_struct *)data;
1766	memset(rec, 0, sizeof(*rec));
1767
1768	rec->magic    = 0;
1769	rec->data_len = recovery_size;
1770	rec->rec_len  = recovery_max_size;
1771	rec->key_len  = old_map_size;
1772	CONVERT(rec);
1773
1774	/* build the recovery data into a single blob to allow us to do a single
1775	   large write, which should be more efficient */
1776	p = data + sizeof(*rec);
1777	for (el=tdb->transaction->elements;el;el=el->next) {
1778		if (el->offset >= old_map_size) {
1779			continue;
1780		}
1781		if (el->offset + el->length > tdb->transaction->old_map_size) {
1782			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
1783			free(data);
1784			tdb->ecode = TDB_ERR_CORRUPT;
1785			return -1;
1786		}
1787		memcpy(p, &el->offset, 4);
1788		memcpy(p+4, &el->length, 4);
1789		if (DOCONV()) {
1790			tdb_convert(p, 8);
1791		}
1792		/* the recovery area contains the old data, not the
1793		   new data, so we have to call the original tdb_read
1794		   method to get it */
1795		if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
1796			free(data);
1797			tdb->ecode = TDB_ERR_IO;
1798			return -1;
1799		}
1800		p += 8 + el->length;
1801	}
1802
1803	/* and the tailer (converted so DOCONV() databases store it in file byte order) */
1804	tailer = sizeof(*rec) + recovery_max_size;
1805	CONVERT(tailer);
1806	memcpy(p, &tailer, 4);
1807
1808	/* write the recovery data to the recovery area */
1809	if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
1810		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
1811		free(data);
1812		tdb->ecode = TDB_ERR_IO;
1813		return -1;
1814	}
1815
1816	/* as we don't have ordered writes, we have to sync the recovery
1817	   data before we update the magic to indicate that the recovery
1818	   data is present */
1819	if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
1820		free(data);
1821		return -1;
1822	}
1823
1824	free(data);
1825
1826	magic = TDB_RECOVERY_MAGIC;
1827	CONVERT(magic);
1828
1829	*magic_offset = recovery_offset + offsetof(struct list_struct, magic);
1830
1831	if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
1832		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
1833		tdb->ecode = TDB_ERR_IO;
1834		return -1;
1835	}
1836
1837	/* ensure the recovery magic marker is on disk */
1838	if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
1839		return -1;
1840	}
1841
1842	return 0;
1843}
1844
1845/*
1846  commit the current transaction
1847*/
1848int tdb_transaction_commit(struct tdb_context *tdb)
1849{
1850	const struct tdb_methods *methods;
1851	tdb_off_t magic_offset = 0;
1852	u32 zero = 0;
1853
1854	if (tdb->transaction == NULL) {
1855		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1856		return -1;
1857	}
1858
1859	if (tdb->transaction->transaction_error) {
1860		tdb->ecode = TDB_ERR_IO;
1861		tdb_transaction_cancel(tdb);
1862		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1863		return -1;
1864	}
1865
1866	if (tdb->transaction->nesting != 0) {
1867		tdb->transaction->nesting--;
1868		return 0;
1869	}
1870
1871	/* check for a null transaction */
1872	if (tdb->transaction->elements == NULL) {
1873		tdb_transaction_cancel(tdb);
1874		return 0;
1875	}
1876
1877	methods = tdb->transaction->io_methods;
1878
1879	/* if there are any locks pending then the caller has not
1880	   nested their locks properly, so fail the transaction */
1881	if (tdb->num_locks || tdb->global_lock.count) {
1882		tdb->ecode = TDB_ERR_LOCK;
1883		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
1884		tdb_transaction_cancel(tdb);
1885		return -1;
1886	}
1887
1888	/* upgrade the main transaction lock region to a write lock */
1889	if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
1890		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to upgrade hash locks\n"));
1891		tdb->ecode = TDB_ERR_LOCK;
1892		tdb_transaction_cancel(tdb);
1893		return -1;
1894	}
1895
1896	/* get the global lock - this prevents new users attaching to the database
1897	   during the commit */
1898	if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
1899		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
1900		tdb->ecode = TDB_ERR_LOCK;
1901		tdb_transaction_cancel(tdb);
1902		return -1;
1903	}
1904
1905	if (!(tdb->flags & TDB_NOSYNC)) {
1906		/* write the recovery data to the end of the file */
1907		if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
1908			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
1909			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1910			tdb_transaction_cancel(tdb);
1911			return -1;
1912		}
1913	}
1914
1915	/* expand the file to the new size if needed */
1916	if (tdb->map_size != tdb->transaction->old_map_size) {
1917		if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1918					     tdb->map_size -
1919					     tdb->transaction->old_map_size) == -1) {
1920			tdb->ecode = TDB_ERR_IO;
1921			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
1922			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1923			tdb_transaction_cancel(tdb);
1924			return -1;
1925		}
1926		tdb->map_size = tdb->transaction->old_map_size;
1927		methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1928	}
1929
1930	/* perform all the writes */
1931	while (tdb->transaction->elements) {
1932		struct tdb_transaction_el *el = tdb->transaction->elements;
1933
1934		if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
1935			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1936
1937			/* we've overwritten part of the data and
1938			   possibly expanded the file, so we need to
1939			   run the crash recovery code */
1940			tdb->methods = methods;
1941			tdb_transaction_recover(tdb);
1942
1943			tdb_transaction_cancel(tdb);
1944			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1945
1946			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1947			return -1;
1948		}
1949		tdb->transaction->elements = el->next;
1950		free(el->data);
1951		free(el);
1952	}
1953
1954	if (!(tdb->flags & TDB_NOSYNC)) {
1955		/* ensure the new data is on disk */
1956		if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1957			return -1;
1958		}
1959
1960		/* remove the recovery marker */
1961		if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
1962			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
1963			return -1;
1964		}
1965
1966		/* ensure the recovery marker has been removed on disk */
1967		if (transaction_sync(tdb, magic_offset, 4) == -1) {
1968			return -1;
1969		}
1970	}
1971
1972	tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1973
1974	/*
1975	  TODO: maybe write to some dummy hdr field, or write to magic
1976	  offset without mmap, before the last sync, instead of the
1977	  utime() call
1978	*/
1979
1980	/* on some systems (like Linux 2.6.x) changes via mmap/msync
1981	   don't change the mtime of the file, this means the file may
1982	   not be backed up (as tdb rounding to block sizes means that
1983	   file size changes are quite rare too). The following forces
1984	   mtime changes when a transaction completes */
1985#ifdef HAVE_UTIME
1986	utime(tdb->name, NULL);
1987#endif
1988
1989	/* use a transaction cancel to free memory and remove the
1990	   transaction locks */
1991	tdb_transaction_cancel(tdb);
1992	return 0;
1993}
1994
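/*
  Illustrative usage sketch (not part of the original source): the usual
  commit-or-cancel pattern around tdb_transaction_start()/tdb_transaction_commit().
  The key/value variables (key1, val1, key2, val2) are placeholders supplied
  by the caller.

	if (tdb_transaction_start(tdb) == -1) {
		return -1;
	}
	if (tdb_store(tdb, key1, val1, TDB_REPLACE) == -1 ||
	    tdb_store(tdb, key2, val2, TDB_REPLACE) == -1) {
		tdb_transaction_cancel(tdb);	// roll back everything written so far
		return -1;
	}
	if (tdb_transaction_commit(tdb) == -1) {
		return -1;			// commit failed
	}
*/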
1995
1996/*
1997  recover from an aborted transaction. Must be called with exclusive
1998  database write access already established (including the global
1999  lock to prevent new processes attaching)
2000*/
2001int tdb_transaction_recover(struct tdb_context *tdb)
2002{
2003	tdb_off_t recovery_head, recovery_eof;
2004	unsigned char *data, *p;
2005	u32 zero = 0;
2006	struct list_struct rec;
2007
2008	/* find the recovery area */
2009	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
2010		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
2011		tdb->ecode = TDB_ERR_IO;
2012		return -1;
2013	}
2014
2015	if (recovery_head == 0) {
2016		/* we have never allocated a recovery record */
2017		return 0;
2018	}
2019
2020	/* read the recovery record */
2021	if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
2022				   sizeof(rec), DOCONV()) == -1) {
2023		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
2024		tdb->ecode = TDB_ERR_IO;
2025		return -1;
2026	}
2027
2028	if (rec.magic != TDB_RECOVERY_MAGIC) {
2029		/* there is no valid recovery data */
2030		return 0;
2031	}
2032
2033	if (tdb->read_only) {
2034		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
2035		tdb->ecode = TDB_ERR_CORRUPT;
2036		return -1;
2037	}
2038
2039	recovery_eof = rec.key_len;
2040
2041	data = (unsigned char *)malloc(rec.data_len);
2042	if (data == NULL) {
2043		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
2044		tdb->ecode = TDB_ERR_OOM;
2045		return -1;
2046	}
2047
2048	/* read the full recovery data */
2049	if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
2050				   rec.data_len, 0) == -1) {
2051		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
2052		tdb->ecode = TDB_ERR_IO;
2053		return -1;
2054	}
2055
2056	/* recover the file data */
2057	p = data;
2058	while (p+8 < data + rec.data_len) {
2059		u32 ofs, len;
2060		if (DOCONV()) {
2061			tdb_convert(p, 8);
2062		}
2063		memcpy(&ofs, p, 4);
2064		memcpy(&len, p+4, 4);
2065
2066		if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
2067			free(data);
2068			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
2069			tdb->ecode = TDB_ERR_IO;
2070			return -1;
2071		}
2072		p += 8 + len;
2073	}
2074
2075	free(data);
2076
2077	if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2078		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
2079		tdb->ecode = TDB_ERR_IO;
2080		return -1;
2081	}
2082
2083	/* if the recovery area is after the recovered eof then remove it */
2084	if (recovery_eof <= recovery_head) {
2085		if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
2086			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
2087			tdb->ecode = TDB_ERR_IO;
2088			return -1;
2089		}
2090	}
2091
2092	/* remove the recovery magic */
2093	if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
2094			  &zero) == -1) {
2095		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
2096		tdb->ecode = TDB_ERR_IO;
2097		return -1;
2098	}
2099
2100	/* reduce the file size to the old size */
2101	tdb_munmap(tdb);
2102	if (ftruncate(tdb->fd, recovery_eof) != 0) {
2103		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
2104		tdb->ecode = TDB_ERR_IO;
2105		return -1;
2106	}
2107	tdb->map_size = recovery_eof;
2108	tdb_mmap(tdb);
2109
2110	if (transaction_sync(tdb, 0, recovery_eof) == -1) {
2111		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
2112		tdb->ecode = TDB_ERR_IO;
2113		return -1;
2114	}
2115
2116	TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
2117		 recovery_eof));
2118
2119	/* all done */
2120	return 0;
2121}
2122
2123/* file: freelist.c */
2124
2125/* read a freelist record and check for simple errors */
2126static int rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
2127{
2128	if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
2129		return -1;
2130
2131	if (rec->magic == TDB_MAGIC) {
2132		/* this happens when an app is shut down while deleting a record - we should
2133		   not completely fail when this happens */
2134		TDB_LOG((tdb, TDB_DEBUG_WARNING, "rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
2135			 rec->magic, off));
2136		rec->magic = TDB_FREE_MAGIC;
2137		if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
2138			return -1;
2139	}
2140
2141	if (rec->magic != TDB_FREE_MAGIC) {
2142		/* Ensure ecode is set for log fn. */
2143		tdb->ecode = TDB_ERR_CORRUPT;
2144		TDB_LOG((tdb, TDB_DEBUG_WARNING, "rec_free_read bad magic 0x%x at offset=%d\n",
2145			   rec->magic, off));
2146		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2147	}
2148	if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
2149		return -1;
2150	return 0;
2151}
2152
2153
2154
2155/* Remove an element from the freelist.  Must have alloc lock. */
2156static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
2157{
2158	tdb_off_t last_ptr, i;
2159
2160	/* read in the freelist top */
2161	last_ptr = FREELIST_TOP;
2162	while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
2163		if (i == off) {
2164			/* We've found it! */
2165			return tdb_ofs_write(tdb, last_ptr, &next);
2166		}
2167		/* Follow chain (next offset is at start of record) */
2168		last_ptr = i;
2169	}
2170	TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
2171	return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2172}
2173
2174
2175/* update a record tailer (must hold allocation lock) */
2176static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
2177			 const struct list_struct *rec)
2178{
2179	tdb_off_t totalsize;
2180
2181	/* Offset of tailer from record header */
2182	totalsize = sizeof(*rec) + rec->rec_len;
2183	return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
2184			 &totalsize);
2185}
2186
2187/* Add an element into the freelist. Merge adjacent records if
2188   necessary. */
2189int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
2190{
2191	tdb_off_t right, left;
2192
2193	/* Allocation and tailer lock */
2194	if (tdb_lock(tdb, -1, F_WRLCK) != 0)
2195		return -1;
2196
2197	/* set an initial tailer, so if we fail we don't leave a bogus record */
2198	if (update_tailer(tdb, offset, rec) != 0) {
2199		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
2200		goto fail;
2201	}
2202
2203	/* Look right first (I'm an Australian, dammit) */
2204	right = offset + sizeof(*rec) + rec->rec_len;
2205	if (right + sizeof(*rec) <= tdb->map_size) {
2206		struct list_struct r;
2207
2208		if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
2209			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
2210			goto left;
2211		}
2212
2213		/* If it's free, expand to include it. */
2214		if (r.magic == TDB_FREE_MAGIC) {
2215			if (remove_from_freelist(tdb, right, r.next) == -1) {
2216				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
2217				goto left;
2218			}
2219			rec->rec_len += sizeof(r) + r.rec_len;
2220		}
2221	}
2222
2223left:
2224	/* Look left */
2225	left = offset - sizeof(tdb_off_t);
2226	if (left > TDB_DATA_START(tdb->header.hash_size)) {
2227		struct list_struct l;
2228		tdb_off_t leftsize;
2229
2230		/* Read in tailer and jump back to header */
2231		if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
2232			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
2233			goto update;
2234		}
2235
2236		/* it could be uninitialised data */
2237		if (leftsize == 0 || leftsize == TDB_PAD_U32) {
2238			goto update;
2239		}
2240
2241		left = offset - leftsize;
2242
2243		/* Now read in record */
2244		if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
2245			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
2246			goto update;
2247		}
2248
2249		/* If it's free, expand to include it. */
2250		if (l.magic == TDB_FREE_MAGIC) {
2251			if (remove_from_freelist(tdb, left, l.next) == -1) {
2252				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
2253				goto update;
2254			} else {
2255				offset = left;
2256				rec->rec_len += leftsize;
2257			}
2258		}
2259	}
2260
2261update:
2262	if (update_tailer(tdb, offset, rec) == -1) {
2263		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
2264		goto fail;
2265	}
2266
2267	/* Now, prepend to free list */
2268	rec->magic = TDB_FREE_MAGIC;
2269
2270	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
2271	    tdb_rec_write(tdb, offset, rec) == -1 ||
2272	    tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
2273		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
2274		goto fail;
2275	}
2276
2277	/* And we're done. */
2278	tdb_unlock(tdb, -1, F_WRLCK);
2279	return 0;
2280
2281 fail:
2282	tdb_unlock(tdb, -1, F_WRLCK);
2283	return -1;
2284}
2285
2286
2287/*
2288   the core of tdb_allocate - called when we have decided which
2289   free list entry to use
2290 */
2291static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
2292				struct list_struct *rec, tdb_off_t last_ptr)
2293{
2294	struct list_struct newrec;
2295	tdb_off_t newrec_ptr;
2296
2297	memset(&newrec, '\0', sizeof(newrec));
2298
2299	/* found it - now possibly split it up  */
2300	if (rec->rec_len > length + MIN_REC_SIZE) {
2301		/* Length of left piece */
2302		length = TDB_ALIGN(length, TDB_ALIGNMENT);
2303
2304		/* Right piece to go on free list */
2305		newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
2306		newrec_ptr = rec_ptr + sizeof(*rec) + length;
2307
2308		/* And left record is shortened */
2309		rec->rec_len = length;
2310	} else {
2311		newrec_ptr = 0;
2312	}
2313
2314	/* Remove allocated record from the free list */
2315	if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
2316		return 0;
2317	}
2318
2319	/* Update header: do this before we drop alloc
2320	   lock, otherwise tdb_free() might try to
2321	   merge with us, thinking we're free.
2322	   (Thanks Jeremy Allison). */
2323	rec->magic = TDB_MAGIC;
2324	if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
2325		return 0;
2326	}
2327
2328	/* Did we create new block? */
2329	if (newrec_ptr) {
2330		/* Update allocated record tailer (we
2331		   shortened it). */
2332		if (update_tailer(tdb, rec_ptr, rec) == -1) {
2333			return 0;
2334		}
2335
2336		/* Free new record */
2337		if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
2338			return 0;
2339		}
2340	}
2341
2342	/* all done - return the new record offset */
2343	return rec_ptr;
2344}
2345
2346/* allocate some space from the free list. The offset returned points
2347   to an unconnected list_struct within the database with room for at
2348   least length bytes of total data
2349
2350   0 is returned if the space could not be allocated
2351 */
2352tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
2353{
2354	tdb_off_t rec_ptr, last_ptr, newrec_ptr;
2355	struct {
2356		tdb_off_t rec_ptr, last_ptr;
2357		tdb_len_t rec_len;
2358	} bestfit;
2359
2360	if (tdb_lock(tdb, -1, F_WRLCK) == -1)
2361		return 0;
2362
2363	/* Extra bytes required for tailer */
2364	length += sizeof(tdb_off_t);
2365
2366 again:
2367	last_ptr = FREELIST_TOP;
2368
2369	/* read in the freelist top */
2370	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
2371		goto fail;
2372
2373	bestfit.rec_ptr = 0;
2374	bestfit.last_ptr = 0;
2375	bestfit.rec_len = 0;
2376
2377	/*
2378	   this is a best fit allocation strategy. Originally we used
2379	   a first fit strategy, but it suffered from massive fragmentation
2380	   issues when faced with a slowly increasing record size.
2381	 */
2382	while (rec_ptr) {
2383		if (rec_free_read(tdb, rec_ptr, rec) == -1) {
2384			goto fail;
2385		}
2386
2387		if (rec->rec_len >= length) {
2388			if (bestfit.rec_ptr == 0 ||
2389			    rec->rec_len < bestfit.rec_len) {
2390				bestfit.rec_len = rec->rec_len;
2391				bestfit.rec_ptr = rec_ptr;
2392				bestfit.last_ptr = last_ptr;
2393				/* consider a fit to be good enough if
2394				   we aren't wasting more than half
2395				   the space */
2396				if (bestfit.rec_len < 2*length) {
2397					break;
2398				}
2399			}
2400		}
2401
2402		/* move to the next record */
2403		last_ptr = rec_ptr;
2404		rec_ptr = rec->next;
2405	}
2406
2407	if (bestfit.rec_ptr != 0) {
2408		if (rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
2409			goto fail;
2410		}
2411
2412		newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
2413		tdb_unlock(tdb, -1, F_WRLCK);
2414		return newrec_ptr;
2415	}
2416
2417	/* we didn't find enough space. See if we can expand the
2418	   database and if we can then try again */
2419	if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
2420		goto again;
2421 fail:
2422	tdb_unlock(tdb, -1, F_WRLCK);
2423	return 0;
2424}
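
/*
  Illustrative sketch (not part of the original source) of the best-fit rule
  used above, applied to a plain array of free-record sizes: pick the smallest
  record that is large enough, but stop scanning early once the chosen record
  wastes less than half of its space.

	static int example_pick_best_fit(const unsigned int *sizes, int n,
					 unsigned int length)
	{
		int i, best = -1;
		unsigned int best_len = 0;

		for (i = 0; i < n; i++) {
			if (sizes[i] < length) {
				continue;
			}
			if (best == -1 || sizes[i] < best_len) {
				best = i;
				best_len = sizes[i];
				// good enough: wasting less than half the space
				if (best_len < 2 * length) {
					break;
				}
			}
		}
		return best;
	}
*/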
2425
2426/* file: freelistcheck.c */
2427
2428/* Check that the freelist is good and contains no loops.
2429   Very memory intensive - only do this as a consistency
2430   checker. Heh heh - uses an in-memory tdb as the storage
2431   for the "seen" record list. For some reason this strikes
2432   me as extremely clever as I don't have to write another tree
2433   data structure implementation :-).
2434 */
2435
2436static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
2437{
2438	TDB_DATA key, data;
2439
2440	memset(&data, '\0', sizeof(data));
2441	key.dptr = (unsigned char *)&rec_ptr;
2442	key.dsize = sizeof(rec_ptr);
2443	return tdb_store(mem_tdb, key, data, TDB_INSERT);
2444}
2445
2446int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
2447{
2448	struct tdb_context *mem_tdb = NULL;
2449	struct list_struct rec;
2450	tdb_off_t rec_ptr, last_ptr;
2451	int ret = -1;
2452
2453	*pnum_entries = 0;
2454
2455	mem_tdb = tdb_open("flval", tdb->header.hash_size,
2456				TDB_INTERNAL, O_RDWR, 0600);
2457	if (!mem_tdb) {
2458		return -1;
2459	}
2460
2461	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
2462		tdb_close(mem_tdb);
2463		return 0;
2464	}
2465
2466	last_ptr = FREELIST_TOP;
2467
2468	/* Store the FREELIST_TOP record. */
2469	if (seen_insert(mem_tdb, last_ptr) == -1) {
2470		ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2471		goto fail;
2472	}
2473
2474	/* read in the freelist top */
2475	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
2476		goto fail;
2477	}
2478
2479	while (rec_ptr) {
2480
2481		/* If we can't store this record (we've seen it
2482		   before) then the free list has a loop and must
2483		   be corrupt. */
2484
2485		if (seen_insert(mem_tdb, rec_ptr)) {
2486			ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2487			goto fail;
2488		}
2489
2490		if (rec_free_read(tdb, rec_ptr, &rec) == -1) {
2491			goto fail;
2492		}
2493
2494		/* move to the next record */
2495		last_ptr = rec_ptr;
2496		rec_ptr = rec.next;
2497		*pnum_entries += 1;
2498	}
2499
2500	ret = 0;
2501
2502  fail:
2503
2504	tdb_close(mem_tdb);
2505	tdb_unlock(tdb, -1, F_WRLCK);
2506	return ret;
2507}
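
/*
  Illustrative usage sketch (not part of the original source): running the
  freelist consistency check from application code.

	int entries;

	if (tdb_validate_freelist(tdb, &entries) == -1) {
		// freelist could not be walked, or it contains a loop
	} else {
		printf("freelist contains %d entries\n", entries);
	}
*/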
2508
2509/* file: traverse.c */
2510
2511/* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
2512static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
2513			 struct list_struct *rec)
2514{
2515	int want_next = (tlock->off != 0);
2516
2517	/* Lock each chain from the start one. */
2518	for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
2519		if (!tlock->off && tlock->hash != 0) {
2520			/* this is an optimisation for the common case where
2521			   the hash chain is empty, which is particularly
2522			   common for the use of tdb with ldb, where large
2523			   hashes are used. In that case we spend most of our
2524			   time in tdb_brlock(), locking empty hash chains.
2525
2526			   To avoid this, we do an unlocked pre-check to see
2527			   if the hash chain is empty before starting to look
2528			   inside it. If it is empty then we can avoid that
2529			   hash chain. If it isn't empty then we can't believe
2530			   the value we get back, as we read it without a
2531			   lock, so instead we get the lock and re-fetch the
2532			   value below.
2533
2534			   Notice that not doing this optimisation on the
2535			   first hash chain is critical. We must guarantee
2536			   that we have done at least one fcntl lock at the
2537			   start of a search to guarantee that memory is
2538			   coherent on SMP systems. If records are added by
2539			   others during the search then that's OK, and we
2540			   could possibly miss those with this trick, but we
2541			   could miss them anyway without this trick, so the
2542			   semantics don't change.
2543
2544			   With a non-indexed ldb search this trick gains us a
2545			   factor of around 80 in speed on a linux 2.6.x
2546			   system (testing using ldbtest).
2547			*/
2548			tdb->methods->next_hash_chain(tdb, &tlock->hash);
2549			if (tlock->hash == tdb->header.hash_size) {
2550				continue;
2551			}
2552		}
2553
2554		if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
2555			return -1;
2556
2557		/* No previous record?  Start at top of chain. */
2558		if (!tlock->off) {
2559			if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
2560				     &tlock->off) == -1)
2561				goto fail;
2562		} else {
2563			/* Otherwise unlock the previous record. */
2564			if (tdb_unlock_record(tdb, tlock->off) != 0)
2565				goto fail;
2566		}
2567
2568		if (want_next) {
2569			/* We have offset of old record: grab next */
2570			if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2571				goto fail;
2572			tlock->off = rec->next;
2573		}
2574
2575		/* Iterate through chain */
2576		while( tlock->off) {
2577			tdb_off_t current;
2578			if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2579				goto fail;
2580
2581			/* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
2582			if (tlock->off == rec->next) {
2583				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
2584				goto fail;
2585			}
2586
2587			if (!TDB_DEAD(rec)) {
2588				/* Woohoo: we found one! */
2589				if (tdb_lock_record(tdb, tlock->off) != 0)
2590					goto fail;
2591				return tlock->off;
2592			}
2593
2594			/* Try to clean dead ones from old traverses */
2595			current = tlock->off;
2596			tlock->off = rec->next;
2597			if (!(tdb->read_only || tdb->traverse_read) &&
2598			    tdb_do_delete(tdb, current, rec) != 0)
2599				goto fail;
2600		}
2601		tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
2602		want_next = 0;
2603	}
2604	/* We finished iteration without finding anything */
2605	return TDB_ERRCODE(TDB_SUCCESS, 0);
2606
2607 fail:
2608	tlock->off = 0;
2609	if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
2610		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
2611	return -1;
2612}
2613
2614/* traverse the entire database - calling fn(tdb, key, data) on each element.
2615   return -1 on error or the record count traversed.
2616   if fn is NULL then it is not called.
2617   a non-zero return value from fn() indicates that the traversal should stop.
2618  */
2619static int tdb_traverse_internal(struct tdb_context *tdb,
2620				 tdb_traverse_func fn, void *private_data,
2621				 struct tdb_traverse_lock *tl)
2622{
2623	TDB_DATA key, dbuf;
2624	struct list_struct rec;
2625	int ret, count = 0;
2626
2627	/* This was in the initialization, above, but the IRIX compiler
2628	 * did not like it.  crh
2629	 */
2630	tl->next = tdb->travlocks.next;
2631
2632	/* fcntl locks don't stack: beware traverse inside traverse */
2633	tdb->travlocks.next = tl;
2634
2635	/* tdb_next_lock places locks on the record returned, and its chain */
2636	while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
2637		count++;
2638		/* now read the full record */
2639		key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
2640					  rec.key_len + rec.data_len);
2641		if (!key.dptr) {
2642			ret = -1;
2643			if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
2644				goto out;
2645			if (tdb_unlock_record(tdb, tl->off) != 0)
2646				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
2647			goto out;
2648		}
2649		key.dsize = rec.key_len;
2650		dbuf.dptr = key.dptr + rec.key_len;
2651		dbuf.dsize = rec.data_len;
2652
2653		/* Drop chain lock, call out */
2654		if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
2655			ret = -1;
2656			SAFE_FREE(key.dptr);
2657			goto out;
2658		}
2659		if (fn && fn(tdb, key, dbuf, private_data)) {
2660			/* They want us to terminate traversal */
2661			ret = count;
2662			if (tdb_unlock_record(tdb, tl->off) != 0) {
2663				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));
2664				ret = -1;
2665			}
2666			SAFE_FREE(key.dptr);
2667			goto out;
2668		}
2669		SAFE_FREE(key.dptr);
2670	}
2671out:
2672	tdb->travlocks.next = tl->next;
2673	if (ret < 0)
2674		return -1;
2675	else
2676		return count;
2677}
2678
2679
2680/*
2681  a read style traverse - temporarily marks the db read only
2682*/
2683int tdb_traverse_read(struct tdb_context *tdb,
2684		      tdb_traverse_func fn, void *private_data)
2685{
2686	struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
2687	int ret;
2688
2689	/* we need to get a read lock on the transaction lock here to
2690	   cope with the lock ordering semantics of solaris10 */
2691	if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1) {
2692		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_traverse_read: failed to get transaction lock\n"));
2693		tdb->ecode = TDB_ERR_LOCK;
2694		return -1;
2695	}
2696
2697	tdb->traverse_read++;
2698	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2699	tdb->traverse_read--;
2700
2701	tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2702
2703	return ret;
2704}
2705
2706/*
2707  a write style traverse - needs to get the transaction lock to
2708  prevent deadlocks
2709*/
2710int tdb_traverse(struct tdb_context *tdb,
2711		 tdb_traverse_func fn, void *private_data)
2712{
2713	struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
2714	int ret;
2715
2716	if (tdb->read_only || tdb->traverse_read) {
2717		return tdb_traverse_read(tdb, fn, private_data);
2718	}
2719
2720	if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
2721		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_traverse: failed to get transaction lock\n"));
2722		tdb->ecode = TDB_ERR_LOCK;
2723		return -1;
2724	}
2725
2726	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2727
2728	tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2729
2730	return ret;
2731}
2732
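/*
  Illustrative usage sketch (not part of the original source): counting large
  records with tdb_traverse_read(). The callback and counter names are
  placeholders; return non-zero from the callback to stop the traverse early.
  Use tdb_traverse() instead when the callback needs to store or delete
  records.

	static int example_count_large(struct tdb_context *tdb, TDB_DATA key,
				       TDB_DATA data, void *private_data)
	{
		unsigned int *count = (unsigned int *)private_data;

		if (data.dsize > 4096) {
			(*count)++;
		}
		return 0;
	}

	unsigned int count = 0;
	if (tdb_traverse_read(tdb, example_count_large, &count) == -1) {
		// traverse failed (e.g. could not get the transaction lock)
	}
*/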
2733
2734/* find the first entry in the database and return its key */
2735TDB_DATA tdb_firstkey(struct tdb_context *tdb)
2736{
2737	TDB_DATA key;
2738	struct list_struct rec;
2739
2740	/* release any old lock */
2741	if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
2742		return tdb_null;
2743	tdb->travlocks.off = tdb->travlocks.hash = 0;
2744	tdb->travlocks.lock_rw = F_RDLCK;
2745
2746	if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
2747		return tdb_null;
2748	/* now read the key */
2749	key.dsize = rec.key_len;
2750	key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off + sizeof(rec), key.dsize);
2751	if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
2752		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
2753	return key;
2754}
2755
2756/* find the next entry in the database, returning its key */
2757TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
2758{
2759	u32 oldhash;
2760	TDB_DATA key = tdb_null;
2761	struct list_struct rec;
2762	unsigned char *k = NULL;
2763
2764	/* Is locked key the old key?  If so, traverse will be reliable. */
2765	if (tdb->travlocks.off) {
2766		if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
2767			return tdb_null;
2768		if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
2769		    || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
2770					    rec.key_len))
2771		    || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
2772			/* No, it wasn't: unlock it and start from scratch */
2773			if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
2774				SAFE_FREE(k);
2775				return tdb_null;
2776			}
2777			if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0) {
2778				SAFE_FREE(k);
2779				return tdb_null;
2780			}
2781			tdb->travlocks.off = 0;
2782		}
2783
2784		SAFE_FREE(k);
2785	}
2786
2787	if (!tdb->travlocks.off) {
2788		/* No previous element: do normal find, and lock record */
2789		tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), F_WRLCK, &rec);
2790		if (!tdb->travlocks.off)
2791			return tdb_null;
2792		tdb->travlocks.hash = BUCKET(rec.full_hash);
2793		if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
2794			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
2795			return tdb_null;
2796		}
2797	}
2798	oldhash = tdb->travlocks.hash;
2799
2800	/* Grab next record: locks chain and returned record,
2801	   unlocks old record */
2802	if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
2803		key.dsize = rec.key_len;
2804		key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
2805					  key.dsize);
2806		/* Unlock the chain of this new record */
2807		if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
2808			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2809	}
2810	/* Unlock the chain of old record */
2811	if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
2812		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2813	return key;
2814}
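
/*
  Illustrative usage sketch (not part of the original source): iterating all
  keys with tdb_firstkey()/tdb_nextkey(). The returned key buffers are
  allocated copies and must be freed by the caller.

	TDB_DATA key, next;

	for (key = tdb_firstkey(tdb); key.dptr != NULL; key = next) {
		// ... use key.dptr / key.dsize ...
		next = tdb_nextkey(tdb, key);
		free(key.dptr);
	}
*/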
2815
2816/* file: dump.c */
2817
2818static tdb_off_t tdb_dump_record(struct tdb_context *tdb, tdb_off_t offset)
2819{
2820	struct list_struct rec;
2821	tdb_off_t tailer_ofs, tailer;
2822
2823	if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
2824				   sizeof(rec), DOCONV()) == -1) {
2825		printf("ERROR: failed to read record at %u\n", offset);
2826		return 0;
2827	}
2828
2829	printf(" rec: offset=0x%08x next=0x%08x rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
2830	       offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
2831
2832	tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
2833
2834	if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
2835		printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
2836		return rec.next;
2837	}
2838
2839	if (tailer != rec.rec_len + sizeof(rec)) {
2840		printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
2841				(unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
2842	}
2843	return rec.next;
2844}
2845
2846static int tdb_dump_chain(struct tdb_context *tdb, int i)
2847{
2848	tdb_off_t rec_ptr, top;
2849
2850	top = TDB_HASH_TOP(i);
2851
2852	if (tdb_lock(tdb, i, F_WRLCK) != 0)
2853		return -1;
2854
2855	if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
2856		return tdb_unlock(tdb, i, F_WRLCK);
2857
2858	if (rec_ptr)
2859		printf("hash=%d\n", i);
2860
2861	while (rec_ptr) {
2862		rec_ptr = tdb_dump_record(tdb, rec_ptr);
2863	}
2864
2865	return tdb_unlock(tdb, i, F_WRLCK);
2866}
2867
2868void tdb_dump_all(struct tdb_context *tdb)
2869{
2870	int i;
2871	for (i=0;i<tdb->header.hash_size;i++) {
2872		tdb_dump_chain(tdb, i);
2873	}
2874	printf("freelist:\n");
2875	tdb_dump_chain(tdb, -1);
2876}
2877
2878int tdb_printfreelist(struct tdb_context *tdb)
2879{
2880	int ret;
2881	long total_free = 0;
2882	tdb_off_t offset, rec_ptr;
2883	struct list_struct rec;
2884
2885	if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
2886		return ret;
2887
2888	offset = FREELIST_TOP;
2889
2890	/* read in the freelist top */
2891	if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
2892		tdb_unlock(tdb, -1, F_WRLCK);
2893		return 0;
2894	}
2895
2896	printf("freelist top=[0x%08x]\n", rec_ptr );
2897	while (rec_ptr) {
2898		if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
2899					   sizeof(rec), DOCONV()) == -1) {
2900			tdb_unlock(tdb, -1, F_WRLCK);
2901			return -1;
2902		}
2903
2904		if (rec.magic != TDB_FREE_MAGIC) {
2905			printf("bad magic 0x%08x in free list\n", rec.magic);
2906			tdb_unlock(tdb, -1, F_WRLCK);
2907			return -1;
2908		}
2909
2910		printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
2911		       rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
2912		total_free += rec.rec_len;
2913
2914		/* move to the next record */
2915		rec_ptr = rec.next;
2916	}
2917	printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
2918               (int)total_free);
2919
2920	return tdb_unlock(tdb, -1, F_WRLCK);
2921}
2922
2923/* file: tdb.c */
2924
2925TDB_DATA tdb_null;
2926
2927/*
2928  increment the tdb sequence number if the tdb has been opened using
2929  the TDB_SEQNUM flag
2930*/
2931static void tdb_increment_seqnum(struct tdb_context *tdb)
2932{
2933	tdb_off_t seqnum=0;
2934
2935	if (!(tdb->flags & TDB_SEQNUM)) {
2936		return;
2937	}
2938
2939	if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
2940		return;
2941	}
2942
2943	/* we ignore errors from this, as we have no sane way of
2944	   dealing with them.
2945	*/
2946	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
2947	seqnum++;
2948	tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
2949
2950	tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
2951}
2952
2953static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
2954{
2955	return memcmp(data.dptr, key.dptr, data.dsize);
2956}
2957
2958/* Returns 0 on fail.  On success, return offset of record, and fills
2959   in rec */
2960static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
2961			struct list_struct *r)
2962{
2963	tdb_off_t rec_ptr;
2964
2965	/* read in the hash top */
2966	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
2967		return 0;
2968
2969	/* keep looking until we find the right record */
2970	while (rec_ptr) {
2971		if (tdb_rec_read(tdb, rec_ptr, r) == -1)
2972			return 0;
2973
2974		if (!TDB_DEAD(r) && hash==r->full_hash
2975		    && key.dsize==r->key_len
2976		    && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
2977				      r->key_len, tdb_key_compare,
2978				      NULL) == 0) {
2979			return rec_ptr;
2980		}
2981		rec_ptr = r->next;
2982	}
2983	return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
2984}
2985
2986/* As tdb_find, but if you succeed, keep the lock */
2987tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
2988			   struct list_struct *rec)
2989{
2990	u32 rec_ptr;
2991
2992	if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
2993		return 0;
2994	if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
2995		tdb_unlock(tdb, BUCKET(hash), locktype);
2996	return rec_ptr;
2997}
2998
2999
3000/* update an entry in place - this only works if the new data size
3001   is <= the old data size and the key exists.
3002   on failure return -1.
3003*/
3004static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
3005{
3006	struct list_struct rec;
3007	tdb_off_t rec_ptr;
3008
3009	/* find entry */
3010	if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
3011		return -1;
3012
3013	/* must be long enough for key, data and tailer */
3014	if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
3015		tdb->ecode = TDB_SUCCESS; /* Not really an error */
3016		return -1;
3017	}
3018
3019	if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3020		      dbuf.dptr, dbuf.dsize) == -1)
3021		return -1;
3022
3023	if (dbuf.dsize != rec.data_len) {
3024		/* update size */
3025		rec.data_len = dbuf.dsize;
3026		return tdb_rec_write(tdb, rec_ptr, &rec);
3027	}
3028
3029	return 0;
3030}
3031
3032/* find an entry in the database given a key */
3033/* If an entry doesn't exist tdb_err will be set to
3034 * TDB_ERR_NOEXIST. If a key has no data attached
3035 * then the TDB_DATA will have zero length but
3036 * a non-zero pointer
3037 */
3038TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
3039{
3040	tdb_off_t rec_ptr;
3041	struct list_struct rec;
3042	TDB_DATA ret;
3043	u32 hash;
3044
3045	/* find which hash bucket it is in */
3046	hash = tdb->hash_fn(&key);
3047	if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
3048		return tdb_null;
3049
3050	ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3051				  rec.data_len);
3052	ret.dsize = rec.data_len;
3053	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3054	return ret;
3055}
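
/*
  Illustrative usage sketch (not part of the original source): fetching a
  value. The key text is a placeholder; the returned buffer belongs to the
  caller and must be freed.

	TDB_DATA key, data;

	key.dptr  = (unsigned char *)"user:1";
	key.dsize = strlen("user:1");

	data = tdb_fetch(tdb, key);
	if (data.dptr == NULL) {
		// key not found (or a read error occurred)
	} else {
		// ... use data.dptr / data.dsize ...
		free(data.dptr);
	}
*/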
3056
3057/*
3058 * Find an entry in the database and hand the record's data to a parsing
3059 * function. The parsing function is executed under the chain read lock, so it
3060 * should be fast and should not block on other syscalls.
3061 *
3062 * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
3063 *
3064 * For mmapped tdb's that do not have a transaction open it points the parsing
3065 * function directly at the mmap area, which avoids the malloc/memcpy in this
3066 * case. If a transaction is open or no mmap is available, it has to do
3067 * malloc/read/parse/free.
3068 *
3069 * This is interesting for all readers of potentially large data structures in
3070 * the tdb records, ldb indexes being one example.
3071 */
3072
3073int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
3074		     int (*parser)(TDB_DATA key, TDB_DATA data,
3075				   void *private_data),
3076		     void *private_data)
3077{
3078	tdb_off_t rec_ptr;
3079	struct list_struct rec;
3080	int ret;
3081	u32 hash;
3082
3083	/* find which hash bucket it is in */
3084	hash = tdb->hash_fn(&key);
3085
3086	if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
3087		return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3088	}
3089
3090	ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
3091			     rec.data_len, parser, private_data);
3092
3093	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3094
3095	return ret;
3096}
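
/*
  Illustrative usage sketch (not part of the original source): a parser
  callback that copies the record data into a caller-supplied buffer. The
  callback runs under the chain read lock, so it only copies and returns;
  key is assumed to be an already-initialised TDB_DATA.

	static int example_copy_parser(TDB_DATA key, TDB_DATA data,
				       void *private_data)
	{
		TDB_DATA *dest = (TDB_DATA *)private_data;

		if (data.dsize < dest->dsize) {
			dest->dsize = data.dsize;
		}
		memcpy(dest->dptr, data.dptr, dest->dsize);
		return 0;
	}

	unsigned char buf[256];
	TDB_DATA dest = { buf, sizeof(buf) };

	tdb_parse_record(tdb, key, example_copy_parser, &dest);
*/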
3097
3098/* check if an entry in the database exists
3099
3100   note that 1 is returned if the key is found and 0 is returned if not found.
3101   this doesn't match the conventions in the rest of this module, but is
3102   compatible with gdbm
3103*/
3104static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3105{
3106	struct list_struct rec;
3107
3108	if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
3109		return 0;
3110	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3111	return 1;
3112}
3113
3114int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
3115{
3116	u32 hash = tdb->hash_fn(&key);
3117	return tdb_exists_hash(tdb, key, hash);
3118}
3119
3120/* actually delete an entry in the database given the offset */
3121int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec)
3122{
3123	tdb_off_t last_ptr, i;
3124	struct list_struct lastrec;
3125
3126	if (tdb->read_only || tdb->traverse_read) return -1;
3127
3128	if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
3129		/* Someone traversing here: mark it as dead */
3130		rec->magic = TDB_DEAD_MAGIC;
3131		return tdb_rec_write(tdb, rec_ptr, rec);
3132	}
3133	if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
3134		return -1;
3135
3136	/* find previous record in hash chain */
3137	if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
3138		return -1;
3139	for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
3140		if (tdb_rec_read(tdb, i, &lastrec) == -1)
3141			return -1;
3142
3143	/* unlink it: next ptr is at start of record. */
3144	if (last_ptr == 0)
3145		last_ptr = TDB_HASH_TOP(rec->full_hash);
3146	if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
3147		return -1;
3148
3149	/* recover the space */
3150	if (tdb_free(tdb, rec_ptr, rec) == -1)
3151		return -1;
3152	return 0;
3153}
3154
3155static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
3156{
3157	int res = 0;
3158	tdb_off_t rec_ptr;
3159	struct list_struct rec;
3160
3161	/* read in the hash top */
3162	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3163		return 0;
3164
3165	while (rec_ptr) {
3166		if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
3167			return 0;
3168
3169		if (rec.magic == TDB_DEAD_MAGIC) {
3170			res += 1;
3171		}
3172		rec_ptr = rec.next;
3173	}
3174	return res;
3175}
3176
3177/*
3178 * Purge all DEAD records from a hash chain
3179 */
3180static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
3181{
3182	int res = -1;
3183	struct list_struct rec;
3184	tdb_off_t rec_ptr;
3185
3186	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3187		return -1;
3188	}
3189
3190	/* read in the hash top */
3191	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3192		goto fail;
3193
3194	while (rec_ptr) {
3195		tdb_off_t next;
3196
3197		if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
3198			goto fail;
3199		}
3200
3201		next = rec.next;
3202
3203		if (rec.magic == TDB_DEAD_MAGIC
3204		    && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
3205			goto fail;
3206		}
3207		rec_ptr = next;
3208	}
3209	res = 0;
3210 fail:
3211	tdb_unlock(tdb, -1, F_WRLCK);
3212	return res;
3213}
3214
3215/* delete an entry in the database given a key */
3216static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3217{
3218	tdb_off_t rec_ptr;
3219	struct list_struct rec;
3220	int ret;
3221
3222	if (tdb->max_dead_records != 0) {
3223
3224		/*
3225		 * Allow for some dead records per hash chain, mainly for
3226		 * tdb's with a very high create/delete rate like locking.tdb.
3227		 */
3228
3229		if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3230			return -1;
3231
3232		if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
3233			/*
3234			 * Don't let the per-chain freelist grow too large,
3235			 * delete all existing dead records
3236			 */
3237			tdb_purge_dead(tdb, hash);
3238		}
3239
3240		if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
3241			tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3242			return -1;
3243		}
3244
3245		/*
3246		 * Just mark the record as dead.
3247		 */
3248		rec.magic = TDB_DEAD_MAGIC;
3249		ret = tdb_rec_write(tdb, rec_ptr, &rec);
3250	}
3251	else {
3252		if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
3253						   &rec)))
3254			return -1;
3255
3256		ret = tdb_do_delete(tdb, rec_ptr, &rec);
3257	}
3258
3259	if (ret == 0) {
3260		tdb_increment_seqnum(tdb);
3261	}
3262
3263	if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
3264		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
3265	return ret;
3266}
3267
3268int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
3269{
3270	u32 hash = tdb->hash_fn(&key);
3271	return tdb_delete_hash(tdb, key, hash);
3272}
3273
3274/*
3275 * See if we have a dead record around with enough space
3276 */
3277static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
3278			       struct list_struct *r, tdb_len_t length)
3279{
3280	tdb_off_t rec_ptr;
3281
3282	/* read in the hash top */
3283	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3284		return 0;
3285
3286	/* keep looking until we find the right record */
3287	while (rec_ptr) {
3288		if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3289			return 0;
3290
3291		if (TDB_DEAD(r) && r->rec_len >= length) {
3292			/*
3293			 * First fit for simple coding, TODO: change to best
3294			 * fit
3295			 */
3296			return rec_ptr;
3297		}
3298		rec_ptr = r->next;
3299	}
3300	return 0;
3301}
3302
3303/* store an element in the database, replacing any existing element
3304   with the same key
3305
3306   return 0 on success, -1 on failure
3307*/
3308int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
3309{
3310	struct list_struct rec;
3311	u32 hash;
3312	tdb_off_t rec_ptr;
3313	char *p = NULL;
3314	int ret = -1;
3315
3316	if (tdb->read_only || tdb->traverse_read) {
3317		tdb->ecode = TDB_ERR_RDONLY;
3318		return -1;
3319	}
3320
3321	/* find which hash bucket it is in */
3322	hash = tdb->hash_fn(&key);
3323	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3324		return -1;
3325
3326	/* check for it existing, on insert. */
3327	if (flag == TDB_INSERT) {
3328		if (tdb_exists_hash(tdb, key, hash)) {
3329			tdb->ecode = TDB_ERR_EXISTS;
3330			goto fail;
3331		}
3332	} else {
3333		/* first try in-place update, on modify or replace. */
3334		if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
3335			goto done;
3336		}
3337		if (tdb->ecode == TDB_ERR_NOEXIST &&
3338		    flag == TDB_MODIFY) {
3339			/* if the record doesn't exist and we are in TDB_MODIFY mode then
3340			 we should fail the store */
3341			goto fail;
3342		}
3343	}
3344	/* reset the error code potentially set by tdb_update_hash() */
3345	tdb->ecode = TDB_SUCCESS;
3346
3347	/* delete any existing record - if it doesn't exist we don't
3348           care.  Doing this first reduces fragmentation, and avoids
3349           coalescing with `allocated' block before it's updated. */
3350	if (flag != TDB_INSERT)
3351		tdb_delete_hash(tdb, key, hash);
3352
3353	/* Copy key+value *before* allocating free space in case malloc
3354	   fails and we are left with a dead spot in the tdb. */
3355
3356	if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
3357		tdb->ecode = TDB_ERR_OOM;
3358		goto fail;
3359	}
3360
3361	memcpy(p, key.dptr, key.dsize);
3362	if (dbuf.dsize)
3363		memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
3364
3365	if (tdb->max_dead_records != 0) {
3366		/*
3367		 * Allow for some dead records per hash chain, look if we can
3368		 * find one that can hold the new record. We need enough space
3369		 * for key, data and tailer. If we find one, we don't have to
3370		 * consult the central freelist.
3371		 */
3372		rec_ptr = tdb_find_dead(
3373			tdb, hash, &rec,
3374			key.dsize + dbuf.dsize + sizeof(tdb_off_t));
3375
3376		if (rec_ptr != 0) {
3377			rec.key_len = key.dsize;
3378			rec.data_len = dbuf.dsize;
3379			rec.full_hash = hash;
3380			rec.magic = TDB_MAGIC;
3381			if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3382			    || tdb->methods->tdb_write(
3383				    tdb, rec_ptr + sizeof(rec),
3384				    p, key.dsize + dbuf.dsize) == -1) {
3385				goto fail;
3386			}
3387			goto done;
3388		}
3389	}
3390
3391	/*
3392	 * We have to allocate some space from the freelist, so this means we
3393	 * have to lock it. Use the chance to purge all the DEAD records from
3394	 * the hash chain under the freelist lock.
3395	 */
3396
3397	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3398		goto fail;
3399	}
3400
3401	if ((tdb->max_dead_records != 0)
3402	    && (tdb_purge_dead(tdb, hash) == -1)) {
3403		tdb_unlock(tdb, -1, F_WRLCK);
3404		goto fail;
3405	}
3406
3407	/* we have to allocate some space */
3408	rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
3409
3410	tdb_unlock(tdb, -1, F_WRLCK);
3411
3412	if (rec_ptr == 0) {
3413		goto fail;
3414	}
3415
3416	/* Read hash top into next ptr */
3417	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
3418		goto fail;
3419
3420	rec.key_len = key.dsize;
3421	rec.data_len = dbuf.dsize;
3422	rec.full_hash = hash;
3423	rec.magic = TDB_MAGIC;
3424
3425	/* write out and point the top of the hash chain at it */
3426	if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3427	    || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
3428	    || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
3429		/* Need to tdb_unallocate() here */
3430		goto fail;
3431	}
3432
3433 done:
3434	ret = 0;
3435 fail:
3436	if (ret == 0) {
3437		tdb_increment_seqnum(tdb);
3438	}
3439
3440	SAFE_FREE(p);
3441	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3442	return ret;
3443}
3444
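/*
  Illustrative usage sketch (not part of the original source): the store
  flags. TDB_INSERT fails if the key already exists, TDB_MODIFY fails if it
  does not, and TDB_REPLACE creates the key or overwrites its value. The key
  and value text below are placeholders.

	TDB_DATA key, data;

	key.dptr   = (unsigned char *)"counter";
	key.dsize  = strlen("counter");
	data.dptr  = (unsigned char *)"0";
	data.dsize = 1;

	if (tdb_store(tdb, key, data, TDB_INSERT) == -1) {
		// ecode is TDB_ERR_EXISTS if the key was already present
	}
	if (tdb_store(tdb, key, data, TDB_MODIFY) == -1) {
		// fails when the key does not exist yet
	}
	if (tdb_store(tdb, key, data, TDB_REPLACE) == -1) {
		// create-or-overwrite failed (I/O, locking or OOM)
	}
*/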
3445
3446/* Append to an entry. Create if not exist. */
3447int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
3448{
3449	u32 hash;
3450	TDB_DATA dbuf;
3451	int ret = -1;
3452
3453	/* find which hash bucket it is in */
3454	hash = tdb->hash_fn(&key);
3455	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3456		return -1;
3457
3458	dbuf = tdb_fetch(tdb, key);
3459
3460	if (dbuf.dptr == NULL) {
3461		dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
3462	} else {
3463		dbuf.dptr = (unsigned char *)realloc(dbuf.dptr,
3464						     dbuf.dsize + new_dbuf.dsize);
3465	}
3466
3467	if (dbuf.dptr == NULL) {
3468		tdb->ecode = TDB_ERR_OOM;
3469		goto failed;
3470	}
3471
3472	memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
3473	dbuf.dsize += new_dbuf.dsize;
3474
3475	ret = tdb_store(tdb, key, dbuf, 0);
3476
3477failed:
3478	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3479	SAFE_FREE(dbuf.dptr);
3480	return ret;
3481}
3482
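/*
  Illustrative usage sketch (not part of the original source): building up a
  record by repeated appends; the record is created on the first call. The
  key and chunk text are placeholders.

	TDB_DATA key, chunk;

	key.dptr    = (unsigned char *)"log";
	key.dsize   = strlen("log");
	chunk.dptr  = (unsigned char *)"one line\n";
	chunk.dsize = strlen("one line\n");

	if (tdb_append(tdb, key, chunk) == -1) {
		// out of memory, or the underlying store failed
	}
*/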
3483
3484/*
3485  return the name of the current tdb file
3486  useful for external logging functions
3487*/
3488const char *tdb_name(struct tdb_context *tdb)
3489{
3490	return tdb->name;
3491}
3492
3493/*
3494  return the underlying file descriptor being used by tdb, or -1
3495  useful for external routines that want to check the device/inode
3496  of the fd
3497*/
3498int tdb_fd(struct tdb_context *tdb)
3499{
3500	return tdb->fd;
3501}
3502
3503/*
3504  return the current logging function
3505  useful for external tdb routines that wish to log tdb errors
3506*/
3507tdb_log_func tdb_log_fn(struct tdb_context *tdb)
3508{
3509	return tdb->log.log_fn;
3510}
3511
3512
3513/*
3514  get the tdb sequence number. Only makes sense if the writers opened
3515  with TDB_SEQNUM set. Note that this sequence number will wrap quite
3516  quickly, so it should only be used for a 'has something changed'
3517  test, not for code that relies on the count of the number of changes
3518  made. If you want a counter then use a tdb record.
3519
3520  The aim of this sequence number is to allow for a very lightweight
3521  test of a possible tdb change.
3522*/
3523int tdb_get_seqnum(struct tdb_context *tdb)
3524{
3525	tdb_off_t seqnum=0;
3526
3527	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3528	return seqnum;
3529}
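
/*
  Illustrative usage sketch (not part of the original source): using the
  sequence number as a cheap "has anything changed?" probe. The writers must
  have opened the database with TDB_SEQNUM for the number to advance.

	int last_seen = tdb_get_seqnum(tdb);

	// ... later ...
	if (tdb_get_seqnum(tdb) != last_seen) {
		// something was stored or deleted since we last looked;
		// re-read whatever state we cache from this tdb
		last_seen = tdb_get_seqnum(tdb);
	}
*/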
3530
3531int tdb_hash_size(struct tdb_context *tdb)
3532{
3533	return tdb->header.hash_size;
3534}
3535
3536size_t tdb_map_size(struct tdb_context *tdb)
3537{
3538	return tdb->map_size;
3539}
3540
3541int tdb_get_flags(struct tdb_context *tdb)
3542{
3543	return tdb->flags;
3544}
3545
3546/* file: open.c */
3547
3548/* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
3549static struct tdb_context *tdbs = NULL;
3550
3551
3552/* This is based on the hash algorithm from gdbm */
3553static unsigned int default_tdb_hash(TDB_DATA *key)
3554{
3555	u32 value;	/* Used to compute the hash value.  */
3556	u32   i;	/* Used to cycle through random values. */
3557
3558	/* Set the initial value from the key size. */
3559	for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
3560		value = (value + (key->dptr[i] << (i*5 % 24)));
3561
3562	return (1103515243 * value + 12345);
3563}
3564
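/*
  Illustrative sketch (not part of the original source): a custom hash
  function (32-bit FNV-1a here) passed to tdb_open_ex(). The hash function is
  not recorded in the file, so every program opening the same database must
  use the same function.

	static unsigned int example_fnv1a_hash(TDB_DATA *key)
	{
		unsigned int h = 2166136261u;
		size_t i;

		for (i = 0; i < key->dsize; i++) {
			h = (h ^ key->dptr[i]) * 16777619u;
		}
		return h;
	}

	tdb = tdb_open_ex("example.tdb", 0, TDB_DEFAULT,
			  O_RDWR | O_CREAT, 0600, NULL, example_fnv1a_hash);
*/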
3565
3566/* initialise a new database with a specified hash size */
3567static int tdb_new_database(struct tdb_context *tdb, int hash_size)
3568{
3569	struct tdb_header *newdb;
3570	int size, ret = -1;
3571
3572	/* We make it up in memory, then write it out if not internal */
3573	size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
3574	if (!(newdb = (struct tdb_header *)calloc(size, 1)))
3575		return TDB_ERRCODE(TDB_ERR_OOM, -1);
3576
3577	/* Fill in the header */
3578	newdb->version = TDB_VERSION;
3579	newdb->hash_size = hash_size;
3580	if (tdb->flags & TDB_INTERNAL) {
3581		tdb->map_size = size;
3582		tdb->map_ptr = (char *)newdb;
3583		memcpy(&tdb->header, newdb, sizeof(tdb->header));
3584		/* Convert the `ondisk' version if asked. */
3585		CONVERT(*newdb);
3586		return 0;
3587	}
3588	if (lseek(tdb->fd, 0, SEEK_SET) == -1)
3589		goto fail;
3590
3591	if (ftruncate(tdb->fd, 0) == -1)
3592		goto fail;
3593
3594	/* This creates an endian-converted header, as if read from disk */
3595	CONVERT(*newdb);
3596	memcpy(&tdb->header, newdb, sizeof(tdb->header));
3597	/* Don't endian-convert the magic food! */
3598	memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
3599	if (write(tdb->fd, newdb, size) != size) {
3600		ret = -1;
3601	} else {
3602		ret = 0;
3603	}
3604
3605  fail:
3606	SAFE_FREE(newdb);
3607	return ret;
3608}
3609
3610
3611
3612static int tdb_already_open(dev_t device,
3613			    ino_t ino)
3614{
3615	struct tdb_context *i;
3616
3617	for (i = tdbs; i; i = i->next) {
3618		if (i->device == device && i->inode == ino) {
3619			return 1;
3620		}
3621	}
3622
3623	return 0;
3624}
3625
3626/* open the database, creating it if necessary
3627
3628   The open_flags and mode are passed straight to the open call on the
3629   database file. A flags value of O_WRONLY is invalid. The hash size
3630   is advisory, use zero for a default value.
3631
3632   Return is NULL on error, in which case errno is also set.  Don't
3633   try to call tdb_error or tdb_errname, just do strerror(errno).
3634
3635   @param name may be NULL for internal databases. */
3636struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
3637		      int open_flags, mode_t mode)
3638{
3639	return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
3640}
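
/*
  Illustrative usage sketch (not part of the original source): opening (and
  creating if necessary) a database with the default hash size, then closing
  it. On failure tdb_open() returns NULL and sets errno.

	struct tdb_context *tdb;

	tdb = tdb_open("example.tdb", 0, TDB_DEFAULT, O_RDWR | O_CREAT, 0600);
	if (tdb == NULL) {
		fprintf(stderr, "tdb_open failed: %s\n", strerror(errno));
		return -1;
	}
	// ... use the database ...
	tdb_close(tdb);
*/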
3641
3642/* a default logging function */
3643static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
3644static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
3645{
3646}
3647
3648
3649struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
3650				int open_flags, mode_t mode,
3651				const struct tdb_logging_context *log_ctx,
3652				tdb_hash_func hash_fn)
3653{
3654	struct tdb_context *tdb;
3655	struct stat st;
3656	int rev = 0, locked = 0;
3657	unsigned char *vp;
3658	u32 vertest;
3659
3660	if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
3661		/* Can't log this */
3662		errno = ENOMEM;
3663		goto fail;
3664	}
3665	tdb_io_init(tdb);
3666	tdb->fd = -1;
3667	tdb->name = NULL;
3668	tdb->map_ptr = NULL;
3669	tdb->flags = tdb_flags;
3670	tdb->open_flags = open_flags;
3671	if (log_ctx) {
3672		tdb->log = *log_ctx;
3673	} else {
3674		tdb->log.log_fn = null_log_fn;
3675		tdb->log.log_private = NULL;
3676	}
3677	tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
3678
3679	/* cache the page size */
3680	tdb->page_size = getpagesize();
3681	if (tdb->page_size <= 0) {
3682		tdb->page_size = 0x2000;
3683	}
3684
3685	if ((open_flags & O_ACCMODE) == O_WRONLY) {
3686		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
3687			 name));
3688		errno = EINVAL;
3689		goto fail;
3690	}
3691
3692	if (hash_size == 0)
3693		hash_size = DEFAULT_HASH_SIZE;
3694	if ((open_flags & O_ACCMODE) == O_RDONLY) {
3695		tdb->read_only = 1;
3696		/* read-only databases don't do locking or clear-if-first */
3697		tdb->flags |= TDB_NOLOCK;
3698		tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3699	}
3700
3701	/* internal databases don't mmap or lock, and start off cleared */
3702	if (tdb->flags & TDB_INTERNAL) {
3703		tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
3704		tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3705		if (tdb_new_database(tdb, hash_size) != 0) {
3706			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
3707			goto fail;
3708		}
3709		goto internal;
3710	}
3711
3712	if ((tdb->fd = open(name, open_flags, mode)) == -1) {
3713		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
3714			 name, strerror(errno)));
3715		goto fail;	/* errno set by open(2) */
3716	}
3717
3718	/* ensure there is only one process initialising at once */
3719	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
3720		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
3721			 name, strerror(errno)));
3722		goto fail;	/* errno set by tdb_brlock */
3723	}
3724
3725	/* we need to zero the database if we are the only one with it open */
3726	if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
3727	    (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
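		/* we won the active lock, so force O_CREAT: the truncate
		 * below empties the file, the header read further down then
		 * fails, and tdb_new_database() writes a fresh header */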
3728		open_flags |= O_CREAT;
3729		if (ftruncate(tdb->fd, 0) == -1) {
3730			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
3731				 "failed to truncate %s: %s\n",
3732				 name, strerror(errno)));
3733			goto fail; /* errno set by ftruncate */
3734		}
3735	}
3736
3737	if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
3738	    || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
3739	    || (tdb->header.version != TDB_VERSION
3740		&& !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
3741		/* it's not a valid database - possibly initialise it */
3742		if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
3743			errno = EIO; /* i.e. bad format or unreadable header */
3744			goto fail;
3745		}
3746		rev = (tdb->flags & TDB_CONVERT);
3747	}
3748	vp = (unsigned char *)&tdb->header.version;
3749	vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
3750		  (((u32)vp[2]) << 8) | (u32)vp[3];
3751	tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
3752	if (!rev)
3753		tdb->flags &= ~TDB_CONVERT;
3754	else {
3755		tdb->flags |= TDB_CONVERT;
3756		tdb_convert(&tdb->header, sizeof(tdb->header));
3757	}
3758	if (fstat(tdb->fd, &st) == -1)
3759		goto fail;
3760
3761	if (tdb->header.rwlocks != 0) {
3762		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
3763		goto fail;
3764	}
3765
3766	/* Is it already in the open list?  If so, fail. */
3767	if (tdb_already_open(st.st_dev, st.st_ino)) {
3768		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3769			 "%s (%d,%d) is already open in this process\n",
3770			 name, (int)st.st_dev, (int)st.st_ino));
3771		errno = EBUSY;
3772		goto fail;
3773	}
3774
3775	if (!(tdb->name = (char *)strdup(name))) {
3776		errno = ENOMEM;
3777		goto fail;
3778	}
3779
3780	tdb->map_size = st.st_size;
3781	tdb->device = st.st_dev;
3782	tdb->inode = st.st_ino;
3783	tdb->max_dead_records = 0;
3784	tdb_mmap(tdb);
3785	if (locked) {
3786		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
3787			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3788				 "failed to release ACTIVE_LOCK on %s: %s\n",
3789				 name, strerror(errno)));
3790			goto fail;
3791		}
3792
3793	}
3794
3795	/* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
3796	   we didn't get the initial exclusive lock, as we need to let all other
3797	   users know we're using it. */
3798
3799	if (tdb_flags & TDB_CLEAR_IF_FIRST) {
3800		/* leave this lock in place to indicate it's in use */
3801		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
3802			goto fail;
3803	}
3804
3805	/* if needed, run recovery */
3806	if (tdb_transaction_recover(tdb) == -1) {
3807		goto fail;
3808	}
3809
3810 internal:
3811	/* Internal (memory-only) databases skip all the code above to
3812	 * do with disk files, and resume here by releasing their
3813	 * global lock and hooking into the active list. */
3814	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
3815		goto fail;
3816	tdb->next = tdbs;
3817	tdbs = tdb;
3818	return tdb;
3819
3820 fail:
3821	{ int save_errno = errno;
3822
3823	if (!tdb)
3824		return NULL;
3825
3826	if (tdb->map_ptr) {
3827		if (tdb->flags & TDB_INTERNAL)
3828			SAFE_FREE(tdb->map_ptr);
3829		else
3830			tdb_munmap(tdb);
3831	}
3832	SAFE_FREE(tdb->name);
3833	if (tdb->fd != -1)
3834		if (close(tdb->fd) != 0)
3835			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
3836	SAFE_FREE(tdb);
3837	errno = save_errno;
3838	return NULL;
3839	}
3840}
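
/* Usage sketch for tdb_open_ex() (illustrative only): passing NULL for
 * log_ctx and hash_fn selects null_log_fn() and default_tdb_hash() above.
 * With TDB_INTERNAL the database lives purely in memory, no file is
 * opened, and the name may be NULL.  example_fnv1a_hash is the
 * hypothetical hash sketched next to default_tdb_hash().
 *
 *	struct tdb_context *mem;
 *
 *	mem = tdb_open_ex(NULL, 0, TDB_INTERNAL, O_RDWR, 0,
 *			  NULL, example_fnv1a_hash);
 *	if (mem != NULL) {
 *		... use mem like any other tdb ...
 *		tdb_close(mem);
 *	}
 */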
3841
3842/*
3843 * Set the maximum number of dead records per hash chain
3844 */
3845
3846void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
3847{
3848	tdb->max_dead_records = max_dead;
3849}
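
/* For example, a hypothetical caller that deletes and re-creates records
 * at a high rate might call
 *
 *	tdb_set_max_dead(db, 5);
 *
 * to let up to five dead records accumulate per hash chain (and be reused
 * by later stores) before they are cleaned up; the default of 0 keeps no
 * dead records. */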
3850
3851/**
3852 * Close a database.
3853 *
3854 * @returns -1 for error; 0 for success.
3855 **/
3856int tdb_close(struct tdb_context *tdb)
3857{
3858	struct tdb_context **i;
3859	int ret = 0;
3860
3861	if (tdb->transaction) {
3862		tdb_transaction_cancel(tdb);
3863	}
3864
3865	if (tdb->map_ptr) {
3866		if (tdb->flags & TDB_INTERNAL)
3867			SAFE_FREE(tdb->map_ptr);
3868		else
3869			tdb_munmap(tdb);
3870	}
3871	SAFE_FREE(tdb->name);
3872	if (tdb->fd != -1)
3873		ret = close(tdb->fd);
3874	SAFE_FREE(tdb->lockrecs);
3875
3876	/* Remove from contexts list */
3877	for (i = &tdbs; *i; i = &(*i)->next) {
3878		if (*i == tdb) {
3879			*i = tdb->next;
3880			break;
3881		}
3882	}
3883
3884	memset(tdb, 0, sizeof(*tdb));
3885	SAFE_FREE(tdb);
3886
3887	return ret;
3888}
3889
3890/* register a logging function */
3891void tdb_set_logging_function(struct tdb_context *tdb,
3892                              const struct tdb_logging_context *log_ctx)
3893{
3894        tdb->log = *log_ctx;
3895}
3896
3897void *tdb_get_logging_private(struct tdb_context *tdb)
3898{
3899	return tdb->log.log_private;
3900}
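
/* Illustrative sketch of wiring in a logger: the stderr target and the
 * function name are made up, but the callback signature matches
 * null_log_fn() above and the log_fn/log_private members match their use
 * in tdb_open_ex().
 *
 *	static void example_stderr_log(struct tdb_context *tdb,
 *				       enum tdb_debug_level level,
 *				       const char *fmt, ...)
 *	{
 *		va_list ap;
 *
 *		va_start(ap, fmt);
 *		vfprintf(stderr, fmt, ap);
 *		va_end(ap);
 *	}
 *
 *	struct tdb_logging_context log_ctx;
 *
 *	log_ctx.log_fn = example_stderr_log;
 *	log_ctx.log_private = NULL;
 *	tdb_set_logging_function(db, &log_ctx);
 *
 * tdb_set_logging_function() copies the context, so log_ctx need not stay
 * in scope; log_private is whatever pointer the caller wants back from
 * tdb_get_logging_private(). */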
3901
3902/* reopen a tdb - this can be used after a fork to ensure that we have an independent
3903   seek pointer from our parent and to re-establish locks */
3904int tdb_reopen(struct tdb_context *tdb)
3905{
3906	struct stat st;
3907
3908	if (tdb->flags & TDB_INTERNAL) {
3909		return 0; /* Nothing to do. */
3910	}
3911
3912	if (tdb->num_locks != 0 || tdb->global_lock.count) {
3913		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
3914		goto fail;
3915	}
3916
3917	if (tdb->transaction != 0) {
3918		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
3919		goto fail;
3920	}
3921
3922	if (tdb_munmap(tdb) != 0) {
3923		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
3924		goto fail;
3925	}
3926	if (close(tdb->fd) != 0)
3927		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
3928	tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
3929	if (tdb->fd == -1) {
3930		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
3931		goto fail;
3932	}
3933	if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
3934	    (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
3935		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
3936		goto fail;
3937	}
3938	if (fstat(tdb->fd, &st) != 0) {
3939		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
3940		goto fail;
3941	}
3942	if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
3943		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
3944		goto fail;
3945	}
3946	tdb_mmap(tdb);
3947
3948	return 0;
3949
3950fail:
3951	tdb_close(tdb);
3952	return -1;
3953}
3954
3955/* reopen all tdbs */
3956int tdb_reopen_all(int parent_longlived)
3957{
3958	struct tdb_context *tdb;
3959
3960	for (tdb=tdbs; tdb; tdb = tdb->next) {
3961		/*
3962		 * If the parent is longlived (i.e. a
3963		 * parent daemon architecture), we know
3964		 * it will keep its active lock on a
3965		 * tdb opened with CLEAR_IF_FIRST. Thus
3966		 * for child processes we don't have to
3967		 * add an active lock. This is essential
3968		 * to improve performance on systems that
3969		 * keep POSIX locks as a non-scalable data
3970		 * structure in the kernel.
3971		 */
3972		if (parent_longlived) {
3973			/* Ensure no clear-if-first. */
3974			tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3975		}
3976
3977		if (tdb_reopen(tdb) != 0)
3978			return -1;
3979	}
3980
3981	return 0;
3982}
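
/* Post-fork usage sketch (illustrative; the daemon structure is made up).
 * After a fork the child needs its own seek pointer and must re-establish
 * its locks, so it should re-open every tdb before touching one:
 *
 *	pid_t pid = fork();
 *
 *	if (pid == 0) {
 *		if (tdb_reopen_all(1) != 0) {
 *			exit(1);
 *		}
 *		... child work ...
 *	}
 *
 * The argument of 1 says the long-lived parent keeps holding the
 * CLEAR_IF_FIRST active locks, so children skip re-acquiring them; pass 0
 * if the parent may exit before its children. */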
3983