tdb.c revision 18a1444b4f1e6a0948fd38fa0de382d86cfe04de
1/*
2URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb/common
3Rev: 23590
4Last Changed Date: 2007-06-22 13:36:10 -0400 (Fri, 22 Jun 2007)
5*/
6 /*
7   trivial database library - standalone version
8
9   Copyright (C) Andrew Tridgell              1999-2005
10   Copyright (C) Jeremy Allison               2000-2006
11   Copyright (C) Paul `Rusty' Russell         2000
12
13     ** NOTE! The following LGPL license applies to the tdb
14     ** library. This does NOT imply that all of Samba is released
15     ** under the LGPL
16
17   This library is free software; you can redistribute it and/or
18   modify it under the terms of the GNU Lesser General Public
19   License as published by the Free Software Foundation; either
20   version 2 of the License, or (at your option) any later version.
21
22   This library is distributed in the hope that it will be useful,
23   but WITHOUT ANY WARRANTY; without even the implied warranty of
24   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
25   Lesser General Public License for more details.
26
27   You should have received a copy of the GNU Lesser General Public
28   License along with this library; if not, write to the Free Software
29   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
30*/
31
32#ifdef CONFIG_STAND_ALONE
33#define HAVE_MMAP
34#define HAVE_STRDUP
35#define HAVE_SYS_MMAN_H
36#define HAVE_UTIME_H
37#define HAVE_UTIME
38#endif
39#define _XOPEN_SOURCE 600
40
41#include <unistd.h>
42#include <stdio.h>
43#include <stdlib.h>
44#include <stdarg.h>
45#include <stddef.h>
46#include <errno.h>
47#include <string.h>
48#ifdef HAVE_SYS_SELECT_H
49#include <sys/select.h>
50#endif
51#include <sys/time.h>
52#include <sys/types.h>
53#include <time.h>
54#ifdef HAVE_UTIME_H
55#include <utime.h>
56#endif
57#include <sys/stat.h>
58#include <sys/file.h>
59#include <fcntl.h>
60
61#ifdef HAVE_SYS_MMAN_H
62#include <sys/mman.h>
63#endif
64
65#ifndef MAP_FILE
66#define MAP_FILE 0
67#endif
68
69#ifndef MAP_FAILED
70#define MAP_FAILED ((void *)-1)
71#endif
72
73#ifndef HAVE_STRDUP
74#define strdup rep_strdup
75static char *rep_strdup(const char *s)
76{
77	char *ret;
78	int length;
79	if (!s)
80		return NULL;
81
82	length = strlen(s);
84
85	ret = malloc(length + 1);
86	if (ret) {
87		strncpy(ret, s, length);
88		ret[length] = '\0';
89	}
90	return ret;
91}
92#endif
93
94#ifndef PRINTF_ATTRIBUTE
95#if (__GNUC__ > 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ >= 1))
96/** Use gcc attribute to check printf fns.  a1 is the 1-based index of
97 * the parameter containing the format, and a2 the index of the first
98 * argument. Note that some gcc 2.x versions don't handle this
99 * properly **/
100#define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
101#else
102#define PRINTF_ATTRIBUTE(a1, a2)
103#endif
104#endif
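/* Illustrative sketch, not part of the library: how PRINTF_ATTRIBUTE is
   intended to be used on a printf-like function. The declaration below is
   hypothetical; with a1=2 and a2=3 the compiler checks the variadic
   arguments against the format string in parameter 2. */
#if 0
static void example_logf(void *ctx, const char *fmt, ...) PRINTF_ATTRIBUTE(2, 3);
#endif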
105
106typedef int bool;
107
108#include "tdb.h"
109
110static TDB_DATA tdb_null;
111
112#ifndef u32
113#define u32 unsigned
114#endif
115
116typedef u32 tdb_len_t;
117typedef u32 tdb_off_t;
118
119#ifndef offsetof
120#define offsetof(t,f) ((unsigned int)&((t *)0)->f)
121#endif
122
123#define TDB_MAGIC_FOOD "TDB file\n"
124#define TDB_VERSION (0x26011967 + 6)
125#define TDB_MAGIC (0x26011999U)
126#define TDB_FREE_MAGIC (~TDB_MAGIC)
127#define TDB_DEAD_MAGIC (0xFEE1DEAD)
128#define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
129#define TDB_ALIGNMENT 4
130#define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
131#define DEFAULT_HASH_SIZE 131
132#define FREELIST_TOP (sizeof(struct tdb_header))
133#define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
134#define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
135#define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
136#define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
137#define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
138#define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
139#define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
140#define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
141#define TDB_SEQNUM_OFS    offsetof(struct tdb_header, sequence_number)
142#define TDB_PAD_BYTE 0x42
143#define TDB_PAD_U32  0x42424242
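/* For example, with TDB_ALIGNMENT == 4, TDB_ALIGN(5, 4) == 8. The freelist
   head pointer lives at offset FREELIST_TOP (immediately after the header),
   and the head of hash chain h lives at TDB_HASH_TOP(h), i.e. one tdb_off_t
   per chain immediately following the freelist pointer. */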
144
145/* NB assumes there is a local variable called "tdb" that is the
146 * current context; it also takes a doubly-parenthesized printf-style
147 * argument list. */
148#define TDB_LOG(x) tdb->log.log_fn x
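/* Illustrative expansion: with a struct tdb_context *tdb in scope, a call
   such as (off being some hypothetical local)

       TDB_LOG((tdb, TDB_DEBUG_ERROR, "bad offset %d\n", (int)off));

   expands to

       tdb->log.log_fn(tdb, TDB_DEBUG_ERROR, "bad offset %d\n", (int)off);

   which is why the doubled parentheses are required. */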
149
150/* lock offsets */
151#define GLOBAL_LOCK      0
152#define ACTIVE_LOCK      4
153#define TRANSACTION_LOCK 8
154
155/* free memory if the pointer is valid and zero the pointer */
156#ifndef SAFE_FREE
157#define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
158#endif
159
160#define BUCKET(hash) ((hash) % tdb->header.hash_size)
161
162#define DOCONV() (tdb->flags & TDB_CONVERT)
163#define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
164
165
166/* the body of the database is made of one list_struct for the free space
167   plus a separate data list for each hash value */
168struct list_struct {
169	tdb_off_t next; /* offset of the next record in the list */
170	tdb_len_t rec_len; /* total byte length of record */
171	tdb_len_t key_len; /* byte length of key */
172	tdb_len_t data_len; /* byte length of data */
173	u32 full_hash; /* the full 32 bit hash of the key */
174	u32 magic;   /* try to catch errors */
175	/* the following union is implied:
176		union {
177			char record[rec_len];
178			struct {
179				char key[key_len];
180				char data[data_len];
181			}
182			u32 totalsize; (tailer)
183		}
184	*/
185};
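/* Layout sketch implied by the union above (an assumption for
   illustration, not an API): for a live record at offset rec_ptr,

       key bytes  start at rec_ptr + sizeof(struct list_struct)
       data bytes start at rec_ptr + sizeof(struct list_struct) + key_len

   with rec_len covering the key+data area (plus any slack), and the u32
   tailer occupying the last 4 bytes of that area. */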
186
187
188/* this is stored at the front of every database */
189struct tdb_header {
190	char magic_food[32]; /* for /etc/magic */
191	u32 version; /* version of the code */
192	u32 hash_size; /* number of hash entries */
193	tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
194	tdb_off_t recovery_start; /* offset of transaction recovery region */
195	tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
196	tdb_off_t reserved[29];
197};
198
199struct tdb_lock_type {
200	int list;
201	u32 count;
202	u32 ltype;
203};
204
205struct tdb_traverse_lock {
206	struct tdb_traverse_lock *next;
207	u32 off;
208	u32 hash;
209	int lock_rw;
210};
211
212
213struct tdb_methods {
214	int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
215	int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
216	void (*next_hash_chain)(struct tdb_context *, u32 *);
217	int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
218	int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
219	int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
220};
221
222struct tdb_context {
223	char *name; /* the name of the database */
224	void *map_ptr; /* where it is currently mapped */
225	int fd; /* open file descriptor for the database */
226	tdb_len_t map_size; /* how much space has been mapped */
227	int read_only; /* opened read-only */
228	int traverse_read; /* read-only traversal */
229	struct tdb_lock_type global_lock;
230	int num_lockrecs;
231	struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
232	enum TDB_ERROR ecode; /* error code for last tdb error */
233	struct tdb_header header; /* a cached copy of the header */
234	u32 flags; /* the flags passed to tdb_open */
235	struct tdb_traverse_lock travlocks; /* current traversal locks */
236	struct tdb_context *next; /* all tdbs to avoid multiple opens */
237	dev_t device;	/* uniquely identifies this tdb */
238	ino_t inode;	/* uniquely identifies this tdb */
239	struct tdb_logging_context log;
240	unsigned int (*hash_fn)(TDB_DATA *key);
241	int open_flags; /* flags used in the open - needed by reopen */
242	unsigned int num_locks; /* number of chain locks held */
243	const struct tdb_methods *methods;
244	struct tdb_transaction *transaction;
245	int page_size;
246	int max_dead_records;
247	bool have_transaction_lock;
248};
249
250
251/*
252  internal prototypes
253*/
254static int tdb_munmap(struct tdb_context *tdb);
255static void tdb_mmap(struct tdb_context *tdb);
256static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
257static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
258static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
259static int tdb_transaction_lock(struct tdb_context *tdb, int ltype);
260static int tdb_transaction_unlock(struct tdb_context *tdb);
261static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
262static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
263static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
264static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
265static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
266static void *tdb_convert(void *buf, u32 size);
267static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
268static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
269static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
270static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
271static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
272static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
273static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
274static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
275static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
276static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
277static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
278		   tdb_off_t offset, tdb_len_t len,
279		   int (*parser)(TDB_DATA key, TDB_DATA data,
280				 void *private_data),
281		   void *private_data);
282static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
283			   struct list_struct *rec);
284static void tdb_io_init(struct tdb_context *tdb);
285static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
286static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off,
287		      struct list_struct *rec);
288
289
290/* file: error.c */
291
292enum TDB_ERROR tdb_error(struct tdb_context *tdb)
293{
294	return tdb->ecode;
295}
296
297static struct tdb_errname {
298	enum TDB_ERROR ecode; const char *estring;
299} emap[] = { {TDB_SUCCESS, "Success"},
300	     {TDB_ERR_CORRUPT, "Corrupt database"},
301	     {TDB_ERR_IO, "IO Error"},
302	     {TDB_ERR_LOCK, "Locking error"},
303	     {TDB_ERR_OOM, "Out of memory"},
304	     {TDB_ERR_EXISTS, "Record exists"},
305	     {TDB_ERR_NOLOCK, "Lock exists on other keys"},
306	     {TDB_ERR_EINVAL, "Invalid parameter"},
307	     {TDB_ERR_NOEXIST, "Record does not exist"},
308	     {TDB_ERR_RDONLY, "write not permitted"} };
309
310/* Error string for the last tdb error */
311const char *tdb_errorstr(struct tdb_context *tdb)
312{
313	u32 i;
314	for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
315		if (tdb->ecode == emap[i].ecode)
316			return emap[i].estring;
317	return "Invalid error code";
318}
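/* Illustrative sketch, not compiled into the library: how a caller might
   combine tdb_error() and tdb_errorstr() after a failed operation.
   tdb_store() and TDB_INSERT are part of the public API in tdb.h. */
#if 0
static int example_insert(struct tdb_context *tdb, TDB_DATA key, TDB_DATA val)
{
	if (tdb_store(tdb, key, val, TDB_INSERT) == 0) {
		return 0;
	}
	if (tdb_error(tdb) == TDB_ERR_EXISTS) {
		/* the key was already present - not fatal for this caller */
		return 1;
	}
	fprintf(stderr, "tdb_store failed: %s\n", tdb_errorstr(tdb));
	return -1;
}
#endif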
319
320/* file: lock.c */
321
322#define TDB_MARK_LOCK 0x80000000
323
324/* a byte range locking function - return 0 on success
325   this function locks/unlocks 1 byte at the specified offset.
326
327   On error, errno is also set so that errors are passed back properly
328   through tdb_open().
329
330   note that a len of zero means lock to end of file
331*/
332int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
333	       int rw_type, int lck_type, int probe, size_t len)
334{
335	struct flock fl;
336	int ret;
337
338	if (tdb->flags & TDB_NOLOCK) {
339		return 0;
340	}
341
342	if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
343		tdb->ecode = TDB_ERR_RDONLY;
344		return -1;
345	}
346
347	fl.l_type = rw_type;
348	fl.l_whence = SEEK_SET;
349	fl.l_start = offset;
350	fl.l_len = len;
351	fl.l_pid = 0;
352
353	do {
354		ret = fcntl(tdb->fd,lck_type,&fl);
355	} while (ret == -1 && errno == EINTR);
356
357	if (ret == -1) {
358		/* Generic lock error. errno set by fcntl.
359		 * EAGAIN is an expected return from non-blocking
360		 * locks. */
361		if (!probe && lck_type != F_SETLK) {
362			/* Ensure error code is set for the log fn to examine. */
363			tdb->ecode = TDB_ERR_LOCK;
364			TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
365				 tdb->fd, offset, rw_type, lck_type, (int)len));
366		}
367		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
368	}
369	return 0;
370}
371
372
373/*
374  upgrade a read lock to a write lock. This needs to be handled in a
375  special way as some OSes (such as Solaris) have overly conservative
376  deadlock detection and claim a deadlock when progress can be
377  made. For those OSes we may loop for a while.
378*/
379int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
380{
381	int count = 1000;
382	while (count--) {
383		struct timeval tv;
384		if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
385			return 0;
386		}
387		if (errno != EDEADLK) {
388			break;
389		}
390		/* sleep for as short a time as we can - more portable than usleep() */
391		tv.tv_sec = 0;
392		tv.tv_usec = 1;
393		select(0, NULL, NULL, NULL, &tv);
394	}
395	TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
396	return -1;
397}
398
399
400/* lock a list in the database. list -1 is the alloc list */
401static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
402{
403	struct tdb_lock_type *new_lck;
404	int i;
405	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
406
407	ltype &= ~TDB_MARK_LOCK;
408
409	/* a global lock allows us to avoid per chain locks */
410	if (tdb->global_lock.count &&
411	    (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
412		return 0;
413	}
414
415	if (tdb->global_lock.count) {
416		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
417	}
418
419	if (list < -1 || list >= (int)tdb->header.hash_size) {
420		TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
421			   list, ltype));
422		return -1;
423	}
424	if (tdb->flags & TDB_NOLOCK)
425		return 0;
426
427	for (i=0; i<tdb->num_lockrecs; i++) {
428		if (tdb->lockrecs[i].list == list) {
429			if (tdb->lockrecs[i].count == 0) {
430				/*
431				 * Can't happen, see tdb_unlock(). It should
432				 * be an assert.
433				 */
434				TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
435					 "lck->count == 0 for list %d", list));
436			}
437			/*
438			 * Just increment the in-memory struct, posix locks
439			 * don't stack.
440			 */
441			tdb->lockrecs[i].count++;
442			return 0;
443		}
444	}
445
446	new_lck = (struct tdb_lock_type *)realloc(
447		tdb->lockrecs,
448		sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
449	if (new_lck == NULL) {
450		errno = ENOMEM;
451		return -1;
452	}
453	tdb->lockrecs = new_lck;
454
455	/* Since fcntl locks don't nest, we do a lock for the first one,
456	   and simply bump the count for future ones */
457	if (!mark_lock &&
458	    tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
459				     0, 1)) {
460		return -1;
461	}
462
463	tdb->num_locks++;
464
465	tdb->lockrecs[tdb->num_lockrecs].list = list;
466	tdb->lockrecs[tdb->num_lockrecs].count = 1;
467	tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
468	tdb->num_lockrecs += 1;
469
470	return 0;
471}
472
473/* lock a list in the database. list -1 is the alloc list */
474int tdb_lock(struct tdb_context *tdb, int list, int ltype)
475{
476	int ret;
477	ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
478	if (ret) {
479		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
480			 "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
481	}
482	return ret;
483}
484
485/* lock a list in the database. list -1 is the alloc list. non-blocking lock */
486int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
487{
488	return _tdb_lock(tdb, list, ltype, F_SETLK);
489}
490
491
492/* unlock the database. This originally returned void, but was changed
493   to return int since it may be interesting to know that an error has
494   occurred  --simo */
495int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
496{
497	int ret = -1;
498	int i;
499	struct tdb_lock_type *lck = NULL;
500	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
501
502	ltype &= ~TDB_MARK_LOCK;
503
504	/* a global lock allows us to avoid per chain locks */
505	if (tdb->global_lock.count &&
506	    (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
507		return 0;
508	}
509
510	if (tdb->global_lock.count) {
511		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
512	}
513
514	if (tdb->flags & TDB_NOLOCK)
515		return 0;
516
517	/* Sanity checks */
518	if (list < -1 || list >= (int)tdb->header.hash_size) {
519		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
520		return ret;
521	}
522
523	for (i=0; i<tdb->num_lockrecs; i++) {
524		if (tdb->lockrecs[i].list == list) {
525			lck = &tdb->lockrecs[i];
526			break;
527		}
528	}
529
530	if ((lck == NULL) || (lck->count == 0)) {
531		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
532		return -1;
533	}
534
535	if (lck->count > 1) {
536		lck->count--;
537		return 0;
538	}
539
540	/*
541	 * This lock has count==1 left, so we need to unlock it in the
542	 * kernel. We don't bother with decrementing the in-memory array
543	 * element, we're about to overwrite it with the last array element
544	 * anyway.
545	 */
546
547	if (mark_lock) {
548		ret = 0;
549	} else {
550		ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
551					       F_SETLKW, 0, 1);
552	}
553	tdb->num_locks--;
554
555	/*
556	 * Shrink the array by overwriting the element just unlocked with the
557	 * last array element.
558	 */
559
560	if (tdb->num_lockrecs > 1) {
561		*lck = tdb->lockrecs[tdb->num_lockrecs-1];
562	}
563	tdb->num_lockrecs -= 1;
564
565	/*
566	 * We don't bother with realloc when the array shrinks, but if we have
567	 * a completely idle tdb we should get rid of the locked array.
568	 */
569
570	if (tdb->num_lockrecs == 0) {
571		SAFE_FREE(tdb->lockrecs);
572	}
573
574	if (ret)
575		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
576	return ret;
577}
578
579/*
580  get the transaction lock
581 */
582int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
583{
584	if (tdb->have_transaction_lock || tdb->global_lock.count) {
585		return 0;
586	}
587	if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype,
588				     F_SETLKW, 0, 1) == -1) {
589		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
590		tdb->ecode = TDB_ERR_LOCK;
591		return -1;
592	}
593	tdb->have_transaction_lock = 1;
594	return 0;
595}
596
597/*
598  release the transaction lock
599 */
600int tdb_transaction_unlock(struct tdb_context *tdb)
601{
602	int ret;
603	if (!tdb->have_transaction_lock) {
604		return 0;
605	}
606	ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
607	if (ret == 0) {
608		tdb->have_transaction_lock = 0;
609	}
610	return ret;
611}
612
613
614
615
616/* lock/unlock entire database */
617static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
618{
619	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
620
621	ltype &= ~TDB_MARK_LOCK;
622
623	/* There are no locks on read-only dbs */
624	if (tdb->read_only || tdb->traverse_read)
625		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
626
627	if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
628		tdb->global_lock.count++;
629		return 0;
630	}
631
632	if (tdb->global_lock.count) {
633		/* a global lock of a different type exists */
634		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
635	}
636
637	if (tdb->num_locks != 0) {
638		/* can't combine global and chain locks */
639		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
640	}
641
642	if (!mark_lock &&
643	    tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
644				     0, 4*tdb->header.hash_size)) {
645		if (op == F_SETLKW) {
646			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
647		}
648		return -1;
649	}
650
651	tdb->global_lock.count = 1;
652	tdb->global_lock.ltype = ltype;
653
654	return 0;
655}
656
657
658
659/* unlock entire db */
660static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
661{
662	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
663
664	ltype &= ~TDB_MARK_LOCK;
665
666	/* There are no locks on read-only dbs */
667	if (tdb->read_only || tdb->traverse_read) {
668		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
669	}
670
671	if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
672		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
673	}
674
675	if (tdb->global_lock.count > 1) {
676		tdb->global_lock.count--;
677		return 0;
678	}
679
680	if (!mark_lock &&
681	    tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
682				     0, 4*tdb->header.hash_size)) {
683		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
684		return -1;
685	}
686
687	tdb->global_lock.count = 0;
688	tdb->global_lock.ltype = 0;
689
690	return 0;
691}
692
693/* lock entire database with write lock */
694int tdb_lockall(struct tdb_context *tdb)
695{
696	return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
697}
698
699/* lock entire database with write lock - mark only */
700int tdb_lockall_mark(struct tdb_context *tdb)
701{
702	return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
703}
704
705/* unlock entire database with write lock - unmark only */
706int tdb_lockall_unmark(struct tdb_context *tdb)
707{
708	return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
709}
710
711/* lock entire database with write lock - non-blocking variant */
712int tdb_lockall_nonblock(struct tdb_context *tdb)
713{
714	return _tdb_lockall(tdb, F_WRLCK, F_SETLK);
715}
716
717/* unlock entire database with write lock */
718int tdb_unlockall(struct tdb_context *tdb)
719{
720	return _tdb_unlockall(tdb, F_WRLCK);
721}
722
723/* lock entire database with read lock */
724int tdb_lockall_read(struct tdb_context *tdb)
725{
726	return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
727}
728
729/* lock entire database with read lock - non-blocking variant */
730int tdb_lockall_read_nonblock(struct tdb_context *tdb)
731{
732	return _tdb_lockall(tdb, F_RDLCK, F_SETLK);
733}
734
735/* unlock entire database with read lock */
736int tdb_unlockall_read(struct tdb_context *tdb)
737{
738	return _tdb_unlockall(tdb, F_RDLCK);
739}
740
741/* lock/unlock one hash chain. This is meant to be used to reduce
742   contention - it cannot guarantee how many records will be locked */
743int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
744{
745	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
746}
747
748/* lock/unlock one hash chain, non-blocking. This is meant to be used
749   to reduce contention - it cannot guarantee how many records will be
750   locked */
751int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
752{
753	return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
754}
755
756/* mark a chain as locked without actually locking it. Warning! use with great caution! */
757int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
758{
759	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
760}
761
762/* unmark a chain as locked without actually locking it. Warning! use with great caution! */
763int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
764{
765	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
766}
767
768int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
769{
770	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
771}
772
773int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
774{
775	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
776}
777
778int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
779{
780	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
781}
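/* Illustrative sketch, not compiled into the library: the chain lock calls
   above are meant to make a read-modify-write of one key atomic between
   processes. tdb_fetch() and tdb_store() are part of the public API in
   tdb.h; the 4-byte counter encoding is just an example. */
#if 0
static int example_increment(struct tdb_context *tdb, TDB_DATA key)
{
	TDB_DATA val;
	unsigned int counter = 0;
	int ret;

	if (tdb_chainlock(tdb, key) != 0) {
		return -1;
	}
	val = tdb_fetch(tdb, key);
	if (val.dptr != NULL && val.dsize == sizeof(counter)) {
		memcpy(&counter, val.dptr, sizeof(counter));
	}
	free(val.dptr);

	counter++;
	val.dptr = (unsigned char *)&counter;
	val.dsize = sizeof(counter);
	ret = tdb_store(tdb, key, val, TDB_REPLACE);

	tdb_chainunlock(tdb, key);
	return ret;
}
#endif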
782
783
784
785/* record lock stops delete underneath */
786int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
787{
788	return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
789}
790
791/*
792  Write locks override our own fcntl readlocks, so check it here.
793  Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
794  an error to fail to get the lock here.
795*/
796int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
797{
798	struct tdb_traverse_lock *i;
799	for (i = &tdb->travlocks; i; i = i->next)
800		if (i->off == off)
801			return -1;
802	return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
803}
804
805/*
806  Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
807  an error to fail to get the lock here.
808*/
809int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
810{
811	return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
812}
813
814/* fcntl locks don't stack: avoid unlocking someone else's */
815int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
816{
817	struct tdb_traverse_lock *i;
818	u32 count = 0;
819
820	if (off == 0)
821		return 0;
822	for (i = &tdb->travlocks; i; i = i->next)
823		if (i->off == off)
824			count++;
825	return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
826}
827
828/* file: io.c */
829
830/* check for an out of bounds access - if it is out of bounds then
831   see if the database has been expanded by someone else and expand
832   if necessary.
833   Note that "len" is the minimum length needed for the db.
834*/
835static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
836{
837	struct stat st;
838	if (len <= tdb->map_size)
839		return 0;
840	if (tdb->flags & TDB_INTERNAL) {
841		if (!probe) {
842			/* Ensure ecode is set for log fn. */
843			tdb->ecode = TDB_ERR_IO;
844			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
845				 (int)len, (int)tdb->map_size));
846		}
847		return TDB_ERRCODE(TDB_ERR_IO, -1);
848	}
849
850	if (fstat(tdb->fd, &st) == -1) {
851		return TDB_ERRCODE(TDB_ERR_IO, -1);
852	}
853
854	if (st.st_size < (size_t)len) {
855		if (!probe) {
856			/* Ensure ecode is set for log fn. */
857			tdb->ecode = TDB_ERR_IO;
858			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
859				 (int)len, (int)st.st_size));
860		}
861		return TDB_ERRCODE(TDB_ERR_IO, -1);
862	}
863
864	/* Unmap, update size, remap */
865	if (tdb_munmap(tdb) == -1)
866		return TDB_ERRCODE(TDB_ERR_IO, -1);
867	tdb->map_size = st.st_size;
868	tdb_mmap(tdb);
869	return 0;
870}
871
872/* write a lump of data at a specified offset */
873static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
874		     const void *buf, tdb_len_t len)
875{
876	if (len == 0) {
877		return 0;
878	}
879
880	if (tdb->read_only || tdb->traverse_read) {
881		tdb->ecode = TDB_ERR_RDONLY;
882		return -1;
883	}
884
885	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
886		return -1;
887
888	if (tdb->map_ptr) {
889		memcpy(off + (char *)tdb->map_ptr, buf, len);
890	} else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
891		/* Ensure ecode is set for log fn. */
892		tdb->ecode = TDB_ERR_IO;
893		TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
894			   off, len, strerror(errno)));
895		return TDB_ERRCODE(TDB_ERR_IO, -1);
896	}
897	return 0;
898}
899
900/* Endian conversion: we only ever deal with 4 byte quantities */
901void *tdb_convert(void *buf, u32 size)
902{
903	u32 i, *p = (u32 *)buf;
904	for (i = 0; i < size / 4; i++)
905		p[i] = TDB_BYTEREV(p[i]);
906	return buf;
907}
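/* For example, TDB_BYTEREV(0x12345678) == 0x78563412. tdb_convert() is
   applied when TDB_CONVERT is set, i.e. when the file was created with the
   opposite byte order, so every 4-byte quantity is swapped on read/write. */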
908
909
910/* read a lump of data at a specified offset, maybe convert */
911static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
912		    tdb_len_t len, int cv)
913{
914	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
915		return -1;
916	}
917
918	if (tdb->map_ptr) {
919		memcpy(buf, off + (char *)tdb->map_ptr, len);
920	} else {
921		ssize_t ret = pread(tdb->fd, buf, len, off);
922		if (ret != (ssize_t)len) {
923			/* Ensure ecode is set for log fn. */
924			tdb->ecode = TDB_ERR_IO;
925			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
926				 "len=%d ret=%d (%s) map_size=%d\n",
927				 (int)off, (int)len, (int)ret, strerror(errno),
928				 (int)tdb->map_size));
929			return TDB_ERRCODE(TDB_ERR_IO, -1);
930		}
931	}
932	if (cv) {
933		tdb_convert(buf, len);
934	}
935	return 0;
936}
937
938
939
940/*
941  do an unlocked scan of the hash table heads to find the next non-zero head. The value
942  will then be confirmed with the lock held
943*/
944static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
945{
946	u32 h = *chain;
947	if (tdb->map_ptr) {
948		for (;h < tdb->header.hash_size;h++) {
949			if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
950				break;
951			}
952		}
953	} else {
954		u32 off=0;
955		for (;h < tdb->header.hash_size;h++) {
956			if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
957				break;
958			}
959		}
960	}
961	(*chain) = h;
962}
963
964
965int tdb_munmap(struct tdb_context *tdb)
966{
967	if (tdb->flags & TDB_INTERNAL)
968		return 0;
969
970#ifdef HAVE_MMAP
971	if (tdb->map_ptr) {
972		int ret = munmap(tdb->map_ptr, tdb->map_size);
973		if (ret != 0)
974			return ret;
975	}
976#endif
977	tdb->map_ptr = NULL;
978	return 0;
979}
980
981void tdb_mmap(struct tdb_context *tdb)
982{
983	if (tdb->flags & TDB_INTERNAL)
984		return;
985
986#ifdef HAVE_MMAP
987	if (!(tdb->flags & TDB_NOMMAP)) {
988		tdb->map_ptr = mmap(NULL, tdb->map_size,
989				    PROT_READ|(tdb->read_only? 0:PROT_WRITE),
990				    MAP_SHARED|MAP_FILE, tdb->fd, 0);
991
992		/*
993		 * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
994		 */
995
996		if (tdb->map_ptr == MAP_FAILED) {
997			tdb->map_ptr = NULL;
998			TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
999				 tdb->map_size, strerror(errno)));
1000		}
1001	} else {
1002		tdb->map_ptr = NULL;
1003	}
1004#else
1005	tdb->map_ptr = NULL;
1006#endif
1007}
1008
1009/* expand a file.  we prefer to use ftruncate, as that is what posix
1010  says to use for mmap expansion */
1011static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
1012{
1013	char buf[1024];
1014
1015	if (tdb->read_only || tdb->traverse_read) {
1016		tdb->ecode = TDB_ERR_RDONLY;
1017		return -1;
1018	}
1019
1020	if (ftruncate(tdb->fd, size+addition) == -1) {
1021		char b = 0;
1022		if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
1023			TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
1024				 size+addition, strerror(errno)));
1025			return -1;
1026		}
1027	}
1028
1029	/* now fill the file with something. This ensures that the
1030	   file isn't sparse, which would be very bad if we ran out of
1031	   disk. This must be done with write, not via mmap */
1032	memset(buf, TDB_PAD_BYTE, sizeof(buf));
1033	while (addition) {
1034		int n = addition>sizeof(buf)?sizeof(buf):addition;
1035		int ret = pwrite(tdb->fd, buf, n, size);
1036		if (ret != n) {
1037			TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
1038				   n, strerror(errno)));
1039			return -1;
1040		}
1041		addition -= n;
1042		size += n;
1043	}
1044	return 0;
1045}
1046
1047
1048/* expand the database at least size bytes by expanding the underlying
1049   file and doing the mmap again if necessary */
1050int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
1051{
1052	struct list_struct rec;
1053	tdb_off_t offset;
1054
1055	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
1056		TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
1057		return -1;
1058	}
1059
1060	/* must know about any previous expansions by another process */
1061	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1062
1063	/* always make room for at least 10 more records, and round
1064           the database up to a multiple of the page size */
1065	size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
1066
1067	if (!(tdb->flags & TDB_INTERNAL))
1068		tdb_munmap(tdb);
1069
1070	/*
1071	 * We must ensure the file is unmapped before doing this
1072	 * to ensure consistency with systems like OpenBSD where
1073	 * writes and mmaps are not consistent.
1074	 */
1075
1076	/* expand the file itself */
1077	if (!(tdb->flags & TDB_INTERNAL)) {
1078		if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
1079			goto fail;
1080	}
1081
1082	tdb->map_size += size;
1083
1084	if (tdb->flags & TDB_INTERNAL) {
1085		char *new_map_ptr = (char *)realloc(tdb->map_ptr,
1086						    tdb->map_size);
1087		if (!new_map_ptr) {
1088			tdb->map_size -= size;
1089			goto fail;
1090		}
1091		tdb->map_ptr = new_map_ptr;
1092	} else {
1093		/*
1094		 * We must ensure the file is remapped before adding the space
1095		 * to ensure consistency with systems like OpenBSD where
1096		 * writes and mmaps are not consistent.
1097		 */
1098
1099		/* We're ok if the mmap fails as we'll fall back to read/write */
1100		tdb_mmap(tdb);
1101	}
1102
1103	/* form a new freelist record */
1104	memset(&rec,'\0',sizeof(rec));
1105	rec.rec_len = size - sizeof(rec);
1106
1107	/* link it into the free list */
1108	offset = tdb->map_size - size;
1109	if (tdb_free(tdb, offset, &rec) == -1)
1110		goto fail;
1111
1112	tdb_unlock(tdb, -1, F_WRLCK);
1113	return 0;
1114 fail:
1115	tdb_unlock(tdb, -1, F_WRLCK);
1116	return -1;
1117}
1118
1119/* read/write a tdb_off_t */
1120int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1121{
1122	return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
1123}
1124
1125int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1126{
1127	tdb_off_t off = *d;
1128	return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
1129}
1130
1131
1132/* read a lump of data, allocating the space for it */
1133unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
1134{
1135	unsigned char *buf;
1136
1137	/* some systems don't like zero length malloc */
1138	if (len == 0) {
1139		len = 1;
1140	}
1141
1142	if (!(buf = (unsigned char *)malloc(len))) {
1143		/* Ensure ecode is set for log fn. */
1144		tdb->ecode = TDB_ERR_OOM;
1145		TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
1146			   len, strerror(errno)));
1147		return TDB_ERRCODE(TDB_ERR_OOM, buf);
1148	}
1149	if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
1150		SAFE_FREE(buf);
1151		return NULL;
1152	}
1153	return buf;
1154}
1155
1156/* Give a piece of tdb data to a parser */
1157
1158int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
1159		   tdb_off_t offset, tdb_len_t len,
1160		   int (*parser)(TDB_DATA key, TDB_DATA data,
1161				 void *private_data),
1162		   void *private_data)
1163{
1164	TDB_DATA data;
1165	int result;
1166
1167	data.dsize = len;
1168
1169	if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
1170		/*
1171		 * Optimize by avoiding the malloc/memcpy/free, point the
1172		 * parser directly at the mmap area.
1173		 */
1174		if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
1175			return -1;
1176		}
1177		data.dptr = offset + (unsigned char *)tdb->map_ptr;
1178		return parser(key, data, private_data);
1179	}
1180
1181	if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
1182		return -1;
1183	}
1184
1185	result = parser(key, data, private_data);
1186	free(data.dptr);
1187	return result;
1188}
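/* Illustrative sketch, not compiled into the library: the parser-callback
   style lets a caller look at a record without an extra malloc/memcpy.
   This assumes the public tdb_parse_record() wrapper declared in tdb.h,
   which hands key and data to a callback like the one below. */
#if 0
static int example_sum_bytes(TDB_DATA key, TDB_DATA data, void *private_data)
{
	unsigned int *sum = (unsigned int *)private_data;
	size_t i;
	(void)key;
	for (i = 0; i < data.dsize; i++) {
		*sum += data.dptr[i];
	}
	return 0;
}

static int example_parse(struct tdb_context *tdb, TDB_DATA key)
{
	unsigned int sum = 0;
	if (tdb_parse_record(tdb, key, example_sum_bytes, &sum) != 0) {
		return -1;
	}
	return (int)sum;
}
#endif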
1189
1190/* read/write a record */
1191int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1192{
1193	if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
1194		return -1;
1195	if (TDB_BAD_MAGIC(rec)) {
1196		/* Ensure ecode is set for log fn. */
1197		tdb->ecode = TDB_ERR_CORRUPT;
1198		TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
1199		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1200	}
1201	return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
1202}
1203
1204int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1205{
1206	struct list_struct r = *rec;
1207	return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
1208}
1209
1210static const struct tdb_methods io_methods = {
1211	tdb_read,
1212	tdb_write,
1213	tdb_next_hash_chain,
1214	tdb_oob,
1215	tdb_expand_file,
1216	tdb_brlock
1217};
1218
1219/*
1220  initialise the default methods table
1221*/
1222void tdb_io_init(struct tdb_context *tdb)
1223{
1224	tdb->methods = &io_methods;
1225}
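/* Note: all internal I/O is dispatched through tdb->methods, e.g.
   tdb->methods->tdb_read(tdb, off, buf, len, DOCONV()). With the default
   table above that call reaches tdb_read(); the transaction code later in
   this file installs its own methods table so the same call sites operate
   on the in-memory transaction state instead of the file. */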
1226
1227/* file: transaction.c */
1228
1229/*
1230  transaction design:
1231
1232  - only allow a single transaction at a time per database. This makes
1233    using the transaction API simpler, as otherwise the caller would
1234    have to cope with temporary failures in transactions that conflict
1235    with other current transactions
1236
1237  - keep the transaction recovery information in the same file as the
1238    database, using a special 'transaction recovery' record pointed at
1239    by the header. This removes the need for extra journal files as
1240    used by some other databases
1241
1242  - dynamically allocate the transaction recovery record, re-using it
1243    for subsequent transactions. If a larger record is needed then
1244    tdb_free() the old record to place it on the normal tdb freelist
1245    before allocating the new record
1246
1247  - during transactions, keep a linked list of all writes that have
1248    been performed by intercepting all tdb_write() calls. The hooked
1249    transaction versions of tdb_read() and tdb_write() check this
1250    linked list and try to use the elements of the list in preference
1251    to the real database.
1252
1253  - don't allow any locks to be held when a transaction starts,
1254    otherwise we can end up with deadlock (plus lack of lock nesting
1255    in posix locks would mean the lock is lost)
1256
1257  - if the caller gains a lock during the transaction but doesn't
1258    release it then fail the commit
1259
1260  - allow for nested calls to tdb_transaction_start(), re-using the
1261    existing transaction record. If the inner transaction is cancelled
1262    then a subsequent commit will fail
1263
1264  - keep a mirrored copy of the tdb hash chain heads to allow for the
1265    fast hash heads scan on traverse, updating the mirrored copy in
1266    the transaction version of tdb_write
1267
1268  - allow callers to mix transaction and non-transaction use of tdb,
1269    although once a transaction is started then an exclusive lock is
1270    gained until the transaction is committed or cancelled
1271
1272  - the commit strategy involves first saving away all modified data
1273    into a linearised buffer in the transaction recovery area, then
1274    marking the transaction recovery area with a magic value to
1275    indicate a valid recovery record. In total 4 fsync/msync calls are
1276    needed per commit to prevent race conditions. It might be possible
1277    to reduce this to 3 or even 2 with some more work.
1278
1279  - check for a valid recovery record on open of the tdb, while the
1280    global lock is held. Automatically recover from the transaction
1281    recovery area if needed, then continue with the open as
1282    usual. This allows for smooth crash recovery with no administrator
1283    intervention.
1284
1285  - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
1286    still available, but no transaction recovery area is used and no
1287    fsync/msync calls are made.
1288
1289*/
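/* Illustrative sketch, not compiled into the library: the caller-visible
   shape of the design described above. tdb_transaction_start(),
   tdb_transaction_commit() and tdb_transaction_cancel() are part of the
   public API in tdb.h; the two stores here are just an example. */
#if 0
static int example_store_pair(struct tdb_context *tdb,
			      TDB_DATA key1, TDB_DATA key2, TDB_DATA val)
{
	if (tdb_transaction_start(tdb) != 0) {
		return -1;
	}
	if (tdb_store(tdb, key1, val, TDB_REPLACE) != 0 ||
	    tdb_store(tdb, key2, val, TDB_REPLACE) != 0) {
		tdb_transaction_cancel(tdb);
		return -1;
	}
	/* either both stores become durable and visible, or neither does */
	return tdb_transaction_commit(tdb);
}
#endif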
1290
1291struct tdb_transaction_el {
1292	struct tdb_transaction_el *next, *prev;
1293	tdb_off_t offset;
1294	tdb_len_t length;
1295	unsigned char *data;
1296};
1297
1298/*
1299  hold the context of any current transaction
1300*/
1301struct tdb_transaction {
1302	/* we keep a mirrored copy of the tdb hash heads here so
1303	   tdb_next_hash_chain() can operate efficiently */
1304	u32 *hash_heads;
1305
1306	/* the original io methods - used to do IOs to the real db */
1307	const struct tdb_methods *io_methods;
1308
1309	/* the list of transaction elements. We use a doubly linked
1310	   list with a last pointer to allow us to keep the list
1311	   ordered, with first element at the front of the list. It
1312	   needs to be doubly linked as the read/write traversals need
1313	   to be backwards, while the commit needs to be forwards */
1314	struct tdb_transaction_el *elements, *elements_last;
1315
1316	/* non-zero when an internal transaction error has
1317	   occurred. All write operations will then fail until the
1318	   transaction is ended */
1319	int transaction_error;
1320
1321	/* when inside a transaction we need to keep track of any
1322	   nested tdb_transaction_start() calls, as these are allowed,
1323	   but don't create a new transaction */
1324	int nesting;
1325
1326	/* old file size before transaction */
1327	tdb_len_t old_map_size;
1328};
1329
1330
1331/*
1332  read while in a transaction. We need to check first if the data is in our list
1333  of transaction elements, then if not do a real read
1334*/
1335static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
1336			    tdb_len_t len, int cv)
1337{
1338	struct tdb_transaction_el *el;
1339
1340	/* we need to walk the list backwards to get the most recent data */
1341	for (el=tdb->transaction->elements_last;el;el=el->prev) {
1342		tdb_len_t partial;
1343
1344		if (off+len <= el->offset) {
1345			continue;
1346		}
1347		if (off >= el->offset + el->length) {
1348			continue;
1349		}
1350
1351		/* an overlapping read - needs to be split into up to
1352		   2 reads and a memcpy */
1353		if (off < el->offset) {
1354			partial = el->offset - off;
1355			if (transaction_read(tdb, off, buf, partial, cv) != 0) {
1356				goto fail;
1357			}
1358			len -= partial;
1359			off += partial;
1360			buf = (void *)(partial + (char *)buf);
1361		}
1362		if (off + len <= el->offset + el->length) {
1363			partial = len;
1364		} else {
1365			partial = el->offset + el->length - off;
1366		}
1367		memcpy(buf, el->data + (off - el->offset), partial);
1368		if (cv) {
1369			tdb_convert(buf, len);
1370		}
1371		len -= partial;
1372		off += partial;
1373		buf = (void *)(partial + (char *)buf);
1374
1375		if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
1376			goto fail;
1377		}
1378
1379		return 0;
1380	}
1381
1382	/* it's not in the transaction elements - do a real read */
1383	return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
1384
1385fail:
1386	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
1387	tdb->ecode = TDB_ERR_IO;
1388	tdb->transaction->transaction_error = 1;
1389	return -1;
1390}
1391
1392
1393/*
1394  write while in a transaction
1395*/
1396static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
1397			     const void *buf, tdb_len_t len)
1398{
1399	struct tdb_transaction_el *el, *best_el=NULL;
1400
1401	if (len == 0) {
1402		return 0;
1403	}
1404
1405	/* if the write is to a hash head, then update the transaction
1406	   hash heads */
1407	if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
1408	    off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
1409		u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
1410		memcpy(&tdb->transaction->hash_heads[chain], buf, len);
1411	}
1412
1413	/* first see if we can replace an existing entry */
1414	for (el=tdb->transaction->elements_last;el;el=el->prev) {
1415		tdb_len_t partial;
1416
1417		if (best_el == NULL && off == el->offset+el->length) {
1418			best_el = el;
1419		}
1420
1421		if (off+len <= el->offset) {
1422			continue;
1423		}
1424		if (off >= el->offset + el->length) {
1425			continue;
1426		}
1427
1428		/* an overlapping write - needs to be split into up to
1429		   2 writes and a memcpy */
1430		if (off < el->offset) {
1431			partial = el->offset - off;
1432			if (transaction_write(tdb, off, buf, partial) != 0) {
1433				goto fail;
1434			}
1435			len -= partial;
1436			off += partial;
1437			buf = (const void *)(partial + (const char *)buf);
1438		}
1439		if (off + len <= el->offset + el->length) {
1440			partial = len;
1441		} else {
1442			partial = el->offset + el->length - off;
1443		}
1444		memcpy(el->data + (off - el->offset), buf, partial);
1445		len -= partial;
1446		off += partial;
1447		buf = (const void *)(partial + (const char *)buf);
1448
1449		if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
1450			goto fail;
1451		}
1452
1453		return 0;
1454	}
1455
1456	/* see if we can append the new entry to an existing entry */
1457	if (best_el && best_el->offset + best_el->length == off &&
1458	    (off+len < tdb->transaction->old_map_size ||
1459	     off > tdb->transaction->old_map_size)) {
1460		unsigned char *data = best_el->data;
1461		el = best_el;
1462		el->data = (unsigned char *)realloc(el->data,
1463						    el->length + len);
1464		if (el->data == NULL) {
1465			tdb->ecode = TDB_ERR_OOM;
1466			tdb->transaction->transaction_error = 1;
1467			el->data = data;
1468			return -1;
1469		}
1470		if (buf) {
1471			memcpy(el->data + el->length, buf, len);
1472		} else {
1473			memset(el->data + el->length, TDB_PAD_BYTE, len);
1474		}
1475		el->length += len;
1476		return 0;
1477	}
1478
1479	/* add a new entry at the end of the list */
1480	el = (struct tdb_transaction_el *)malloc(sizeof(*el));
1481	if (el == NULL) {
1482		tdb->ecode = TDB_ERR_OOM;
1483		tdb->transaction->transaction_error = 1;
1484		return -1;
1485	}
1486	el->next = NULL;
1487	el->prev = tdb->transaction->elements_last;
1488	el->offset = off;
1489	el->length = len;
1490	el->data = (unsigned char *)malloc(len);
1491	if (el->data == NULL) {
1492		free(el);
1493		tdb->ecode = TDB_ERR_OOM;
1494		tdb->transaction->transaction_error = 1;
1495		return -1;
1496	}
1497	if (buf) {
1498		memcpy(el->data, buf, len);
1499	} else {
1500		memset(el->data, TDB_PAD_BYTE, len);
1501	}
1502	if (el->prev) {
1503		el->prev->next = el;
1504	} else {
1505		tdb->transaction->elements = el;
1506	}
1507	tdb->transaction->elements_last = el;
1508	return 0;
1509
1510fail:
1511	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
1512	tdb->ecode = TDB_ERR_IO;
1513	tdb->transaction->transaction_error = 1;
1514	return -1;
1515}
1516
1517/*
1518  accelerated hash chain head search, using the cached hash heads
1519*/
1520static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
1521{
1522	u32 h = *chain;
1523	for (;h < tdb->header.hash_size;h++) {
1524		/* the +1 takes account of the freelist */
1525		if (0 != tdb->transaction->hash_heads[h+1]) {
1526			break;
1527		}
1528	}
1529	(*chain) = h;
1530}
1531
1532/*
1533  out of bounds check during a transaction
1534*/
1535static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
1536{
1537	if (len <= tdb->map_size) {
1538		return 0;
1539	}
1540	return TDB_ERRCODE(TDB_ERR_IO, -1);
1541}
1542
1543/*
1544  transaction version of tdb_expand_file().
1545*/
1546static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
1547				   tdb_off_t addition)
1548{
1549	/* add a write to the transaction elements, so subsequent
1550	   reads see the zero data */
1551	if (transaction_write(tdb, size, NULL, addition) != 0) {
1552		return -1;
1553	}
1554
1555	return 0;
1556}
1557
1558/*
1559  brlock during a transaction - ignore them
1560*/
1561static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
1562			      int rw_type, int lck_type, int probe, size_t len)
1563{
1564	return 0;
1565}
1566
1567static const struct tdb_methods transaction_methods = {
1568	transaction_read,
1569	transaction_write,
1570	transaction_next_hash_chain,
1571	transaction_oob,
1572	transaction_expand_file,
1573	transaction_brlock
1574};
1575
1576
1577/*
1578  start a tdb transaction. No token is returned, as only a single
1579  transaction is allowed to be pending per tdb_context
1580*/
1581int tdb_transaction_start(struct tdb_context *tdb)
1582{
1583	/* some sanity checks */
1584	if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
1585		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
1586		tdb->ecode = TDB_ERR_EINVAL;
1587		return -1;
1588	}
1589
1590	/* cope with nested tdb_transaction_start() calls */
1591	if (tdb->transaction != NULL) {
1592		tdb->transaction->nesting++;
1593		TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
1594			 tdb->transaction->nesting));
1595		return 0;
1596	}
1597
1598	if (tdb->num_locks != 0 || tdb->global_lock.count) {
1599		/* the caller must not have any locks when starting a
1600		   transaction as otherwise we'll be screwed by lack
1601		   of nested locks in posix */
1602		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
1603		tdb->ecode = TDB_ERR_LOCK;
1604		return -1;
1605	}
1606
1607	if (tdb->travlocks.next != NULL) {
1608		/* you cannot use transactions inside a traverse (although you can use
1609		   traverse inside a transaction) as otherwise you can end up with
1610		   deadlock */
1611		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
1612		tdb->ecode = TDB_ERR_LOCK;
1613		return -1;
1614	}
1615
1616	tdb->transaction = (struct tdb_transaction *)
1617		calloc(sizeof(struct tdb_transaction), 1);
1618	if (tdb->transaction == NULL) {
1619		tdb->ecode = TDB_ERR_OOM;
1620		return -1;
1621	}
1622
1623	/* get the transaction write lock. This is a blocking lock. As
1624	   discussed with Volker, there are a number of ways we could
1625	   make this async, which we will probably do in the future */
1626	if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
1627		SAFE_FREE(tdb->transaction);
1628		return -1;
1629	}
1630
1631	/* get a read lock from the freelist to the end of file. This
1632	   is upgraded to a write lock during the commit */
1633	if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
1634		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
1635		tdb->ecode = TDB_ERR_LOCK;
1636		goto fail;
1637	}
1638
1639	/* setup a copy of the hash table heads so the hash scan in
1640	   traverse can be fast */
1641	tdb->transaction->hash_heads = (u32 *)
1642		calloc(tdb->header.hash_size+1, sizeof(u32));
1643	if (tdb->transaction->hash_heads == NULL) {
1644		tdb->ecode = TDB_ERR_OOM;
1645		goto fail;
1646	}
1647	if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1648				   TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
1649		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
1650		tdb->ecode = TDB_ERR_IO;
1651		goto fail;
1652	}
1653
1654	/* make sure we know about any file expansions already done by
1655	   anyone else */
1656	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1657	tdb->transaction->old_map_size = tdb->map_size;
1658
1659	/* finally hook the io methods, replacing them with
1660	   transaction specific methods */
1661	tdb->transaction->io_methods = tdb->methods;
1662	tdb->methods = &transaction_methods;
1663
1664	/* by calling this transaction write here, we ensure that we don't grow the
1665	   transaction linked list due to hash table updates */
1666	if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1667			      TDB_HASHTABLE_SIZE(tdb)) != 0) {
1668		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
1669		tdb->ecode = TDB_ERR_IO;
1670		tdb->methods = tdb->transaction->io_methods;
1671		goto fail;
1672	}
1673
1674	return 0;
1675
1676fail:
1677	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1678	tdb_transaction_unlock(tdb);
1679	SAFE_FREE(tdb->transaction->hash_heads);
1680	SAFE_FREE(tdb->transaction);
1681	return -1;
1682}
1683
1684
1685/*
1686  cancel the current transaction
1687*/
1688int tdb_transaction_cancel(struct tdb_context *tdb)
1689{
1690	if (tdb->transaction == NULL) {
1691		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
1692		return -1;
1693	}
1694
1695	if (tdb->transaction->nesting != 0) {
1696		tdb->transaction->transaction_error = 1;
1697		tdb->transaction->nesting--;
1698		return 0;
1699	}
1700
1701	tdb->map_size = tdb->transaction->old_map_size;
1702
1703	/* free all the transaction elements */
1704	while (tdb->transaction->elements) {
1705		struct tdb_transaction_el *el = tdb->transaction->elements;
1706		tdb->transaction->elements = el->next;
1707		free(el->data);
1708		free(el);
1709	}
1710
1711	/* remove any global lock created during the transaction */
1712	if (tdb->global_lock.count != 0) {
1713		tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
1714		tdb->global_lock.count = 0;
1715	}
1716
1717	/* remove any locks created during the transaction */
1718	if (tdb->num_locks != 0) {
1719		int i;
1720		for (i=0;i<tdb->num_lockrecs;i++) {
1721			tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
1722				   F_UNLCK,F_SETLKW, 0, 1);
1723		}
1724		tdb->num_locks = 0;
1725		tdb->num_lockrecs = 0;
1726		SAFE_FREE(tdb->lockrecs);
1727	}
1728
1729	/* restore the normal io methods */
1730	tdb->methods = tdb->transaction->io_methods;
1731
1732	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1733	tdb_transaction_unlock(tdb);
1734	SAFE_FREE(tdb->transaction->hash_heads);
1735	SAFE_FREE(tdb->transaction);
1736
1737	return 0;
1738}
1739
1740/*
1741  sync to disk
1742*/
1743static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
1744{
1745	if (fsync(tdb->fd) != 0) {
1746		tdb->ecode = TDB_ERR_IO;
1747		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
1748		return -1;
1749	}
1750#if defined(HAVE_MSYNC) && defined(MS_SYNC)
1751	if (tdb->map_ptr) {
1752		tdb_off_t moffset = offset & ~(tdb->page_size-1);
1753		if (msync(moffset + (char *)tdb->map_ptr,
1754			  length + (offset - moffset), MS_SYNC) != 0) {
1755			tdb->ecode = TDB_ERR_IO;
1756			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
1757				 strerror(errno)));
1758			return -1;
1759		}
1760	}
1761#endif
1762	return 0;
1763}
1764
1765
1766/*
1767  work out how much space the linearised recovery data will consume
1768*/
1769static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
1770{
1771	struct tdb_transaction_el *el;
1772	tdb_len_t recovery_size = 0;
1773
1774	recovery_size = sizeof(u32);
1775	for (el=tdb->transaction->elements;el;el=el->next) {
1776		if (el->offset >= tdb->transaction->old_map_size) {
1777			continue;
1778		}
1779		recovery_size += 2*sizeof(tdb_off_t) + el->length;
1780	}
1781
1782	return recovery_size;
1783}
1784
1785/*
1786  allocate the recovery area, or use an existing recovery area if it is
1787  large enough
1788*/
1789static int tdb_recovery_allocate(struct tdb_context *tdb,
1790				 tdb_len_t *recovery_size,
1791				 tdb_off_t *recovery_offset,
1792				 tdb_len_t *recovery_max_size)
1793{
1794	struct list_struct rec;
1795	const struct tdb_methods *methods = tdb->transaction->io_methods;
1796	tdb_off_t recovery_head;
1797
1798	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1799		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
1800		return -1;
1801	}
1802
1803	rec.rec_len = 0;
1804
1805	if (recovery_head != 0 &&
1806	    methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
1807		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
1808		return -1;
1809	}
1810
1811	*recovery_size = tdb_recovery_size(tdb);
1812
1813	if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
1814		/* it fits in the existing area */
1815		*recovery_max_size = rec.rec_len;
1816		*recovery_offset = recovery_head;
1817		return 0;
1818	}
1819
1820	/* we need to free up the old recovery area, then allocate a
1821	   new one at the end of the file. Note that we cannot use
1822	   tdb_allocate() to allocate the new one as that might return
1823	   us an area that is being currently used (as of the start of
1824	   the transaction) */
1825	if (recovery_head != 0) {
1826		if (tdb_free(tdb, recovery_head, &rec) == -1) {
1827			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
1828			return -1;
1829		}
1830	}
1831
1832	/* the tdb_free() call might have increased the recovery size */
1833	*recovery_size = tdb_recovery_size(tdb);
1834
1835	/* round up to a multiple of page size */
1836	*recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
1837	*recovery_offset = tdb->map_size;
1838	recovery_head = *recovery_offset;
1839
1840	if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1841				     (tdb->map_size - tdb->transaction->old_map_size) +
1842				     sizeof(rec) + *recovery_max_size) == -1) {
1843		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
1844		return -1;
1845	}
1846
1847	/* remap the file (if using mmap) */
1848	methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1849
1850	/* we have to reset the old map size so that we don't try to expand the file
1851	   again in the transaction commit, which would destroy the recovery area */
1852	tdb->transaction->old_map_size = tdb->map_size;
1853
1854	/* write the recovery header offset and sync - we can sync without a race here
1855	   as the magic ptr in the recovery record has not been set */
1856	CONVERT(recovery_head);
1857	if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
1858			       &recovery_head, sizeof(tdb_off_t)) == -1) {
1859		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
1860		return -1;
1861	}
1862
1863	return 0;
1864}
1865
1866
1867/*
1868  setup the recovery data that will be used on a crash during commit
1869*/
1870static int transaction_setup_recovery(struct tdb_context *tdb,
1871				      tdb_off_t *magic_offset)
1872{
1873	struct tdb_transaction_el *el;
1874	tdb_len_t recovery_size;
1875	unsigned char *data, *p;
1876	const struct tdb_methods *methods = tdb->transaction->io_methods;
1877	struct list_struct *rec;
1878	tdb_off_t recovery_offset, recovery_max_size;
1879	tdb_off_t old_map_size = tdb->transaction->old_map_size;
1880	u32 magic, tailer;
1881
1882	/*
1883	  check that the recovery area has enough space
1884	*/
1885	if (tdb_recovery_allocate(tdb, &recovery_size,
1886				  &recovery_offset, &recovery_max_size) == -1) {
1887		return -1;
1888	}
1889
1890	data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
1891	if (data == NULL) {
1892		tdb->ecode = TDB_ERR_OOM;
1893		return -1;
1894	}
1895
1896	rec = (struct list_struct *)data;
1897	memset(rec, 0, sizeof(*rec));
1898
1899	rec->magic    = 0;
1900	rec->data_len = recovery_size;
1901	rec->rec_len  = recovery_max_size;
1902	rec->key_len  = old_map_size;
1903	CONVERT(rec);
1904
1905	/* build the recovery data into a single blob to allow us to do a single
1906	   large write, which should be more efficient */
1907	p = data + sizeof(*rec);
1908	for (el=tdb->transaction->elements;el;el=el->next) {
1909		if (el->offset >= old_map_size) {
1910			continue;
1911		}
1912		if (el->offset + el->length > tdb->transaction->old_map_size) {
1913			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
1914			free(data);
1915			tdb->ecode = TDB_ERR_CORRUPT;
1916			return -1;
1917		}
1918		memcpy(p, &el->offset, 4);
1919		memcpy(p+4, &el->length, 4);
1920		if (DOCONV()) {
1921			tdb_convert(p, 8);
1922		}
1923		/* the recovery area contains the old data, not the
1924		   new data, so we have to call the original tdb_read
1925		   method to get it */
1926		if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
1927			free(data);
1928			tdb->ecode = TDB_ERR_IO;
1929			return -1;
1930		}
1931		p += 8 + el->length;
1932	}
1933
1934	/* and the tailer */
1935	tailer = sizeof(*rec) + recovery_max_size;
1936	CONVERT(tailer);
1937	memcpy(p, &tailer, 4);
1938
1939	/* write the recovery data to the recovery area */
1940	if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
1941		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
1942		free(data);
1943		tdb->ecode = TDB_ERR_IO;
1944		return -1;
1945	}
1946
1947	/* as we don't have ordered writes, we have to sync the recovery
1948	   data before we update the magic to indicate that the recovery
1949	   data is present */
1950	if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
1951		free(data);
1952		return -1;
1953	}
1954
1955	free(data);
1956
1957	magic = TDB_RECOVERY_MAGIC;
1958	CONVERT(magic);
1959
1960	*magic_offset = recovery_offset + offsetof(struct list_struct, magic);
1961
1962	if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
1963		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
1964		tdb->ecode = TDB_ERR_IO;
1965		return -1;
1966	}
1967
1968	/* ensure the recovery magic marker is on disk */
1969	if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
1970		return -1;
1971	}
1972
1973	return 0;
1974}
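
/*
  Note added for clarity (not from the original source): as read from the two
  functions above and from tdb_transaction_recover() below, the recovery area
  written at commit time is laid out as

     struct list_struct header   (data_len = size of the blob that follows,
                                  rec_len = allocated area size,
                                  key_len = old file size,
                                  magic   = written last as the validity marker)
     then, for each element that falls inside the old file range:
        4 bytes  file offset
        4 bytes  length
        length bytes of the *old* file contents at that offset
     4 bytes tailer = sizeof(header) + recovery_max_size
*/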
1975
1976/*
1977  commit the current transaction
1978*/
1979int tdb_transaction_commit(struct tdb_context *tdb)
1980{
1981	const struct tdb_methods *methods;
1982	tdb_off_t magic_offset = 0;
1983	u32 zero = 0;
1984
1985	if (tdb->transaction == NULL) {
1986		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1987		return -1;
1988	}
1989
1990	if (tdb->transaction->transaction_error) {
1991		tdb->ecode = TDB_ERR_IO;
1992		tdb_transaction_cancel(tdb);
1993		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1994		return -1;
1995	}
1996
1997	if (tdb->transaction->nesting != 0) {
1998		tdb->transaction->nesting--;
1999		return 0;
2000	}
2001
2002	/* check for a null transaction */
2003	if (tdb->transaction->elements == NULL) {
2004		tdb_transaction_cancel(tdb);
2005		return 0;
2006	}
2007
2008	methods = tdb->transaction->io_methods;
2009
2010	/* if there are any locks pending then the caller has not
2011	   nested their locks properly, so fail the transaction */
2012	if (tdb->num_locks || tdb->global_lock.count) {
2013		tdb->ecode = TDB_ERR_LOCK;
2014		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
2015		tdb_transaction_cancel(tdb);
2016		return -1;
2017	}
2018
2019	/* upgrade the main transaction lock region to a write lock */
2020	if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
2021		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to upgrade hash locks\n"));
2022		tdb->ecode = TDB_ERR_LOCK;
2023		tdb_transaction_cancel(tdb);
2024		return -1;
2025	}
2026
2027	/* get the global lock - this prevents new users attaching to the database
2028	   during the commit */
2029	if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
2030		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
2031		tdb->ecode = TDB_ERR_LOCK;
2032		tdb_transaction_cancel(tdb);
2033		return -1;
2034	}
2035
2036	if (!(tdb->flags & TDB_NOSYNC)) {
2037		/* write the recovery data to the end of the file */
2038		if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
2039			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
2040			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2041			tdb_transaction_cancel(tdb);
2042			return -1;
2043		}
2044	}
2045
2046	/* expand the file to the new size if needed */
2047	if (tdb->map_size != tdb->transaction->old_map_size) {
2048		if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
2049					     tdb->map_size -
2050					     tdb->transaction->old_map_size) == -1) {
2051			tdb->ecode = TDB_ERR_IO;
2052			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
2053			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2054			tdb_transaction_cancel(tdb);
2055			return -1;
2056		}
2057		tdb->map_size = tdb->transaction->old_map_size;
2058		methods->tdb_oob(tdb, tdb->map_size + 1, 1);
2059	}
2060
2061	/* perform all the writes */
2062	while (tdb->transaction->elements) {
2063		struct tdb_transaction_el *el = tdb->transaction->elements;
2064
2065		if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
2066			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
2067
2068			/* we've overwritten part of the data and
2069			   possibly expanded the file, so we need to
2070			   run the crash recovery code */
2071			tdb->methods = methods;
2072			tdb_transaction_recover(tdb);
2073
2074			tdb_transaction_cancel(tdb);
2075			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2076
2077			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
2078			return -1;
2079		}
2080		tdb->transaction->elements = el->next;
2081		free(el->data);
2082		free(el);
2083	}
2084
2085	if (!(tdb->flags & TDB_NOSYNC)) {
2086		/* ensure the new data is on disk */
2087		if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2088			return -1;
2089		}
2090
2091		/* remove the recovery marker */
2092		if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
2093			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
2094			return -1;
2095		}
2096
2097		/* ensure the recovery marker has been removed on disk */
2098		if (transaction_sync(tdb, magic_offset, 4) == -1) {
2099			return -1;
2100		}
2101	}
2102
2103	tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2104
2105	/*
2106	  TODO: maybe write to some dummy hdr field, or write to magic
2107	  offset without mmap, before the last sync, instead of the
2108	  utime() call
2109	*/
2110
2111	/* on some systems (like Linux 2.6.x) changes via mmap/msync
2112	   don't change the mtime of the file, this means the file may
2113	   not be backed up (as tdb rounding to block sizes means that
2114	   file size changes are quite rare too). The following forces
2115	   mtime changes when a transaction completes */
2116#ifdef HAVE_UTIME
2117	utime(tdb->name, NULL);
2118#endif
2119
2120	/* use a transaction cancel to free memory and remove the
2121	   transaction locks */
2122	tdb_transaction_cancel(tdb);
2123	return 0;
2124}
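
/*
  Illustrative usage sketch, not from the original tdb source. It shows the
  transaction pattern around tdb_transaction_start()/commit()/cancel(); the
  helper name, keys and the TDB_REPLACE flag are assumptions drawn from the
  public tdb API. The #if 0 guard keeps the listing unchanged for a compiler.
*/
#if 0
static int example_store_pair_atomically(struct tdb_context *db)
{
	TDB_DATA k1, v1, k2, v2;

	k1.dptr = (unsigned char *)"key1";   k1.dsize = 4;
	v1.dptr = (unsigned char *)"value1"; v1.dsize = 6;
	k2.dptr = (unsigned char *)"key2";   k2.dsize = 4;
	v2.dptr = (unsigned char *)"value2"; v2.dsize = 6;

	if (tdb_transaction_start(db) == -1) {
		return -1;
	}

	/* both stores become visible atomically on commit */
	if (tdb_store(db, k1, v1, TDB_REPLACE) == -1 ||
	    tdb_store(db, k2, v2, TDB_REPLACE) == -1) {
		tdb_transaction_cancel(db);
		return -1;
	}

	return tdb_transaction_commit(db);
}
#endif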
2125
2126
2127/*
2128  recover from an aborted transaction. Must be called with exclusive
2129  database write access already established (including the global
2130  lock to prevent new processes attaching)
2131*/
2132int tdb_transaction_recover(struct tdb_context *tdb)
2133{
2134	tdb_off_t recovery_head, recovery_eof;
2135	unsigned char *data, *p;
2136	u32 zero = 0;
2137	struct list_struct rec;
2138
2139	/* find the recovery area */
2140	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
2141		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
2142		tdb->ecode = TDB_ERR_IO;
2143		return -1;
2144	}
2145
2146	if (recovery_head == 0) {
2147		/* we have never allocated a recovery record */
2148		return 0;
2149	}
2150
2151	/* read the recovery record */
2152	if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
2153				   sizeof(rec), DOCONV()) == -1) {
2154		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
2155		tdb->ecode = TDB_ERR_IO;
2156		return -1;
2157	}
2158
2159	if (rec.magic != TDB_RECOVERY_MAGIC) {
2160		/* there is no valid recovery data */
2161		return 0;
2162	}
2163
2164	if (tdb->read_only) {
2165		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
2166		tdb->ecode = TDB_ERR_CORRUPT;
2167		return -1;
2168	}
2169
2170	recovery_eof = rec.key_len;
2171
2172	data = (unsigned char *)malloc(rec.data_len);
2173	if (data == NULL) {
2174		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
2175		tdb->ecode = TDB_ERR_OOM;
2176		return -1;
2177	}
2178
2179	/* read the full recovery data */
2180	if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
2181				   rec.data_len, 0) == -1) {
2182		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
2183		tdb->ecode = TDB_ERR_IO;
2184		return -1;
2185	}
2186
2187	/* recover the file data */
2188	p = data;
2189	while (p+8 < data + rec.data_len) {
2190		u32 ofs, len;
2191		if (DOCONV()) {
2192			tdb_convert(p, 8);
2193		}
2194		memcpy(&ofs, p, 4);
2195		memcpy(&len, p+4, 4);
2196
2197		if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
2198			free(data);
2199			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
2200			tdb->ecode = TDB_ERR_IO;
2201			return -1;
2202		}
2203		p += 8 + len;
2204	}
2205
2206	free(data);
2207
2208	if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2209		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
2210		tdb->ecode = TDB_ERR_IO;
2211		return -1;
2212	}
2213
2214	/* if the recovery area is after the recovered eof then remove it */
2215	if (recovery_eof <= recovery_head) {
2216		if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
2217			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
2218			tdb->ecode = TDB_ERR_IO;
2219			return -1;
2220		}
2221	}
2222
2223	/* remove the recovery magic */
2224	if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
2225			  &zero) == -1) {
2226		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
2227		tdb->ecode = TDB_ERR_IO;
2228		return -1;
2229	}
2230
2231	/* reduce the file size to the old size */
2232	tdb_munmap(tdb);
2233	if (ftruncate(tdb->fd, recovery_eof) != 0) {
2234		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
2235		tdb->ecode = TDB_ERR_IO;
2236		return -1;
2237	}
2238	tdb->map_size = recovery_eof;
2239	tdb_mmap(tdb);
2240
2241	if (transaction_sync(tdb, 0, recovery_eof) == -1) {
2242		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
2243		tdb->ecode = TDB_ERR_IO;
2244		return -1;
2245	}
2246
2247	TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
2248		 recovery_eof));
2249
2250	/* all done */
2251	return 0;
2252}
2253
2254/* file: freelist.c */
2255
2256/* read a freelist record and check for simple errors */
2257static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
2258{
2259	if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
2260		return -1;
2261
2262	if (rec->magic == TDB_MAGIC) {
2263		/* this happens when an app is shut down while deleting a record - we should
2264		   not completely fail when this happens */
2265		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
2266			 rec->magic, off));
2267		rec->magic = TDB_FREE_MAGIC;
2268		if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
2269			return -1;
2270	}
2271
2272	if (rec->magic != TDB_FREE_MAGIC) {
2273		/* Ensure ecode is set for log fn. */
2274		tdb->ecode = TDB_ERR_CORRUPT;
2275		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read bad magic 0x%x at offset=%d\n",
2276			   rec->magic, off));
2277		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2278	}
2279	if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
2280		return -1;
2281	return 0;
2282}
2283
2284
2285
2286/* Remove an element from the freelist.  Must have alloc lock. */
2287static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
2288{
2289	tdb_off_t last_ptr, i;
2290
2291	/* read in the freelist top */
2292	last_ptr = FREELIST_TOP;
2293	while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
2294		if (i == off) {
2295			/* We've found it! */
2296			return tdb_ofs_write(tdb, last_ptr, &next);
2297		}
2298		/* Follow chain (next offset is at start of record) */
2299		last_ptr = i;
2300	}
2301	TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
2302	return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2303}
2304
2305
2306/* update a record tailer (must hold allocation lock) */
2307static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
2308			 const struct list_struct *rec)
2309{
2310	tdb_off_t totalsize;
2311
2312	/* Offset of tailer from record header */
2313	totalsize = sizeof(*rec) + rec->rec_len;
2314	return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
2315			 &totalsize);
2316}
2317
2318/* Add an element into the freelist. Merge adjacent records if
2319   necessary. */
2320int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
2321{
2322	tdb_off_t right, left;
2323
2324	/* Allocation and tailer lock */
2325	if (tdb_lock(tdb, -1, F_WRLCK) != 0)
2326		return -1;
2327
2328	/* set an initial tailer, so if we fail we don't leave a bogus record */
2329	if (update_tailer(tdb, offset, rec) != 0) {
2330		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
2331		goto fail;
2332	}
2333
2334	/* Look right first (I'm an Australian, dammit) */
2335	right = offset + sizeof(*rec) + rec->rec_len;
2336	if (right + sizeof(*rec) <= tdb->map_size) {
2337		struct list_struct r;
2338
2339		if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
2340			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
2341			goto left;
2342		}
2343
2344		/* If it's free, expand to include it. */
2345		if (r.magic == TDB_FREE_MAGIC) {
2346			if (remove_from_freelist(tdb, right, r.next) == -1) {
2347				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
2348				goto left;
2349			}
2350			rec->rec_len += sizeof(r) + r.rec_len;
2351		}
2352	}
2353
2354left:
2355	/* Look left */
2356	left = offset - sizeof(tdb_off_t);
2357	if (left > TDB_DATA_START(tdb->header.hash_size)) {
2358		struct list_struct l;
2359		tdb_off_t leftsize;
2360
2361		/* Read in tailer and jump back to header */
2362		if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
2363			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
2364			goto update;
2365		}
2366
2367		/* it could be uninitialised data */
2368		if (leftsize == 0 || leftsize == TDB_PAD_U32) {
2369			goto update;
2370		}
2371
2372		left = offset - leftsize;
2373
2374		/* Now read in record */
2375		if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
2376			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
2377			goto update;
2378		}
2379
2380		/* If it's free, expand to include it. */
2381		if (l.magic == TDB_FREE_MAGIC) {
2382			if (remove_from_freelist(tdb, left, l.next) == -1) {
2383				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
2384				goto update;
2385			} else {
2386				offset = left;
2387				rec->rec_len += leftsize;
2388			}
2389		}
2390	}
2391
2392update:
2393	if (update_tailer(tdb, offset, rec) == -1) {
2394		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
2395		goto fail;
2396	}
2397
2398	/* Now, prepend to free list */
2399	rec->magic = TDB_FREE_MAGIC;
2400
2401	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
2402	    tdb_rec_write(tdb, offset, rec) == -1 ||
2403	    tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
2404		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
2405		goto fail;
2406	}
2407
2408	/* And we're done. */
2409	tdb_unlock(tdb, -1, F_WRLCK);
2410	return 0;
2411
2412 fail:
2413	tdb_unlock(tdb, -1, F_WRLCK);
2414	return -1;
2415}
2416
2417
2418/*
2419   the core of tdb_allocate - called when we have decided which
2420   free list entry to use
2421 */
2422static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
2423				struct list_struct *rec, tdb_off_t last_ptr)
2424{
2425	struct list_struct newrec;
2426	tdb_off_t newrec_ptr;
2427
2428	memset(&newrec, '\0', sizeof(newrec));
2429
2430	/* found it - now possibly split it up  */
2431	if (rec->rec_len > length + MIN_REC_SIZE) {
2432		/* Length of left piece */
2433		length = TDB_ALIGN(length, TDB_ALIGNMENT);
2434
2435		/* Right piece to go on free list */
2436		newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
2437		newrec_ptr = rec_ptr + sizeof(*rec) + length;
2438
2439		/* And left record is shortened */
2440		rec->rec_len = length;
2441	} else {
2442		newrec_ptr = 0;
2443	}
2444
2445	/* Remove allocated record from the free list */
2446	if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
2447		return 0;
2448	}
2449
2450	/* Update header: do this before we drop alloc
2451	   lock, otherwise tdb_free() might try to
2452	   merge with us, thinking we're free.
2453	   (Thanks Jeremy Allison). */
2454	rec->magic = TDB_MAGIC;
2455	if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
2456		return 0;
2457	}
2458
2459	/* Did we create new block? */
2460	if (newrec_ptr) {
2461		/* Update allocated record tailer (we
2462		   shortened it). */
2463		if (update_tailer(tdb, rec_ptr, rec) == -1) {
2464			return 0;
2465		}
2466
2467		/* Free new record */
2468		if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
2469			return 0;
2470		}
2471	}
2472
2473	/* all done - return the new record offset */
2474	return rec_ptr;
2475}
2476
2477/* allocate some space from the free list. The offset returned points
2478   to an unconnected list_struct within the database with room for at
2479   least length bytes of total data
2480
2481   0 is returned if the space could not be allocated
2482 */
2483tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
2484{
2485	tdb_off_t rec_ptr, last_ptr, newrec_ptr;
2486	struct {
2487		tdb_off_t rec_ptr, last_ptr;
2488		tdb_len_t rec_len;
2489	} bestfit;
2490
2491	if (tdb_lock(tdb, -1, F_WRLCK) == -1)
2492		return 0;
2493
2494	/* Extra bytes required for tailer */
2495	length += sizeof(tdb_off_t);
2496
2497 again:
2498	last_ptr = FREELIST_TOP;
2499
2500	/* read in the freelist top */
2501	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
2502		goto fail;
2503
2504	bestfit.rec_ptr = 0;
2505	bestfit.last_ptr = 0;
2506	bestfit.rec_len = 0;
2507
2508	/*
2509	   this is a best fit allocation strategy. Originally we used
2510	   a first fit strategy, but it suffered from massive fragmentation
2511	   issues when faced with a slowly increasing record size.
2512	 */
2513	while (rec_ptr) {
2514		if (tdb_rec_free_read(tdb, rec_ptr, rec) == -1) {
2515			goto fail;
2516		}
2517
2518		if (rec->rec_len >= length) {
2519			if (bestfit.rec_ptr == 0 ||
2520			    rec->rec_len < bestfit.rec_len) {
2521				bestfit.rec_len = rec->rec_len;
2522				bestfit.rec_ptr = rec_ptr;
2523				bestfit.last_ptr = last_ptr;
2524				/* consider a fit to be good enough if
2525				   we aren't wasting more than half
2526				   the space */
2527				if (bestfit.rec_len < 2*length) {
2528					break;
2529				}
2530			}
2531		}
2532
2533		/* move to the next record */
2534		last_ptr = rec_ptr;
2535		rec_ptr = rec->next;
2536	}
2537
2538	if (bestfit.rec_ptr != 0) {
2539		if (tdb_rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
2540			goto fail;
2541		}
2542
2543		newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
2544		tdb_unlock(tdb, -1, F_WRLCK);
2545		return newrec_ptr;
2546	}
2547
2548	/* we didn't find enough space. See if we can expand the
2549	   database and if we can then try again */
2550	if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
2551		goto again;
2552 fail:
2553	tdb_unlock(tdb, -1, F_WRLCK);
2554	return 0;
2555}
2556
2557/* file: freelistcheck.c */
2558
2559/* Check the freelist is good and contains no loops.
2560   Very memory intensive - only do this as a consistency
2561   checker. Heh heh - uses an in memory tdb as the storage
2562   for the "seen" record list. For some reason this strikes
2563   me as extremely clever as I don't have to write another tree
2564   data structure implementation :-).
2565 */
2566
2567static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
2568{
2569	TDB_DATA key, data;
2570
2571	memset(&data, '\0', sizeof(data));
2572	key.dptr = (unsigned char *)&rec_ptr;
2573	key.dsize = sizeof(rec_ptr);
2574	return tdb_store(mem_tdb, key, data, TDB_INSERT);
2575}
2576
2577int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
2578{
2579	struct tdb_context *mem_tdb = NULL;
2580	struct list_struct rec;
2581	tdb_off_t rec_ptr, last_ptr;
2582	int ret = -1;
2583
2584	*pnum_entries = 0;
2585
2586	mem_tdb = tdb_open("flval", tdb->header.hash_size,
2587				TDB_INTERNAL, O_RDWR, 0600);
2588	if (!mem_tdb) {
2589		return -1;
2590	}
2591
2592	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
2593		tdb_close(mem_tdb);
2594		return 0;
2595	}
2596
2597	last_ptr = FREELIST_TOP;
2598
2599	/* Store the FREELIST_TOP record. */
2600	if (seen_insert(mem_tdb, last_ptr) == -1) {
2601		ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2602		goto fail;
2603	}
2604
2605	/* read in the freelist top */
2606	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
2607		goto fail;
2608	}
2609
2610	while (rec_ptr) {
2611
2612		/* If we can't store this record (we've seen it
2613		   before) then the free list has a loop and must
2614		   be corrupt. */
2615
2616		if (seen_insert(mem_tdb, rec_ptr)) {
2617			ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2618			goto fail;
2619		}
2620
2621		if (tdb_rec_free_read(tdb, rec_ptr, &rec) == -1) {
2622			goto fail;
2623		}
2624
2625		/* move to the next record */
2626		last_ptr = rec_ptr;
2627		rec_ptr = rec.next;
2628		*pnum_entries += 1;
2629	}
2630
2631	ret = 0;
2632
2633  fail:
2634
2635	tdb_close(mem_tdb);
2636	tdb_unlock(tdb, -1, F_WRLCK);
2637	return ret;
2638}
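
/*
  Illustrative sketch (not in the original source) of calling
  tdb_validate_freelist() from application code; the helper name is
  hypothetical.
*/
#if 0
static int example_check_freelist(struct tdb_context *db)
{
	int entries = 0;

	if (tdb_validate_freelist(db, &entries) == -1) {
		printf("freelist is corrupt\n");
		return -1;
	}
	printf("freelist OK, %d entries\n", entries);
	return 0;
}
#endif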
2639
2640/* file: traverse.c */
2641
2642/* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
2643static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
2644			 struct list_struct *rec)
2645{
2646	int want_next = (tlock->off != 0);
2647
2648	/* Lock each chain from the start one. */
2649	for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
2650		if (!tlock->off && tlock->hash != 0) {
2651			/* this is an optimisation for the common case where
2652			   the hash chain is empty, which is particularly
2653			   common for the use of tdb with ldb, where large
2654			   hashes are used. In that case we spend most of our
2655			   time in tdb_brlock(), locking empty hash chains.
2656
2657			   To avoid this, we do an unlocked pre-check to see
2658			   if the hash chain is empty before starting to look
2659			   inside it. If it is empty then we can avoid that
2660			   hash chain. If it isn't empty then we can't believe
2661			   the value we get back, as we read it without a
2662			   lock, so instead we get the lock and re-fetch the
2663			   value below.
2664
2665			   Notice that not doing this optimisation on the
2666			   first hash chain is critical. We must guarantee
2667			   that we have done at least one fcntl lock at the
2668			   start of a search to guarantee that memory is
2669			   coherent on SMP systems. If records are added by
2670			   others during the search then that's OK, and we
2671			   could possibly miss those with this trick, but we
2672			   could miss them anyway without this trick, so the
2673			   semantics don't change.
2674
2675			   With a non-indexed ldb search this trick gains us a
2676			   factor of around 80 in speed on a linux 2.6.x
2677			   system (testing using ldbtest).
2678			*/
2679			tdb->methods->next_hash_chain(tdb, &tlock->hash);
2680			if (tlock->hash == tdb->header.hash_size) {
2681				continue;
2682			}
2683		}
2684
2685		if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
2686			return -1;
2687
2688		/* No previous record?  Start at top of chain. */
2689		if (!tlock->off) {
2690			if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
2691				     &tlock->off) == -1)
2692				goto fail;
2693		} else {
2694			/* Otherwise unlock the previous record. */
2695			if (tdb_unlock_record(tdb, tlock->off) != 0)
2696				goto fail;
2697		}
2698
2699		if (want_next) {
2700			/* We have offset of old record: grab next */
2701			if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2702				goto fail;
2703			tlock->off = rec->next;
2704		}
2705
2706		/* Iterate through chain */
2707		while( tlock->off) {
2708			tdb_off_t current;
2709			if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2710				goto fail;
2711
2712			/* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
2713			if (tlock->off == rec->next) {
2714				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
2715				goto fail;
2716			}
2717
2718			if (!TDB_DEAD(rec)) {
2719				/* Woohoo: we found one! */
2720				if (tdb_lock_record(tdb, tlock->off) != 0)
2721					goto fail;
2722				return tlock->off;
2723			}
2724
2725			/* Try to clean dead ones from old traverses */
2726			current = tlock->off;
2727			tlock->off = rec->next;
2728			if (!(tdb->read_only || tdb->traverse_read) &&
2729			    tdb_do_delete(tdb, current, rec) != 0)
2730				goto fail;
2731		}
2732		tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
2733		want_next = 0;
2734	}
2735	/* We finished iteration without finding anything */
2736	return TDB_ERRCODE(TDB_SUCCESS, 0);
2737
2738 fail:
2739	tlock->off = 0;
2740	if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
2741		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
2742	return -1;
2743}
2744
2745/* traverse the entire database - calling fn(tdb, key, data) on each element.
2746   return -1 on error or the record count traversed
2747   if fn is NULL then it is not called
2748   a non-zero return value from fn() indicates that the traversal should stop
2749  */
2750static int tdb_traverse_internal(struct tdb_context *tdb,
2751				 tdb_traverse_func fn, void *private_data,
2752				 struct tdb_traverse_lock *tl)
2753{
2754	TDB_DATA key, dbuf;
2755	struct list_struct rec;
2756	int ret, count = 0;
2757
2758	/* This was in the initialization, above, but the IRIX compiler
2759	 * did not like it.  crh
2760	 */
2761	tl->next = tdb->travlocks.next;
2762
2763	/* fcntl locks don't stack: beware traverse inside traverse */
2764	tdb->travlocks.next = tl;
2765
2766	/* tdb_next_lock places locks on the record returned, and its chain */
2767	while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
2768		count++;
2769		/* now read the full record */
2770		key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
2771					  rec.key_len + rec.data_len);
2772		if (!key.dptr) {
2773			ret = -1;
2774			if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
2775				goto out;
2776			if (tdb_unlock_record(tdb, tl->off) != 0)
2777				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
2778			goto out;
2779		}
2780		key.dsize = rec.key_len;
2781		dbuf.dptr = key.dptr + rec.key_len;
2782		dbuf.dsize = rec.data_len;
2783
2784		/* Drop chain lock, call out */
2785		if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
2786			ret = -1;
2787			SAFE_FREE(key.dptr);
2788			goto out;
2789		}
2790		if (fn && fn(tdb, key, dbuf, private_data)) {
2791			/* They want us to terminate traversal */
2792			ret = count;
2793			if (tdb_unlock_record(tdb, tl->off) != 0) {
2794				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));
2795				ret = -1;
2796			}
2797			SAFE_FREE(key.dptr);
2798			goto out;
2799		}
2800		SAFE_FREE(key.dptr);
2801	}
2802out:
2803	tdb->travlocks.next = tl->next;
2804	if (ret < 0)
2805		return -1;
2806	else
2807		return count;
2808}
2809
2810
2811/*
2812  a read style traverse - temporarily marks the db read only
2813*/
2814int tdb_traverse_read(struct tdb_context *tdb,
2815		      tdb_traverse_func fn, void *private_data)
2816{
2817	struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
2818	int ret;
2819
2820	/* we need to get a read lock on the transaction lock here to
2821	   cope with the lock ordering semantics of solaris10 */
2822	if (tdb_transaction_lock(tdb, F_RDLCK)) {
2823		return -1;
2824	}
2825
2826	tdb->traverse_read++;
2827	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2828	tdb->traverse_read--;
2829
2830	tdb_transaction_unlock(tdb);
2831
2832	return ret;
2833}
2834
2835/*
2836  a write style traverse - needs to get the transaction lock to
2837  prevent deadlocks
2838*/
2839int tdb_traverse(struct tdb_context *tdb,
2840		 tdb_traverse_func fn, void *private_data)
2841{
2842	struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
2843	int ret;
2844
2845	if (tdb->read_only || tdb->traverse_read) {
2846		return tdb_traverse_read(tdb, fn, private_data);
2847	}
2848
2849	if (tdb_transaction_lock(tdb, F_WRLCK)) {
2850		return -1;
2851	}
2852
2853	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2854
2855	tdb_transaction_unlock(tdb);
2856
2857	return ret;
2858}
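
/*
  Illustrative sketch, not from the original source. A traversal callback
  receives (tdb, key, data, private_data); returning non-zero stops the
  traversal early, as described above. Names are hypothetical.
*/
#if 0
static int example_count_fn(struct tdb_context *db, TDB_DATA key,
			    TDB_DATA data, void *private_data)
{
	int *count = (int *)private_data;

	(*count)++;
	printf("record: key %u bytes, data %u bytes\n",
	       (unsigned int)key.dsize, (unsigned int)data.dsize);
	return 0;	/* keep going */
}

static int example_count_records(struct tdb_context *db)
{
	int count = 0;

	if (tdb_traverse_read(db, example_count_fn, &count) == -1) {
		return -1;
	}
	return count;
}
#endif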
2859
2860
2861/* find the first entry in the database and return its key */
2862TDB_DATA tdb_firstkey(struct tdb_context *tdb)
2863{
2864	TDB_DATA key;
2865	struct list_struct rec;
2866
2867	/* release any old lock */
2868	if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
2869		return tdb_null;
2870	tdb->travlocks.off = tdb->travlocks.hash = 0;
2871	tdb->travlocks.lock_rw = F_RDLCK;
2872
2873	/* Grab first record: locks chain and returned record. */
2874	if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
2875		return tdb_null;
2876	/* now read the key */
2877	key.dsize = rec.key_len;
2878	key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off + sizeof(rec), key.dsize);
2879
2880	/* Unlock the hash chain of the record we just read. */
2881	if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2882		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
2883	return key;
2884}
2885
2886/* find the next entry in the database, returning its key */
2887TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
2888{
2889	u32 oldhash;
2890	TDB_DATA key = tdb_null;
2891	struct list_struct rec;
2892	unsigned char *k = NULL;
2893
2894	/* Is locked key the old key?  If so, traverse will be reliable. */
2895	if (tdb->travlocks.off) {
2896		if (tdb_lock(tdb,tdb->travlocks.hash,tdb->travlocks.lock_rw))
2897			return tdb_null;
2898		if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
2899		    || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
2900					    rec.key_len))
2901		    || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
2902			/* No, it wasn't: unlock it and start from scratch */
2903			if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
2904				SAFE_FREE(k);
2905				return tdb_null;
2906			}
2907			if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0) {
2908				SAFE_FREE(k);
2909				return tdb_null;
2910			}
2911			tdb->travlocks.off = 0;
2912		}
2913
2914		SAFE_FREE(k);
2915	}
2916
2917	if (!tdb->travlocks.off) {
2918		/* No previous element: do normal find, and lock record */
2919		tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
2920		if (!tdb->travlocks.off)
2921			return tdb_null;
2922		tdb->travlocks.hash = BUCKET(rec.full_hash);
2923		if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
2924			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
2925			return tdb_null;
2926		}
2927	}
2928	oldhash = tdb->travlocks.hash;
2929
2930	/* Grab next record: locks chain and returned record,
2931	   unlocks old record */
2932	if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
2933		key.dsize = rec.key_len;
2934		key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
2935					  key.dsize);
2936		/* Unlock the chain of this new record */
2937		if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2938			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2939	}
2940	/* Unlock the chain of old record */
2941	if (tdb_unlock(tdb, BUCKET(oldhash), tdb->travlocks.lock_rw) != 0)
2942		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2943	return key;
2944}
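
/*
  Illustrative sketch, not from the original source. tdb_firstkey() and
  tdb_nextkey() return malloc'ed key data that the caller must free; a NULL
  dptr marks the end of the iteration.
*/
#if 0
static void example_walk_keys(struct tdb_context *db)
{
	TDB_DATA key, next;

	for (key = tdb_firstkey(db); key.dptr != NULL; key = next) {
		printf("key of %u bytes\n", (unsigned int)key.dsize);
		next = tdb_nextkey(db, key);
		free(key.dptr);
	}
}
#endif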
2945
2946/* file: dump.c */
2947
2948static tdb_off_t tdb_dump_record(struct tdb_context *tdb, int hash,
2949				 tdb_off_t offset)
2950{
2951	struct list_struct rec;
2952	tdb_off_t tailer_ofs, tailer;
2953
2954	if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
2955				   sizeof(rec), DOCONV()) == -1) {
2956		printf("ERROR: failed to read record at %u\n", offset);
2957		return 0;
2958	}
2959
2960	printf(" rec: hash=%d offset=0x%08x next=0x%08x rec_len=%d "
2961	       "key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
2962	       hash, offset, rec.next, rec.rec_len, rec.key_len, rec.data_len,
2963	       rec.full_hash, rec.magic);
2964
2965	tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
2966
2967	if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
2968		printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
2969		return rec.next;
2970	}
2971
2972	if (tailer != rec.rec_len + sizeof(rec)) {
2973		printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
2974				(unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
2975	}
2976	return rec.next;
2977}
2978
2979static int tdb_dump_chain(struct tdb_context *tdb, int i)
2980{
2981	tdb_off_t rec_ptr, top;
2982
2983	top = TDB_HASH_TOP(i);
2984
2985	if (tdb_lock(tdb, i, F_WRLCK) != 0)
2986		return -1;
2987
2988	if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
2989		return tdb_unlock(tdb, i, F_WRLCK);
2990
2991	if (rec_ptr)
2992		printf("hash=%d\n", i);
2993
2994	while (rec_ptr) {
2995		rec_ptr = tdb_dump_record(tdb, i, rec_ptr);
2996	}
2997
2998	return tdb_unlock(tdb, i, F_WRLCK);
2999}
3000
3001void tdb_dump_all(struct tdb_context *tdb)
3002{
3003	int i;
3004	for (i=0;i<tdb->header.hash_size;i++) {
3005		tdb_dump_chain(tdb, i);
3006	}
3007	printf("freelist:\n");
3008	tdb_dump_chain(tdb, -1);
3009}
3010
3011int tdb_printfreelist(struct tdb_context *tdb)
3012{
3013	int ret;
3014	long total_free = 0;
3015	tdb_off_t offset, rec_ptr;
3016	struct list_struct rec;
3017
3018	if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
3019		return ret;
3020
3021	offset = FREELIST_TOP;
3022
3023	/* read in the freelist top */
3024	if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
3025		tdb_unlock(tdb, -1, F_WRLCK);
3026		return 0;
3027	}
3028
3029	printf("freelist top=[0x%08x]\n", rec_ptr );
3030	while (rec_ptr) {
3031		if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
3032					   sizeof(rec), DOCONV()) == -1) {
3033			tdb_unlock(tdb, -1, F_WRLCK);
3034			return -1;
3035		}
3036
3037		if (rec.magic != TDB_FREE_MAGIC) {
3038			printf("bad magic 0x%08x in free list\n", rec.magic);
3039			tdb_unlock(tdb, -1, F_WRLCK);
3040			return -1;
3041		}
3042
3043		printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
3044		       rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
3045		total_free += rec.rec_len;
3046
3047		/* move to the next record */
3048		rec_ptr = rec.next;
3049	}
3050	printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
3051               (int)total_free);
3052
3053	return tdb_unlock(tdb, -1, F_WRLCK);
3054}
3055
3056/* file: tdb.c */
3057
3058/*
3059  non-blocking increment of the tdb sequence number if the tdb has been opened using
3060  the TDB_SEQNUM flag
3061*/
3062void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
3063{
3064	tdb_off_t seqnum=0;
3065
3066	if (!(tdb->flags & TDB_SEQNUM)) {
3067		return;
3068	}
3069
3070	/* we ignore errors from this, as we have no sane way of
3071	   dealing with them.
3072	*/
3073	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3074	seqnum++;
3075	tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
3076}
3077
3078/*
3079  increment the tdb sequence number if the tdb has been opened using
3080  the TDB_SEQNUM flag
3081*/
3082static void tdb_increment_seqnum(struct tdb_context *tdb)
3083{
3084	if (!(tdb->flags & TDB_SEQNUM)) {
3085		return;
3086	}
3087
3088	if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
3089		return;
3090	}
3091
3092	tdb_increment_seqnum_nonblock(tdb);
3093
3094	tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
3095}
3096
3097static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
3098{
3099	return memcmp(data.dptr, key.dptr, data.dsize);
3100}
3101
3102/* Returns 0 on fail.  On success, return offset of record, and fills
3103   in rec */
3104static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
3105			struct list_struct *r)
3106{
3107	tdb_off_t rec_ptr;
3108
3109	/* read in the hash top */
3110	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3111		return 0;
3112
3113	/* keep looking until we find the right record */
3114	while (rec_ptr) {
3115		if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3116			return 0;
3117
3118		if (!TDB_DEAD(r) && hash==r->full_hash
3119		    && key.dsize==r->key_len
3120		    && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
3121				      r->key_len, tdb_key_compare,
3122				      NULL) == 0) {
3123			return rec_ptr;
3124		}
3125		rec_ptr = r->next;
3126	}
3127	return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3128}
3129
3130/* As tdb_find, but if you succeed, keep the lock */
3131tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
3132			   struct list_struct *rec)
3133{
3134	u32 rec_ptr;
3135
3136	if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
3137		return 0;
3138	if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
3139		tdb_unlock(tdb, BUCKET(hash), locktype);
3140	return rec_ptr;
3141}
3142
3143
3144/* update an entry in place - this only works if the new data size
3145   is <= the old data size and the key exists.
3146   on failure return -1.
3147*/
3148static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
3149{
3150	struct list_struct rec;
3151	tdb_off_t rec_ptr;
3152
3153	/* find entry */
3154	if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
3155		return -1;
3156
3157	/* must be long enough key, data and tailer */
3158	if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
3159		tdb->ecode = TDB_SUCCESS; /* Not really an error */
3160		return -1;
3161	}
3162
3163	if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3164		      dbuf.dptr, dbuf.dsize) == -1)
3165		return -1;
3166
3167	if (dbuf.dsize != rec.data_len) {
3168		/* update size */
3169		rec.data_len = dbuf.dsize;
3170		return tdb_rec_write(tdb, rec_ptr, &rec);
3171	}
3172
3173	return 0;
3174}
3175
3176/* find an entry in the database given a key */
3177/* If an entry doesn't exist tdb_err will be set to
3178 * TDB_ERR_NOEXIST. If a key has no data attached
3179 * then the TDB_DATA will have zero length but
3180 * a non-zero pointer
3181 */
3182TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
3183{
3184	tdb_off_t rec_ptr;
3185	struct list_struct rec;
3186	TDB_DATA ret;
3187	u32 hash;
3188
3189	/* find which hash bucket it is in */
3190	hash = tdb->hash_fn(&key);
3191	if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
3192		return tdb_null;
3193
3194	ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3195				  rec.data_len);
3196	ret.dsize = rec.data_len;
3197	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3198	return ret;
3199}
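
/*
  Illustrative sketch, not from the original source. The returned data is a
  malloc'ed copy that the caller must free; the key used here is hypothetical.
*/
#if 0
static void example_fetch(struct tdb_context *db)
{
	TDB_DATA key, val;

	key.dptr = (unsigned char *)"counter";
	key.dsize = 7;

	val = tdb_fetch(db, key);
	if (val.dptr == NULL) {
		printf("no such key\n");
		return;
	}
	printf("got %u bytes of data\n", (unsigned int)val.dsize);
	free(val.dptr);
}
#endif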
3200
3201/*
3202 * Find an entry in the database and hand the record's data to a parsing
3203 * function. The parsing function is executed under the chain read lock, so it
3204 * should be fast and should not block on other syscalls.
3205 *
3206 * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
3207 *
3208 * For mmapped tdb's that do not have a transaction open, it points the parsing
3209 * function directly at the mmap area, avoiding the malloc/memcpy in that
3210 * case. If a transaction is open or no mmap is available, it has to do
3211 * malloc/read/parse/free.
3212 *
3213 * This is interesting for all readers of potentially large data structures in
3214 * the tdb records, ldb indexes being one example.
3215 */
3216
3217int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
3218		     int (*parser)(TDB_DATA key, TDB_DATA data,
3219				   void *private_data),
3220		     void *private_data)
3221{
3222	tdb_off_t rec_ptr;
3223	struct list_struct rec;
3224	int ret;
3225	u32 hash;
3226
3227	/* find which hash bucket it is in */
3228	hash = tdb->hash_fn(&key);
3229
3230	if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
3231		return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3232	}
3233
3234	ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
3235			     rec.data_len, parser, private_data);
3236
3237	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3238
3239	return ret;
3240}
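
/*
  Illustrative sketch, not from the original source. The parser callback only
  inspects the data in place (it may point straight into the mmap area) and
  must not call back into tdb. Names are hypothetical.
*/
#if 0
struct example_parse_state {
	size_t bytes;
};

static int example_parser(TDB_DATA key, TDB_DATA data, void *private_data)
{
	struct example_parse_state *state = (struct example_parse_state *)private_data;

	state->bytes = data.dsize;
	return 0;
}

static int example_parse(struct tdb_context *db, TDB_DATA key)
{
	struct example_parse_state state;

	state.bytes = 0;
	return tdb_parse_record(db, key, example_parser, &state);
}
#endif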
3241
3242/* check if an entry in the database exists
3243
3244   note that 1 is returned if the key is found and 0 is returned if not found
3245   this doesn't match the conventions in the rest of this module, but is
3246   compatible with gdbm
3247*/
3248static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3249{
3250	struct list_struct rec;
3251
3252	if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
3253		return 0;
3254	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3255	return 1;
3256}
3257
3258int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
3259{
3260	u32 hash = tdb->hash_fn(&key);
3261	return tdb_exists_hash(tdb, key, hash);
3262}
3263
3264/* actually delete an entry in the database given the offset */
3265int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec)
3266{
3267	tdb_off_t last_ptr, i;
3268	struct list_struct lastrec;
3269
3270	if (tdb->read_only || tdb->traverse_read) return -1;
3271
3272	if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
3273		/* Someone traversing here: mark it as dead */
3274		rec->magic = TDB_DEAD_MAGIC;
3275		return tdb_rec_write(tdb, rec_ptr, rec);
3276	}
3277	if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
3278		return -1;
3279
3280	/* find previous record in hash chain */
3281	if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
3282		return -1;
3283	for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
3284		if (tdb_rec_read(tdb, i, &lastrec) == -1)
3285			return -1;
3286
3287	/* unlink it: next ptr is at start of record. */
3288	if (last_ptr == 0)
3289		last_ptr = TDB_HASH_TOP(rec->full_hash);
3290	if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
3291		return -1;
3292
3293	/* recover the space */
3294	if (tdb_free(tdb, rec_ptr, rec) == -1)
3295		return -1;
3296	return 0;
3297}
3298
3299static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
3300{
3301	int res = 0;
3302	tdb_off_t rec_ptr;
3303	struct list_struct rec;
3304
3305	/* read in the hash top */
3306	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3307		return 0;
3308
3309	while (rec_ptr) {
3310		if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
3311			return 0;
3312
3313		if (rec.magic == TDB_DEAD_MAGIC) {
3314			res += 1;
3315		}
3316		rec_ptr = rec.next;
3317	}
3318	return res;
3319}
3320
3321/*
3322 * Purge all DEAD records from a hash chain
3323 */
3324static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
3325{
3326	int res = -1;
3327	struct list_struct rec;
3328	tdb_off_t rec_ptr;
3329
3330	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3331		return -1;
3332	}
3333
3334	/* read in the hash top */
3335	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3336		goto fail;
3337
3338	while (rec_ptr) {
3339		tdb_off_t next;
3340
3341		if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
3342			goto fail;
3343		}
3344
3345		next = rec.next;
3346
3347		if (rec.magic == TDB_DEAD_MAGIC
3348		    && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
3349			goto fail;
3350		}
3351		rec_ptr = next;
3352	}
3353	res = 0;
3354 fail:
3355	tdb_unlock(tdb, -1, F_WRLCK);
3356	return res;
3357}
3358
3359/* delete an entry in the database given a key */
3360static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3361{
3362	tdb_off_t rec_ptr;
3363	struct list_struct rec;
3364	int ret;
3365
3366	if (tdb->max_dead_records != 0) {
3367
3368		/*
3369		 * Allow for some dead records per hash chain, mainly for
3370		 * tdb's with a very high create/delete rate like locking.tdb.
3371		 */
3372
3373		if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3374			return -1;
3375
3376		if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
3377			/*
3378			 * Don't let the per-chain freelist grow too large,
3379			 * delete all existing dead records
3380			 */
3381			tdb_purge_dead(tdb, hash);
3382		}
3383
3384		if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
3385			tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3386			return -1;
3387		}
3388
3389		/*
3390		 * Just mark the record as dead.
3391		 */
3392		rec.magic = TDB_DEAD_MAGIC;
3393		ret = tdb_rec_write(tdb, rec_ptr, &rec);
3394	}
3395	else {
3396		if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
3397						   &rec)))
3398			return -1;
3399
3400		ret = tdb_do_delete(tdb, rec_ptr, &rec);
3401	}
3402
3403	if (ret == 0) {
3404		tdb_increment_seqnum(tdb);
3405	}
3406
3407	if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
3408		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
3409	return ret;
3410}
3411
3412int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
3413{
3414	u32 hash = tdb->hash_fn(&key);
3415	return tdb_delete_hash(tdb, key, hash);
3416}
3417
3418/*
3419 * See if we have a dead record around with enough space
3420 */
3421static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
3422			       struct list_struct *r, tdb_len_t length)
3423{
3424	tdb_off_t rec_ptr;
3425
3426	/* read in the hash top */
3427	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3428		return 0;
3429
3430	/* keep looking until we find the right record */
3431	while (rec_ptr) {
3432		if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3433			return 0;
3434
3435		if (TDB_DEAD(r) && r->rec_len >= length) {
3436			/*
3437			 * First fit for simple coding, TODO: change to best
3438			 * fit
3439			 */
3440			return rec_ptr;
3441		}
3442		rec_ptr = r->next;
3443	}
3444	return 0;
3445}
3446
3447/* store an element in the database, replacing any existing element
3448   with the same key
3449
3450   return 0 on success, -1 on failure
3451*/
3452int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
3453{
3454	struct list_struct rec;
3455	u32 hash;
3456	tdb_off_t rec_ptr;
3457	char *p = NULL;
3458	int ret = -1;
3459
3460	if (tdb->read_only || tdb->traverse_read) {
3461		tdb->ecode = TDB_ERR_RDONLY;
3462		return -1;
3463	}
3464
3465	/* find which hash bucket it is in */
3466	hash = tdb->hash_fn(&key);
3467	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3468		return -1;
3469
3470	/* check for it existing, on insert. */
3471	if (flag == TDB_INSERT) {
3472		if (tdb_exists_hash(tdb, key, hash)) {
3473			tdb->ecode = TDB_ERR_EXISTS;
3474			goto fail;
3475		}
3476	} else {
3477		/* first try in-place update, on modify or replace. */
3478		if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
3479			goto done;
3480		}
3481		if (tdb->ecode == TDB_ERR_NOEXIST &&
3482		    flag == TDB_MODIFY) {
3483			/* if the record doesn't exist and we are in TDB_MODIFY mode then
3484			 we should fail the store */
3485			goto fail;
3486		}
3487	}
3488	/* reset the error code potentially set by tdb_update_hash() */
3489	tdb->ecode = TDB_SUCCESS;
3490
3491	/* delete any existing record - if it doesn't exist we don't
3492           care.  Doing this first reduces fragmentation, and avoids
3493           coalescing with `allocated' block before it's updated. */
3494	if (flag != TDB_INSERT)
3495		tdb_delete_hash(tdb, key, hash);
3496
3497	/* Copy key+value *before* allocating free space in case malloc
3498	   fails and we are left with a dead spot in the tdb. */
3499
3500	if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
3501		tdb->ecode = TDB_ERR_OOM;
3502		goto fail;
3503	}
3504
3505	memcpy(p, key.dptr, key.dsize);
3506	if (dbuf.dsize)
3507		memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
3508
3509	if (tdb->max_dead_records != 0) {
3510		/*
3511		 * Allow for some dead records per hash chain, look if we can
3512		 * find one that can hold the new record. We need enough space
3513		 * for key, data and tailer. If we find one, we don't have to
3514		 * consult the central freelist.
3515		 */
3516		rec_ptr = tdb_find_dead(
3517			tdb, hash, &rec,
3518			key.dsize + dbuf.dsize + sizeof(tdb_off_t));
3519
3520		if (rec_ptr != 0) {
3521			rec.key_len = key.dsize;
3522			rec.data_len = dbuf.dsize;
3523			rec.full_hash = hash;
3524			rec.magic = TDB_MAGIC;
3525			if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3526			    || tdb->methods->tdb_write(
3527				    tdb, rec_ptr + sizeof(rec),
3528				    p, key.dsize + dbuf.dsize) == -1) {
3529				goto fail;
3530			}
3531			goto done;
3532		}
3533	}
3534
3535	/*
3536	 * We have to allocate some space from the freelist, so this means we
3537	 * have to lock it. Use the chance to purge all the DEAD records from
3538	 * the hash chain under the freelist lock.
3539	 */
3540
3541	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3542		goto fail;
3543	}
3544
3545	if ((tdb->max_dead_records != 0)
3546	    && (tdb_purge_dead(tdb, hash) == -1)) {
3547		tdb_unlock(tdb, -1, F_WRLCK);
3548		goto fail;
3549	}
3550
3551	/* we have to allocate some space */
3552	rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
3553
3554	tdb_unlock(tdb, -1, F_WRLCK);
3555
3556	if (rec_ptr == 0) {
3557		goto fail;
3558	}
3559
3560	/* Read hash top into next ptr */
3561	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
3562		goto fail;
3563
3564	rec.key_len = key.dsize;
3565	rec.data_len = dbuf.dsize;
3566	rec.full_hash = hash;
3567	rec.magic = TDB_MAGIC;
3568
3569	/* write out and point the top of the hash chain at it */
3570	if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3571	    || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
3572	    || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
3573		/* Need to tdb_unallocate() here */
3574		goto fail;
3575	}
3576
3577 done:
3578	ret = 0;
3579 fail:
3580	if (ret == 0) {
3581		tdb_increment_seqnum(tdb);
3582	}
3583
3584	SAFE_FREE(p);
3585	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3586	return ret;
3587}
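
/*
  Illustrative sketch, not from the original source. As read from the code
  above, TDB_INSERT fails if the key already exists, TDB_MODIFY fails if it
  does not, and any other flag (conventionally TDB_REPLACE, an assumption
  from the public header) overwrites. Key and value are hypothetical.
*/
#if 0
static int example_store_and_delete(struct tdb_context *db)
{
	TDB_DATA key, val;

	key.dptr = (unsigned char *)"hello";
	key.dsize = 5;
	val.dptr = (unsigned char *)"world";
	val.dsize = 5;

	if (tdb_store(db, key, val, TDB_REPLACE) == -1) {
		return -1;
	}
	return tdb_delete(db, key);
}
#endif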
3588
3589
3590/* Append to an entry. Create it if it does not exist. */
3591int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
3592{
3593	u32 hash;
3594	TDB_DATA dbuf;
3595	int ret = -1;
3596
3597	/* find which hash bucket it is in */
3598	hash = tdb->hash_fn(&key);
3599	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3600		return -1;
3601
3602	dbuf = tdb_fetch(tdb, key);
3603
3604	if (dbuf.dptr == NULL) {
3605		dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
3606	} else {
3607		unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
3608						     dbuf.dsize + new_dbuf.dsize);
3609		if (new_dptr == NULL) {
3610			free(dbuf.dptr);
3611		}
3612		dbuf.dptr = new_dptr;
3613	}
3614
3615	if (dbuf.dptr == NULL) {
3616		tdb->ecode = TDB_ERR_OOM;
3617		goto failed;
3618	}
3619
3620	memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
3621	dbuf.dsize += new_dbuf.dsize;
3622
3623	ret = tdb_store(tdb, key, dbuf, 0);
3624
3625failed:
3626	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3627	SAFE_FREE(dbuf.dptr);
3628	return ret;
3629}
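
/*
  Illustrative sketch, not from the original source, showing tdb_append()
  accumulating data on a (hypothetical) key.
*/
#if 0
static int example_append_line(struct tdb_context *db, const char *line)
{
	TDB_DATA key, buf;

	key.dptr = (unsigned char *)"log";
	key.dsize = 3;
	buf.dptr = (unsigned char *)line;
	buf.dsize = strlen(line);

	return tdb_append(db, key, buf);
}
#endif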
3630
3631
3632/*
3633  return the name of the current tdb file
3634  useful for external logging functions
3635*/
3636const char *tdb_name(struct tdb_context *tdb)
3637{
3638	return tdb->name;
3639}
3640
3641/*
3642  return the underlying file descriptor being used by tdb, or -1
3643  useful for external routines that want to check the device/inode
3644  of the fd
3645*/
3646int tdb_fd(struct tdb_context *tdb)
3647{
3648	return tdb->fd;
3649}
3650
3651/*
3652  return the current logging function
3653  useful for external tdb routines that wish to log tdb errors
3654*/
3655tdb_log_func tdb_log_fn(struct tdb_context *tdb)
3656{
3657	return tdb->log.log_fn;
3658}
3659
3660
3661/*
3662  get the tdb sequence number. Only makes sense if the writers opened
3663  with TDB_SEQNUM set. Note that this sequence number will wrap quite
3664  quickly, so it should only be used for a 'has something changed'
3665  test, not for code that relies on the count of the number of changes
3666  made. If you want a counter then use a tdb record.
3667
3668  The aim of this sequence number is to allow for a very lightweight
3669  test of a possible tdb change.
3670*/
3671int tdb_get_seqnum(struct tdb_context *tdb)
3672{
3673	tdb_off_t seqnum=0;
3674
3675	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3676	return seqnum;
3677}
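/* Illustrative change-detection loop (assumes the database was opened with
 * TDB_SEQNUM, or that tdb_enable_seqnum() has been called):
 *
 *	int last = tdb_get_seqnum(tdb);
 *	...
 *	if (tdb_get_seqnum(tdb) != last) {
 *		... something changed; re-read the records of interest ...
 *	}
 */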
3678
3679int tdb_hash_size(struct tdb_context *tdb)
3680{
3681	return tdb->header.hash_size;
3682}
3683
3684size_t tdb_map_size(struct tdb_context *tdb)
3685{
3686	return tdb->map_size;
3687}
3688
3689int tdb_get_flags(struct tdb_context *tdb)
3690{
3691	return tdb->flags;
3692}
3693
3694
3695/*
3696  enable sequence number handling on an open tdb
3697*/
3698void tdb_enable_seqnum(struct tdb_context *tdb)
3699{
3700	tdb->flags |= TDB_SEQNUM;
3701}
3702
3703/* file: open.c */
3704
3705/* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
3706static struct tdb_context *tdbs = NULL;
3707
3708
3709/* This is from a hash algorithm suggested by Rogier Wolff */
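/* Each key byte is folded in as value = value*256 + byte + (value >> 24)*241,
 * a byte-at-a-time multiplicative hash that feeds the top byte back in so
 * longer keys keep mixing. */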
3710static unsigned int default_tdb_hash(TDB_DATA *key)
3711{
3712	u32 value;	/* Used to compute the hash value.  */
3713	u32   i;	/* Used to cycle through the bytes of the key. */
3714
3715	/* Set the initial value from the key size. */
3716	for (value = 0, i=0; i < key->dsize; i++)
3717		value = value * 256 + key->dptr[i] + (value >> 24) * 241;
3718
3719	return value;
3720}
3721
3722
3723/* initialise a new database with a specified hash size */
3724static int tdb_new_database(struct tdb_context *tdb, int hash_size)
3725{
3726	struct tdb_header *newdb;
3727	int size, ret = -1;
3728
3729	/* We make it up in memory, then write it out if not internal */
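	/* The (hash_size+1) offsets that follow the header are the freelist
	 * head plus one hash-chain head per bucket, all zeroed by calloc(). */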
3730	size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
3731	if (!(newdb = (struct tdb_header *)calloc(size, 1)))
3732		return TDB_ERRCODE(TDB_ERR_OOM, -1);
3733
3734	/* Fill in the header */
3735	newdb->version = TDB_VERSION;
3736	newdb->hash_size = hash_size;
3737	if (tdb->flags & TDB_INTERNAL) {
3738		tdb->map_size = size;
3739		tdb->map_ptr = (char *)newdb;
3740		memcpy(&tdb->header, newdb, sizeof(tdb->header));
3741		/* Convert the `ondisk' version if asked. */
3742		CONVERT(*newdb);
3743		return 0;
3744	}
3745	if (lseek(tdb->fd, 0, SEEK_SET) == -1)
3746		goto fail;
3747
3748	if (ftruncate(tdb->fd, 0) == -1)
3749		goto fail;
3750
3751	/* This creates an endian-converted header, as if read from disk */
3752	CONVERT(*newdb);
3753	memcpy(&tdb->header, newdb, sizeof(tdb->header));
3754	/* Don't endian-convert the magic food! */
3755	memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
3756	if (write(tdb->fd, newdb, size) != size) {
3757		ret = -1;
3758	} else {
3759		ret = 0;
3760	}
3761
3762  fail:
3763	SAFE_FREE(newdb);
3764	return ret;
3765}
3766
3767
3768
3769static int tdb_already_open(dev_t device,
3770			    ino_t ino)
3771{
3772	struct tdb_context *i;
3773
3774	for (i = tdbs; i; i = i->next) {
3775		if (i->device == device && i->inode == ino) {
3776			return 1;
3777		}
3778	}
3779
3780	return 0;
3781}
3782
3783/* open the database, creating it if necessary
3784
3785   The open_flags and mode are passed straight to the open call on the
3786   database file. A flags value of O_WRONLY is invalid. The hash size
3787   is advisory; use zero for a default value.
3788
3789   Returns NULL on error, in which case errno is also set.  Don't
3790   try to call tdb_error or tdb_errname, just do strerror(errno).
3791
3792   @param name may be NULL for internal databases. */
3793struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
3794		      int open_flags, mode_t mode)
3795{
3796	return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
3797}
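/* Usage sketch (illustrative only; the path, hash size and mode are
 * arbitrary):
 *
 *	struct tdb_context *db;
 *	db = tdb_open("/tmp/example.tdb", 0, TDB_DEFAULT,
 *		      O_RDWR | O_CREAT, 0600);
 *	if (db == NULL) {
 *		perror("tdb_open");
 *	}
 *	...
 *	tdb_close(db);
 */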
3798
3799/* a default logging function that discards all messages */
3800static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
3801static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
3802{
3803}
3804
3805
3806struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
3807				int open_flags, mode_t mode,
3808				const struct tdb_logging_context *log_ctx,
3809				tdb_hash_func hash_fn)
3810{
3811	struct tdb_context *tdb;
3812	struct stat st;
3813	int rev = 0, locked = 0;
3814	unsigned char *vp;
3815	u32 vertest;
3816
3817	if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
3818		/* Can't log this */
3819		errno = ENOMEM;
3820		goto fail;
3821	}
3822	tdb_io_init(tdb);
3823	tdb->fd = -1;
3824	tdb->name = NULL;
3825	tdb->map_ptr = NULL;
3826	tdb->flags = tdb_flags;
3827	tdb->open_flags = open_flags;
3828	if (log_ctx) {
3829		tdb->log = *log_ctx;
3830	} else {
3831		tdb->log.log_fn = null_log_fn;
3832		tdb->log.log_private = NULL;
3833	}
3834	tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
3835
3836	/* cache the page size */
3837	tdb->page_size = sysconf(_SC_PAGESIZE);
3838	if (tdb->page_size <= 0) {
3839		tdb->page_size = 0x2000;
3840	}
3841
3842	if ((open_flags & O_ACCMODE) == O_WRONLY) {
3843		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
3844			 name));
3845		errno = EINVAL;
3846		goto fail;
3847	}
3848
3849	if (hash_size == 0)
3850		hash_size = DEFAULT_HASH_SIZE;
3851	if ((open_flags & O_ACCMODE) == O_RDONLY) {
3852		tdb->read_only = 1;
3853		/* read only databases don't do locking or clear if first */
3854		tdb->flags |= TDB_NOLOCK;
3855		tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3856	}
3857
3858	/* internal databases don't mmap or lock, and start off cleared */
3859	if (tdb->flags & TDB_INTERNAL) {
3860		tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
3861		tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3862		if (tdb_new_database(tdb, hash_size) != 0) {
3863			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
3864			goto fail;
3865		}
3866		goto internal;
3867	}
3868
3869	if ((tdb->fd = open(name, open_flags, mode)) == -1) {
3870		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
3871			 name, strerror(errno)));
3872		goto fail;	/* errno set by open(2) */
3873	}
3874
3875	/* ensure there is only one process initialising at once */
3876	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
3877		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
3878			 name, strerror(errno)));
3879		goto fail;	/* errno set by tdb_brlock */
3880	}
3881
3882	/* we need to zero the database if we are the only one with it open */
3883	if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
3884	    (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
3885		open_flags |= O_CREAT;
3886		if (ftruncate(tdb->fd, 0) == -1) {
3887			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
3888				 "failed to truncate %s: %s\n",
3889				 name, strerror(errno)));
3890			goto fail; /* errno set by ftruncate */
3891		}
3892	}
3893
3894	if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
3895	    || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
3896	    || (tdb->header.version != TDB_VERSION
3897		&& !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
3898		/* it's not a valid database - possibly initialise it */
3899		if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
3900			errno = EIO; /* i.e. not a valid tdb format */
3901			goto fail;
3902		}
3903		rev = (tdb->flags & TDB_CONVERT);
3904	}
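	/* Work out the on-disk byte order: if reading the version field
	 * most-significant-byte first still yields TDB_VERSION, the file
	 * was written by a big-endian host. */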
3905	vp = (unsigned char *)&tdb->header.version;
3906	vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
3907		  (((u32)vp[2]) << 8) | (u32)vp[3];
3908	tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
3909	if (!rev)
3910		tdb->flags &= ~TDB_CONVERT;
3911	else {
3912		tdb->flags |= TDB_CONVERT;
3913		tdb_convert(&tdb->header, sizeof(tdb->header));
3914	}
3915	if (fstat(tdb->fd, &st) == -1)
3916		goto fail;
3917
3918	if (tdb->header.rwlocks != 0) {
3919		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
3920		goto fail;
3921	}
3922
3923	/* Is it already in the open list?  If so, fail. */
3924	if (tdb_already_open(st.st_dev, st.st_ino)) {
3925		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3926			 "%s (%d,%d) is already open in this process\n",
3927			 name, (int)st.st_dev, (int)st.st_ino));
3928		errno = EBUSY;
3929		goto fail;
3930	}
3931
3932	if (!(tdb->name = (char *)strdup(name))) {
3933		errno = ENOMEM;
3934		goto fail;
3935	}
3936
3937	tdb->map_size = st.st_size;
3938	tdb->device = st.st_dev;
3939	tdb->inode = st.st_ino;
3940	tdb->max_dead_records = 0;
3941	tdb_mmap(tdb);
3942	if (locked) {
3943		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
3944			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3945				 "failed to release ACTIVE_LOCK on %s: %s\n",
3946				 name, strerror(errno)));
3947			goto fail;
3948		}
3949
3950	}
3951
3952	/* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
3953	   we didn't get the initial exclusive lock, as we need to let all other
3954	   users know we're using it. */
3955
3956	if (tdb_flags & TDB_CLEAR_IF_FIRST) {
3957		/* leave this lock in place to indicate it's in use */
3958		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
3959			goto fail;
3960	}
3961
3962	/* if needed, run recovery */
3963	if (tdb_transaction_recover(tdb) == -1) {
3964		goto fail;
3965	}
3966
3967 internal:
3968	/* Internal (memory-only) databases skip all the code above to
3969	 * do with disk files, and resume here by releasing their
3970	 * global lock and hooking into the active list. */
3971	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
3972		goto fail;
3973	tdb->next = tdbs;
3974	tdbs = tdb;
3975	return tdb;
3976
3977 fail:
3978	{ int save_errno = errno;
3979
3980	if (!tdb)
3981		return NULL;
3982
3983	if (tdb->map_ptr) {
3984		if (tdb->flags & TDB_INTERNAL)
3985			SAFE_FREE(tdb->map_ptr);
3986		else
3987			tdb_munmap(tdb);
3988	}
3989	SAFE_FREE(tdb->name);
3990	if (tdb->fd != -1)
3991		if (close(tdb->fd) != 0)
3992			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
3993	SAFE_FREE(tdb);
3994	errno = save_errno;
3995	return NULL;
3996	}
3997}
3998
3999/*
4000 * Set the maximum number of dead records per hash chain
4001 */
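/* A "dead" record is a deleted entry that is left in its hash chain (marked
 * with TDB_DEAD_MAGIC) rather than being returned to the freelist right away;
 * allowing a few per chain trades some space for less freelist lock
 * contention in delete-heavy workloads. */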
4002
4003void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
4004{
4005	tdb->max_dead_records = max_dead;
4006}
4007
4008/**
4009 * Close a database.
4010 *
4011 * @returns -1 for error; 0 for success.
4012 **/
4013int tdb_close(struct tdb_context *tdb)
4014{
4015	struct tdb_context **i;
4016	int ret = 0;
4017
4018	if (tdb->transaction) {
4019		tdb_transaction_cancel(tdb);
4020	}
4021
4022	if (tdb->map_ptr) {
4023		if (tdb->flags & TDB_INTERNAL)
4024			SAFE_FREE(tdb->map_ptr);
4025		else
4026			tdb_munmap(tdb);
4027	}
4028	SAFE_FREE(tdb->name);
4029	if (tdb->fd != -1)
4030		ret = close(tdb->fd);
4031	SAFE_FREE(tdb->lockrecs);
4032
4033	/* Remove from contexts list */
4034	for (i = &tdbs; *i; i = &(*i)->next) {
4035		if (*i == tdb) {
4036			*i = tdb->next;
4037			break;
4038		}
4039	}
4040
4041	memset(tdb, 0, sizeof(*tdb));
4042	SAFE_FREE(tdb);
4043
4044	return ret;
4045}
4046
4047/* register a logging function */
4048void tdb_set_logging_function(struct tdb_context *tdb,
4049                              const struct tdb_logging_context *log_ctx)
4050{
4051	tdb->log = *log_ctx;
4052}
4053
4054void *tdb_get_logging_private(struct tdb_context *tdb)
4055{
4056	return tdb->log.log_private;
4057}
4058
4059/* reopen a tdb - this can be used after a fork to ensure that we have an independent
4060   seek pointer from our parent and to re-establish locks */
4061int tdb_reopen(struct tdb_context *tdb)
4062{
4063	struct stat st;
4064
4065	if (tdb->flags & TDB_INTERNAL) {
4066		return 0; /* Nothing to do. */
4067	}
4068
4069	if (tdb->num_locks != 0 || tdb->global_lock.count) {
4070		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
4071		goto fail;
4072	}
4073
4074	if (tdb->transaction != 0) {
4075		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
4076		goto fail;
4077	}
4078
4079	if (tdb_munmap(tdb) != 0) {
4080		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
4081		goto fail;
4082	}
4083	if (close(tdb->fd) != 0)
4084		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
4085	tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
4086	if (tdb->fd == -1) {
4087		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
4088		goto fail;
4089	}
4090	if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
4091	    (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
4092		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
4093		goto fail;
4094	}
4095	if (fstat(tdb->fd, &st) != 0) {
4096		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
4097		goto fail;
4098	}
4099	if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
4100		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
4101		goto fail;
4102	}
4103	tdb_mmap(tdb);
4104
4105	return 0;
4106
4107fail:
4108	tdb_close(tdb);
4109	return -1;
4110}
4111
4112/* reopen all tdbs */
4113int tdb_reopen_all(int parent_longlived)
4114{
4115	struct tdb_context *tdb;
4116
4117	for (tdb=tdbs; tdb; tdb = tdb->next) {
4118		/*
4119		 * If the parent is longlived (i.e. a
4120		 * parent daemon architecture), we know
4121		 * it will keep its active lock on a
4122		 * tdb opened with CLEAR_IF_FIRST. Thus
4123		 * for child processes we don't have to
4124		 * add an active lock. This is essential
4125		 * to improve performance on systems that
4126		 * keep POSIX locks as a non-scalable data
4127		 * structure in the kernel.
4128		 */
4129		if (parent_longlived) {
4130			/* Ensure no clear-if-first. */
4131			tdb->flags &= ~TDB_CLEAR_IF_FIRST;
4132		}
4133
4134		if (tdb_reopen(tdb) != 0)
4135			return -1;
4136	}
4137
4138	return 0;
4139}
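/* Usage sketch (illustrative only): after fork() a child must re-establish
 * its own descriptors and locks before touching any tdb; passing 1 assumes
 * the parent is a long-lived daemon that keeps its CLEAR_IF_FIRST lock:
 *
 *	pid_t pid = fork();
 *	if (pid == 0) {
 *		if (tdb_reopen_all(1) != 0) {
 *			_exit(1);
 *		}
 *		... child continues using the open tdbs ...
 *	}
 */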
4140