tdb.c revision 65f0aab98b20b5994a726ab90d355248bcddfffd
1/*
2URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb/common
3Rev: 23590
4Last Changed Date: 2007-06-22 13:36:10 -0400 (Fri, 22 Jun 2007)
5*/
6 /*
7   trivial database library - standalone version
8
9   Copyright (C) Andrew Tridgell              1999-2005
10   Copyright (C) Jeremy Allison               2000-2006
11   Copyright (C) Paul `Rusty' Russell         2000
12
13     ** NOTE! The following LGPL license applies to the tdb
14     ** library. This does NOT imply that all of Samba is released
15     ** under the LGPL
16
17   This library is free software; you can redistribute it and/or
18   modify it under the terms of the GNU Lesser General Public
19   License as published by the Free Software Foundation; either
20   version 2 of the License, or (at your option) any later version.
21
22   This library is distributed in the hope that it will be useful,
23   but WITHOUT ANY WARRANTY; without even the implied warranty of
24   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
25   Lesser General Public License for more details.
26
27   You should have received a copy of the GNU Lesser General Public
28   License along with this library; if not, write to the Free Software
29   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
30*/
31
32#ifdef CONFIG_STAND_ALONE
33#define HAVE_MMAP
34#define HAVE_STRDUP
35#define HAVE_SYS_MMAN_H
36#define HAVE_UTIME_H
37#define HAVE_UTIME
38#endif
39#define _XOPEN_SOURCE 600
40
41#include <unistd.h>
42#include <stdio.h>
43#include <stdlib.h>
44#include <stdarg.h>
45#include <stddef.h>
46#include <errno.h>
47#include <string.h>
48#ifdef HAVE_SYS_SELECT_H
49#include <sys/select.h>
50#endif
51#include <sys/time.h>
52#include <sys/types.h>
53#include <time.h>
54#ifdef HAVE_UTIME_H
55#include <utime.h>
56#endif
57#include <sys/stat.h>
58#include <sys/file.h>
59#include <fcntl.h>
60
61#ifdef HAVE_SYS_MMAN_H
62#include <sys/mman.h>
63#endif
64
65#ifndef MAP_FILE
66#define MAP_FILE 0
67#endif
68
69#ifndef MAP_FAILED
70#define MAP_FAILED ((void *)-1)
71#endif
72
73#ifndef HAVE_STRDUP
74#define strdup rep_strdup
75static char *rep_strdup(const char *s)
76{
77	char *ret;
78	int length;
79	if (!s)
80		return NULL;
81
82	length = strlen(s);
83
84
85	ret = malloc(length + 1);
86	if (ret) {
87		strncpy(ret, s, length);
88		ret[length] = '\0';
89	}
90	return ret;
91}
92#endif
93
94#ifndef PRINTF_ATTRIBUTE
95#if (__GNUC__ > 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ >= 1))
96/** Use gcc attribute to check printf fns.  a1 is the 1-based index of
97 * the parameter containing the format, and a2 the index of the first
98 * argument. Note that some gcc 2.x versions don't handle this
99 * properly **/
100#define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
101#else
102#define PRINTF_ATTRIBUTE(a1, a2)
103#endif
104#endif
105
106typedef int bool;
107
108#include "tdb.h"
109
110#ifndef u32
111#define u32 unsigned
112#endif
113
114#ifndef HAVE_GETPAGESIZE
115#define getpagesize() 0x2000
116#endif
117
118typedef u32 tdb_len_t;
119typedef u32 tdb_off_t;
120
121#ifndef offsetof
122#define offsetof(t,f) ((unsigned int)&((t *)0)->f)
123#endif
124
125#define TDB_MAGIC_FOOD "TDB file\n"
126#define TDB_VERSION (0x26011967 + 6)
127#define TDB_MAGIC (0x26011999U)
128#define TDB_FREE_MAGIC (~TDB_MAGIC)
129#define TDB_DEAD_MAGIC (0xFEE1DEAD)
130#define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
131#define TDB_ALIGNMENT 4
132#define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
133#define DEFAULT_HASH_SIZE 131
134#define FREELIST_TOP (sizeof(struct tdb_header))
135#define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
136#define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
137#define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
138#define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
139#define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
140#define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
141#define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
142#define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
143#define TDB_SEQNUM_OFS    offsetof(struct tdb_header, sequence_number)
144#define TDB_PAD_BYTE 0x42
145#define TDB_PAD_U32  0x42424242
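/* For example, TDB_ALIGN(13, 4) == 16, and for a 32-bit unsigned x,
   TDB_BYTEREV(0x11223344) == 0x44332211. */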
146
147/* NB: assumes there is a local variable called "tdb" that is the
148 * current context; also takes a doubly-parenthesized printf-style
149 * argument. */
150#define TDB_LOG(x) tdb->log.log_fn x
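/* Usage sketch (illustrative only): with a local "tdb" and an int "list"
   in scope,
	TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed on list %d\n", list));
   expands to
	tdb->log.log_fn (tdb, TDB_DEBUG_ERROR, "lock failed on list %d\n", list);
   which is why call sites need the double parentheses. */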
151
152/* lock offsets */
153#define GLOBAL_LOCK      0
154#define ACTIVE_LOCK      4
155#define TRANSACTION_LOCK 8
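/* These are byte offsets near the start of the file, locked via
   tdb_brlock()/fcntl(); the per-chain locks taken by tdb_lock() live
   separately at FREELIST_TOP + 4*list. */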
156
157/* free memory if the pointer is valid and zero the pointer */
158#ifndef SAFE_FREE
159#define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
160#endif
161
162#define BUCKET(hash) ((hash) % tdb->header.hash_size)
163
164#define DOCONV() (tdb->flags & TDB_CONVERT)
165#define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
166
167
168/* the body of the database is made of one list_struct for the free space
169   plus a separate data list for each hash value */
170struct list_struct {
171	tdb_off_t next; /* offset of the next record in the list */
172	tdb_len_t rec_len; /* total byte length of record */
173	tdb_len_t key_len; /* byte length of key */
174	tdb_len_t data_len; /* byte length of data */
175	u32 full_hash; /* the full 32 bit hash of the key */
176	u32 magic;   /* try to catch errors */
177	/* the following union is implied:
178		union {
179			char record[rec_len];
180			struct {
181				char key[key_len];
182				char data[data_len];
183			}
184			u32 totalsize; (tailer)
185		}
186	*/
187};
188
189
190/* this is stored at the front of every database */
191struct tdb_header {
192	char magic_food[32]; /* for /etc/magic */
193	u32 version; /* version of the code */
194	u32 hash_size; /* number of hash entries */
195	tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
196	tdb_off_t recovery_start; /* offset of transaction recovery region */
197	tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
198	tdb_off_t reserved[29];
199};
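/* Resulting file layout (illustrative; offsets assume a 4-byte u32/tdb_off_t
   and an unpadded header, i.e. sizeof(struct tdb_header) == 168):

	0                               struct tdb_header
	FREELIST_TOP                    freelist head (one tdb_off_t)
	FREELIST_TOP + 4*(BUCKET(h)+1)  hash chain head for bucket h (TDB_HASH_TOP(h))
	TDB_DATA_START(hash_size)       first record
*/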
200
201struct tdb_lock_type {
202	int list;
203	u32 count;
204	u32 ltype;
205};
206
207struct tdb_traverse_lock {
208	struct tdb_traverse_lock *next;
209	u32 off;
210	u32 hash;
211	int lock_rw;
212};
213
214
215struct tdb_methods {
216	int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
217	int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
218	void (*next_hash_chain)(struct tdb_context *, u32 *);
219	int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
220	int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
221	int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
222};
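/* Two implementations of this vtable appear below: io_methods (direct file
   I/O, see "file: io.c") and transaction_methods, which overlays pending
   writes while a transaction is open (see "file: transaction.c"). */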
223
224struct tdb_context {
225	char *name; /* the name of the database */
226	void *map_ptr; /* where it is currently mapped */
227	int fd; /* open file descriptor for the database */
228	tdb_len_t map_size; /* how much space has been mapped */
229	int read_only; /* opened read-only */
230	int traverse_read; /* read-only traversal */
231	struct tdb_lock_type global_lock;
232	int num_lockrecs;
233	struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
234	enum TDB_ERROR ecode; /* error code for last tdb error */
235	struct tdb_header header; /* a cached copy of the header */
236	u32 flags; /* the flags passed to tdb_open */
237	struct tdb_traverse_lock travlocks; /* current traversal locks */
238	struct tdb_context *next; /* all tdbs to avoid multiple opens */
239	dev_t device;	/* uniquely identifies this tdb */
240	ino_t inode;	/* uniquely identifies this tdb */
241	struct tdb_logging_context log;
242	unsigned int (*hash_fn)(TDB_DATA *key);
243	int open_flags; /* flags used in the open - needed by reopen */
244	unsigned int num_locks; /* number of chain locks held */
245	const struct tdb_methods *methods;
246	struct tdb_transaction *transaction;
247	int page_size;
248	int max_dead_records;
249	bool have_transaction_lock;
250};
251
252
253/*
254  internal prototypes
255*/
256static int tdb_munmap(struct tdb_context *tdb);
257static void tdb_mmap(struct tdb_context *tdb);
258static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
259static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
260static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
261static int tdb_transaction_lock(struct tdb_context *tdb, int ltype);
262static int tdb_transaction_unlock(struct tdb_context *tdb);
263static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
264static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
265static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
266static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
267static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
268static void *tdb_convert(void *buf, u32 size);
269static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
270static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
271static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
272static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
273static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
274static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
275static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
276static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
277static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
278static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
279static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
280		   tdb_off_t offset, tdb_len_t len,
281		   int (*parser)(TDB_DATA key, TDB_DATA data,
282				 void *private_data),
283		   void *private_data);
284static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
285			   struct list_struct *rec);
286static void tdb_io_init(struct tdb_context *tdb);
287static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
288static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off,
289		      struct list_struct *rec);
290
291
292/* file: error.c */
293
294enum TDB_ERROR tdb_error(struct tdb_context *tdb)
295{
296	return tdb->ecode;
297}
298
299static struct tdb_errname {
300	enum TDB_ERROR ecode; const char *estring;
301} emap[] = { {TDB_SUCCESS, "Success"},
302	     {TDB_ERR_CORRUPT, "Corrupt database"},
303	     {TDB_ERR_IO, "IO Error"},
304	     {TDB_ERR_LOCK, "Locking error"},
305	     {TDB_ERR_OOM, "Out of memory"},
306	     {TDB_ERR_EXISTS, "Record exists"},
307	     {TDB_ERR_NOLOCK, "Lock exists on other keys"},
308	     {TDB_ERR_EINVAL, "Invalid parameter"},
309	     {TDB_ERR_NOEXIST, "Record does not exist"},
310	     {TDB_ERR_RDONLY, "write not permitted"} };
311
312/* Error string for the last tdb error */
313const char *tdb_errorstr(struct tdb_context *tdb)
314{
315	u32 i;
316	for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
317		if (tdb->ecode == emap[i].ecode)
318			return emap[i].estring;
319	return "Invalid error code";
320}
321
322/* file: lock.c */
323
324#define TDB_MARK_LOCK 0x80000000
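/* When TDB_MARK_LOCK is OR'd into ltype, the lock is only recorded (or
   cleared) in the in-memory bookkeeping and no fcntl() lock is taken or
   released; see the mark_lock checks in _tdb_lock() and tdb_unlock(). */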
325
326/* a byte range locking function - return 0 on success
327   this function locks/unlocks "len" bytes starting at the specified offset.
328
329   On error, errno is also set so that errors are passed back properly
330   through tdb_open().
331
332   note that a len of zero means lock to end of file
333*/
334int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
335	       int rw_type, int lck_type, int probe, size_t len)
336{
337	struct flock fl;
338	int ret;
339
340	if (tdb->flags & TDB_NOLOCK) {
341		return 0;
342	}
343
344	if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
345		tdb->ecode = TDB_ERR_RDONLY;
346		return -1;
347	}
348
349	fl.l_type = rw_type;
350	fl.l_whence = SEEK_SET;
351	fl.l_start = offset;
352	fl.l_len = len;
353	fl.l_pid = 0;
354
355	do {
356		ret = fcntl(tdb->fd,lck_type,&fl);
357	} while (ret == -1 && errno == EINTR);
358
359	if (ret == -1) {
360		/* Generic lock error. errno set by fcntl.
361		 * EAGAIN is an expected return from non-blocking
362		 * locks. */
363		if (!probe && lck_type != F_SETLK) {
364			/* Ensure error code is set for log fn to examine. */
365			tdb->ecode = TDB_ERR_LOCK;
366			TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
367				 tdb->fd, offset, rw_type, lck_type, (int)len));
368		}
369		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
370	}
371	return 0;
372}
373
374
375/*
376  upgrade a read lock to a write lock. This needs to be handled in a
377  special way as some OSes (such as Solaris) have too conservative
378  deadlock detection and claim a deadlock when progress can be
379  made. For those OSes we may loop for a while.
380*/
381int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
382{
383	int count = 1000;
384	while (count--) {
385		struct timeval tv;
386		if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
387			return 0;
388		}
389		if (errno != EDEADLK) {
390			break;
391		}
392		/* sleep for as short a time as we can - more portable than usleep() */
393		tv.tv_sec = 0;
394		tv.tv_usec = 1;
395		select(0, NULL, NULL, NULL, &tv);
396	}
397	TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
398	return -1;
399}
400
401
402/* lock a list in the database. list -1 is the alloc list */
403static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
404{
405	struct tdb_lock_type *new_lck;
406	int i;
407	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
408
409	ltype &= ~TDB_MARK_LOCK;
410
411	/* a global lock allows us to avoid per chain locks */
412	if (tdb->global_lock.count &&
413	    (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
414		return 0;
415	}
416
417	if (tdb->global_lock.count) {
418		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
419	}
420
421	if (list < -1 || list >= (int)tdb->header.hash_size) {
422		TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
423			   list, ltype));
424		return -1;
425	}
426	if (tdb->flags & TDB_NOLOCK)
427		return 0;
428
429	for (i=0; i<tdb->num_lockrecs; i++) {
430		if (tdb->lockrecs[i].list == list) {
431			if (tdb->lockrecs[i].count == 0) {
432				/*
433				 * Can't happen, see tdb_unlock(). It should
434				 * be an assert.
435				 */
436				TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
437					 "lck->count == 0 for list %d", list));
438			}
439			/*
440			 * Just increment the in-memory struct, posix locks
441			 * don't stack.
442			 */
443			tdb->lockrecs[i].count++;
444			return 0;
445		}
446	}
447
448	new_lck = (struct tdb_lock_type *)realloc(
449		tdb->lockrecs,
450		sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
451	if (new_lck == NULL) {
452		errno = ENOMEM;
453		return -1;
454	}
455	tdb->lockrecs = new_lck;
456
457	/* Since fcntl locks don't nest, we do a lock for the first one,
458	   and simply bump the count for future ones */
459	if (!mark_lock &&
460	    tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
461				     0, 1)) {
462		return -1;
463	}
464
465	tdb->num_locks++;
466
467	tdb->lockrecs[tdb->num_lockrecs].list = list;
468	tdb->lockrecs[tdb->num_lockrecs].count = 1;
469	tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
470	tdb->num_lockrecs += 1;
471
472	return 0;
473}
474
475/* lock a list in the database. list -1 is the alloc list */
476int tdb_lock(struct tdb_context *tdb, int list, int ltype)
477{
478	int ret;
479	ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
480	if (ret) {
481		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
482			 "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
483	}
484	return ret;
485}
486
487/* lock a list in the database. list -1 is the alloc list. non-blocking lock */
488int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
489{
490	return _tdb_lock(tdb, list, ltype, F_SETLK);
491}
492
493
494/* unlock the database: returns void because it's too late for errors. */
495	/* changed to return int; it may be interesting to know there
496	   has been an error  --simo */
497int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
498{
499	int ret = -1;
500	int i;
501	struct tdb_lock_type *lck = NULL;
502	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
503
504	ltype &= ~TDB_MARK_LOCK;
505
506	/* a global lock allows us to avoid per chain locks */
507	if (tdb->global_lock.count &&
508	    (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
509		return 0;
510	}
511
512	if (tdb->global_lock.count) {
513		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
514	}
515
516	if (tdb->flags & TDB_NOLOCK)
517		return 0;
518
519	/* Sanity checks */
520	if (list < -1 || list >= (int)tdb->header.hash_size) {
521		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
522		return ret;
523	}
524
525	for (i=0; i<tdb->num_lockrecs; i++) {
526		if (tdb->lockrecs[i].list == list) {
527			lck = &tdb->lockrecs[i];
528			break;
529		}
530	}
531
532	if ((lck == NULL) || (lck->count == 0)) {
533		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
534		return -1;
535	}
536
537	if (lck->count > 1) {
538		lck->count--;
539		return 0;
540	}
541
542	/*
543	 * This lock has count==1 left, so we need to unlock it in the
544	 * kernel. We don't bother with decrementing the in-memory array
545	 * element, we're about to overwrite it with the last array element
546	 * anyway.
547	 */
548
549	if (mark_lock) {
550		ret = 0;
551	} else {
552		ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
553					       F_SETLKW, 0, 1);
554	}
555	tdb->num_locks--;
556
557	/*
558	 * Shrink the array by overwriting the element just unlocked with the
559	 * last array element.
560	 */
561
562	if (tdb->num_lockrecs > 1) {
563		*lck = tdb->lockrecs[tdb->num_lockrecs-1];
564	}
565	tdb->num_lockrecs -= 1;
566
567	/*
568	 * We don't bother with realloc when the array shrinks, but if we have
569	 * a completely idle tdb we should get rid of the locked array.
570	 */
571
572	if (tdb->num_lockrecs == 0) {
573		SAFE_FREE(tdb->lockrecs);
574	}
575
576	if (ret)
577		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
578	return ret;
579}
580
581/*
582  get the transaction lock
583 */
584int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
585{
586	if (tdb->have_transaction_lock || tdb->global_lock.count) {
587		return 0;
588	}
589	if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype,
590				     F_SETLKW, 0, 1) == -1) {
591		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
592		tdb->ecode = TDB_ERR_LOCK;
593		return -1;
594	}
595	tdb->have_transaction_lock = 1;
596	return 0;
597}
598
599/*
600  release the transaction lock
601 */
602int tdb_transaction_unlock(struct tdb_context *tdb)
603{
604	int ret;
605	if (!tdb->have_transaction_lock) {
606		return 0;
607	}
608	ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
609	if (ret == 0) {
610		tdb->have_transaction_lock = 0;
611	}
612	return ret;
613}
614
615
616
617
618/* lock/unlock entire database */
619static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
620{
621	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
622
623	ltype &= ~TDB_MARK_LOCK;
624
625	/* There are no locks on read-only dbs */
626	if (tdb->read_only || tdb->traverse_read)
627		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
628
629	if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
630		tdb->global_lock.count++;
631		return 0;
632	}
633
634	if (tdb->global_lock.count) {
635		/* a global lock of a different type exists */
636		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
637	}
638
639	if (tdb->num_locks != 0) {
640		/* can't combine global and chain locks */
641		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
642	}
643
644	if (!mark_lock &&
645	    tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
646				     0, 4*tdb->header.hash_size)) {
647		if (op == F_SETLKW) {
648			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
649		}
650		return -1;
651	}
652
653	tdb->global_lock.count = 1;
654	tdb->global_lock.ltype = ltype;
655
656	return 0;
657}
658
659
660
661/* unlock entire db */
662static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
663{
664	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
665
666	ltype &= ~TDB_MARK_LOCK;
667
668	/* There are no locks on read-only dbs */
669	if (tdb->read_only || tdb->traverse_read) {
670		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
671	}
672
673	if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
674		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
675	}
676
677	if (tdb->global_lock.count > 1) {
678		tdb->global_lock.count--;
679		return 0;
680	}
681
682	if (!mark_lock &&
683	    tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
684				     0, 4*tdb->header.hash_size)) {
685		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
686		return -1;
687	}
688
689	tdb->global_lock.count = 0;
690	tdb->global_lock.ltype = 0;
691
692	return 0;
693}
694
695/* lock entire database with write lock */
696int tdb_lockall(struct tdb_context *tdb)
697{
698	return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
699}
700
701/* lock entire database with write lock - mark only */
702int tdb_lockall_mark(struct tdb_context *tdb)
703{
704	return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
705}
706
707/* unlock entire database with write lock - unmark only */
708int tdb_lockall_unmark(struct tdb_context *tdb)
709{
710	return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
711}
712
713/* lock entire database with write lock - non-blocking variant */
714int tdb_lockall_nonblock(struct tdb_context *tdb)
715{
716	return _tdb_lockall(tdb, F_WRLCK, F_SETLK);
717}
718
719/* unlock entire database with write lock */
720int tdb_unlockall(struct tdb_context *tdb)
721{
722	return _tdb_unlockall(tdb, F_WRLCK);
723}
724
725/* lock entire database with read lock */
726int tdb_lockall_read(struct tdb_context *tdb)
727{
728	return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
729}
730
731/* lock entire database with read lock - non-blocking variant */
732int tdb_lockall_read_nonblock(struct tdb_context *tdb)
733{
734	return _tdb_lockall(tdb, F_RDLCK, F_SETLK);
735}
736
737/* unlock entire database with read lock */
738int tdb_unlockall_read(struct tdb_context *tdb)
739{
740	return _tdb_unlockall(tdb, F_RDLCK);
741}
742
743/* lock/unlock one hash chain. This is meant to be used to reduce
744   contention - it cannot guarantee how many records will be locked */
745int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
746{
747	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
748}
749
750/* lock/unlock one hash chain, non-blocking. This is meant to be used
751   to reduce contention - it cannot guarantee how many records will be
752   locked */
753int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
754{
755	return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
756}
757
758/* mark a chain as locked without actually locking it. Warning! use with great caution! */
759int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
760{
761	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
762}
763
764/* unmark a chain as locked without actually unlocking it. Warning! use with great caution! */
765int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
766{
767	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
768}
769
770int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
771{
772	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
773}
774
775int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
776{
777	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
778}
779
780int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
781{
782	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
783}
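/* Typical read-modify-write usage of the chain locks above (illustrative
   sketch only; tdb_fetch() and tdb_store() are declared in tdb.h):

	TDB_DATA val;
	int ret;

	if (tdb_chainlock(tdb, key) != 0)
		return -1;
	val = tdb_fetch(tdb, key);
	/* ... inspect/modify val.dptr here ... */
	ret = tdb_store(tdb, key, val, TDB_REPLACE);
	free(val.dptr);
	tdb_chainunlock(tdb, key);
	return ret;
*/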
784
785
786
787/* record lock stops delete underneath */
788int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
789{
790	return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
791}
792
793/*
794  Write locks override our own fcntl readlocks, so check it here.
795  Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
796  an error to fail to get the lock here.
797*/
798int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
799{
800	struct tdb_traverse_lock *i;
801	for (i = &tdb->travlocks; i; i = i->next)
802		if (i->off == off)
803			return -1;
804	return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
805}
806
807/*
808  Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
809  an error to fail to get the lock here.
810*/
811int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
812{
813	return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
814}
815
816/* fcntl locks don't stack: avoid unlocking someone else's */
817int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
818{
819	struct tdb_traverse_lock *i;
820	u32 count = 0;
821
822	if (off == 0)
823		return 0;
824	for (i = &tdb->travlocks; i; i = i->next)
825		if (i->off == off)
826			count++;
827	return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
828}
829
830/* file: io.c */
831
832/* check for an out of bounds access - if it is out of bounds then
833   see if the database has been expanded by someone else and expand
834   if necessary
835   note that "len" is the minimum length needed for the db
836*/
837static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
838{
839	struct stat st;
840	if (len <= tdb->map_size)
841		return 0;
842	if (tdb->flags & TDB_INTERNAL) {
843		if (!probe) {
844			/* Ensure ecode is set for log fn. */
845			tdb->ecode = TDB_ERR_IO;
846			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
847				 (int)len, (int)tdb->map_size));
848		}
849		return TDB_ERRCODE(TDB_ERR_IO, -1);
850	}
851
852	if (fstat(tdb->fd, &st) == -1) {
853		return TDB_ERRCODE(TDB_ERR_IO, -1);
854	}
855
856	if (st.st_size < (size_t)len) {
857		if (!probe) {
858			/* Ensure ecode is set for log fn. */
859			tdb->ecode = TDB_ERR_IO;
860			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
861				 (int)len, (int)st.st_size));
862		}
863		return TDB_ERRCODE(TDB_ERR_IO, -1);
864	}
865
866	/* Unmap, update size, remap */
867	if (tdb_munmap(tdb) == -1)
868		return TDB_ERRCODE(TDB_ERR_IO, -1);
869	tdb->map_size = st.st_size;
870	tdb_mmap(tdb);
871	return 0;
872}
873
874/* write a lump of data at a specified offset */
875static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
876		     const void *buf, tdb_len_t len)
877{
878	if (len == 0) {
879		return 0;
880	}
881
882	if (tdb->read_only || tdb->traverse_read) {
883		tdb->ecode = TDB_ERR_RDONLY;
884		return -1;
885	}
886
887	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
888		return -1;
889
890	if (tdb->map_ptr) {
891		memcpy(off + (char *)tdb->map_ptr, buf, len);
892	} else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
893		/* Ensure ecode is set for log fn. */
894		tdb->ecode = TDB_ERR_IO;
895		TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
896			   off, len, strerror(errno)));
897		return TDB_ERRCODE(TDB_ERR_IO, -1);
898	}
899	return 0;
900}
901
902/* Endian conversion: we only ever deal with 4 byte quantities */
903void *tdb_convert(void *buf, u32 size)
904{
905	u32 i, *p = (u32 *)buf;
906	for (i = 0; i < size / 4; i++)
907		p[i] = TDB_BYTEREV(p[i]);
908	return buf;
909}
910
911
912/* read a lump of data at a specified offset, maybe convert */
913static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
914		    tdb_len_t len, int cv)
915{
916	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
917		return -1;
918	}
919
920	if (tdb->map_ptr) {
921		memcpy(buf, off + (char *)tdb->map_ptr, len);
922	} else {
923		ssize_t ret = pread(tdb->fd, buf, len, off);
924		if (ret != (ssize_t)len) {
925			/* Ensure ecode is set for log fn. */
926			tdb->ecode = TDB_ERR_IO;
927			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
928				 "len=%d ret=%d (%s) map_size=%d\n",
929				 (int)off, (int)len, (int)ret, strerror(errno),
930				 (int)tdb->map_size));
931			return TDB_ERRCODE(TDB_ERR_IO, -1);
932		}
933	}
934	if (cv) {
935		tdb_convert(buf, len);
936	}
937	return 0;
938}
939
940
941
942/*
943  do an unlocked scan of the hash table heads to find the next non-zero head. The value
944  will then be confirmed with the lock held
945*/
946static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
947{
948	u32 h = *chain;
949	if (tdb->map_ptr) {
950		for (;h < tdb->header.hash_size;h++) {
951			if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
952				break;
953			}
954		}
955	} else {
956		u32 off=0;
957		for (;h < tdb->header.hash_size;h++) {
958			if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
959				break;
960			}
961		}
962	}
963	(*chain) = h;
964}
965
966
967int tdb_munmap(struct tdb_context *tdb)
968{
969	if (tdb->flags & TDB_INTERNAL)
970		return 0;
971
972#ifdef HAVE_MMAP
973	if (tdb->map_ptr) {
974		int ret = munmap(tdb->map_ptr, tdb->map_size);
975		if (ret != 0)
976			return ret;
977	}
978#endif
979	tdb->map_ptr = NULL;
980	return 0;
981}
982
983void tdb_mmap(struct tdb_context *tdb)
984{
985	if (tdb->flags & TDB_INTERNAL)
986		return;
987
988#ifdef HAVE_MMAP
989	if (!(tdb->flags & TDB_NOMMAP)) {
990		tdb->map_ptr = mmap(NULL, tdb->map_size,
991				    PROT_READ|(tdb->read_only? 0:PROT_WRITE),
992				    MAP_SHARED|MAP_FILE, tdb->fd, 0);
993
994		/*
995		 * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
996		 */
997
998		if (tdb->map_ptr == MAP_FAILED) {
999			tdb->map_ptr = NULL;
1000			TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
1001				 tdb->map_size, strerror(errno)));
1002		}
1003	} else {
1004		tdb->map_ptr = NULL;
1005	}
1006#else
1007	tdb->map_ptr = NULL;
1008#endif
1009}
1010
1011/* expand a file.  we prefer to use ftruncate, as that is what posix
1012  says to use for mmap expansion */
1013static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
1014{
1015	char buf[1024];
1016
1017	if (tdb->read_only || tdb->traverse_read) {
1018		tdb->ecode = TDB_ERR_RDONLY;
1019		return -1;
1020	}
1021
1022	if (ftruncate(tdb->fd, size+addition) == -1) {
1023		char b = 0;
1024		if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
1025			TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
1026				 size+addition, strerror(errno)));
1027			return -1;
1028		}
1029	}
1030
1031	/* now fill the file with something. This ensures that the
1032	   file isn't sparse, which would be very bad if we ran out of
1033	   disk. This must be done with write, not via mmap */
1034	memset(buf, TDB_PAD_BYTE, sizeof(buf));
1035	while (addition) {
1036		int n = addition>sizeof(buf)?sizeof(buf):addition;
1037		int ret = pwrite(tdb->fd, buf, n, size);
1038		if (ret != n) {
1039			TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
1040				   n, strerror(errno)));
1041			return -1;
1042		}
1043		addition -= n;
1044		size += n;
1045	}
1046	return 0;
1047}
1048
1049
1050/* expand the database at least size bytes by expanding the underlying
1051   file and doing the mmap again if necessary */
1052int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
1053{
1054	struct list_struct rec;
1055	tdb_off_t offset;
1056
1057	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
1058		TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
1059		return -1;
1060	}
1061
1062	/* must know about any previous expansions by another process */
1063	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1064
1065	/* always expand by at least 10 times the requested size, and round
1066           the database up to a multiple of the page size */
1067	size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
1068
1069	if (!(tdb->flags & TDB_INTERNAL))
1070		tdb_munmap(tdb);
1071
1072	/*
1073	 * We must ensure the file is unmapped before doing this
1074	 * to ensure consistency with systems like OpenBSD where
1075	 * writes and mmaps are not consistent.
1076	 */
1077
1078	/* expand the file itself */
1079	if (!(tdb->flags & TDB_INTERNAL)) {
1080		if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
1081			goto fail;
1082	}
1083
1084	tdb->map_size += size;
1085
1086	if (tdb->flags & TDB_INTERNAL) {
1087		char *new_map_ptr = (char *)realloc(tdb->map_ptr,
1088						    tdb->map_size);
1089		if (!new_map_ptr) {
1090			tdb->map_size -= size;
1091			goto fail;
1092		}
1093		tdb->map_ptr = new_map_ptr;
1094	} else {
1095		/*
1096		 * We must ensure the file is remapped before adding the space
1097		 * to ensure consistency with systems like OpenBSD where
1098		 * writes and mmaps are not consistent.
1099		 */
1100
1101		/* We're ok if the mmap fails as we'll fall back to read/write */
1102		tdb_mmap(tdb);
1103	}
1104
1105	/* form a new freelist record */
1106	memset(&rec,'\0',sizeof(rec));
1107	rec.rec_len = size - sizeof(rec);
1108
1109	/* link it into the free list */
1110	offset = tdb->map_size - size;
1111	if (tdb_free(tdb, offset, &rec) == -1)
1112		goto fail;
1113
1114	tdb_unlock(tdb, -1, F_WRLCK);
1115	return 0;
1116 fail:
1117	tdb_unlock(tdb, -1, F_WRLCK);
1118	return -1;
1119}
1120
1121/* read/write a tdb_off_t */
1122int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1123{
1124	return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
1125}
1126
1127int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1128{
1129	tdb_off_t off = *d;
1130	return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
1131}
1132
1133
1134/* read a lump of data, allocating the space for it */
1135unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
1136{
1137	unsigned char *buf;
1138
1139	/* some systems don't like zero length malloc */
1140	if (len == 0) {
1141		len = 1;
1142	}
1143
1144	if (!(buf = (unsigned char *)malloc(len))) {
1145		/* Ensure ecode is set for log fn. */
1146		tdb->ecode = TDB_ERR_OOM;
1147		TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
1148			   len, strerror(errno)));
1149		return TDB_ERRCODE(TDB_ERR_OOM, buf);
1150	}
1151	if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
1152		SAFE_FREE(buf);
1153		return NULL;
1154	}
1155	return buf;
1156}
1157
1158/* Give a piece of tdb data to a parser */
1159
1160int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
1161		   tdb_off_t offset, tdb_len_t len,
1162		   int (*parser)(TDB_DATA key, TDB_DATA data,
1163				 void *private_data),
1164		   void *private_data)
1165{
1166	TDB_DATA data;
1167	int result;
1168
1169	data.dsize = len;
1170
1171	if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
1172		/*
1173		 * Optimize by avoiding the malloc/memcpy/free, point the
1174		 * parser directly at the mmap area.
1175		 */
1176		if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
1177			return -1;
1178		}
1179		data.dptr = offset + (unsigned char *)tdb->map_ptr;
1180		return parser(key, data, private_data);
1181	}
1182
1183	if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
1184		return -1;
1185	}
1186
1187	result = parser(key, data, private_data);
1188	free(data.dptr);
1189	return result;
1190}
1191
1192/* read/write a record */
1193int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1194{
1195	if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
1196		return -1;
1197	if (TDB_BAD_MAGIC(rec)) {
1198		/* Ensure ecode is set for log fn. */
1199		tdb->ecode = TDB_ERR_CORRUPT;
1200		TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
1201		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1202	}
1203	return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
1204}
1205
1206int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1207{
1208	struct list_struct r = *rec;
1209	return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
1210}
1211
1212static const struct tdb_methods io_methods = {
1213	tdb_read,
1214	tdb_write,
1215	tdb_next_hash_chain,
1216	tdb_oob,
1217	tdb_expand_file,
1218	tdb_brlock
1219};
1220
1221/*
1222  initialise the default methods table
1223*/
1224void tdb_io_init(struct tdb_context *tdb)
1225{
1226	tdb->methods = &io_methods;
1227}
1228
1229/* file: transaction.c */
1230
1231/*
1232  transaction design:
1233
1234  - only allow a single transaction at a time per database. This makes
1235    using the transaction API simpler, as otherwise the caller would
1236    have to cope with temporary failures in transactions that conflict
1237    with other current transactions
1238
1239  - keep the transaction recovery information in the same file as the
1240    database, using a special 'transaction recovery' record pointed at
1241    by the header. This removes the need for extra journal files as
1242    used by some other databases
1243
1244  - dynamically allocate the transaction recovery record, re-using it
1245    for subsequent transactions. If a larger record is needed then
1246    tdb_free() the old record to place it on the normal tdb freelist
1247    before allocating the new record
1248
1249  - during transactions, keep a linked list of all writes that have
1250    been performed by intercepting all tdb_write() calls. The hooked
1251    transaction versions of tdb_read() and tdb_write() check this
1252    linked list and try to use the elements of the list in preference
1253    to the real database.
1254
1255  - don't allow any locks to be held when a transaction starts,
1256    otherwise we can end up with deadlock (plus lack of lock nesting
1257    in posix locks would mean the lock is lost)
1258
1259  - if the caller gains a lock during the transaction but doesn't
1260    release it then fail the commit
1261
1262  - allow for nested calls to tdb_transaction_start(), re-using the
1263    existing transaction record. If the inner transaction is cancelled
1264    then a subsequent commit will fail
1265
1266  - keep a mirrored copy of the tdb hash chain heads to allow for the
1267    fast hash heads scan on traverse, updating the mirrored copy in
1268    the transaction version of tdb_write
1269
1270  - allow callers to mix transaction and non-transaction use of tdb,
1271    although once a transaction is started then an exclusive lock is
1272    gained until the transaction is committed or cancelled
1273
1274  - the commit strategy involves first saving away all modified data
1275    into a linearised buffer in the transaction recovery area, then
1276    marking the transaction recovery area with a magic value to
1277    indicate a valid recovery record. In total 4 fsync/msync calls are
1278    needed per commit to prevent race conditions. It might be possible
1279    to reduce this to 3 or even 2 with some more work.
1280
1281  - check for a valid recovery record on open of the tdb, while the
1282    global lock is held. Automatically recover from the transaction
1283    recovery area if needed, then continue with the open as
1284    usual. This allows for smooth crash recovery with no administrator
1285    intervention.
1286
1287  - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
1288    still available, but no transaction recovery area is used and no
1289    fsync/msync calls are made.
1290
1291*/
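/* Illustrative caller-side sketch of the API described above (error handling
   abbreviated; tdb_store() and tdb_transaction_commit() are declared in tdb.h):

	if (tdb_transaction_start(tdb) != 0)
		return -1;
	if (tdb_store(tdb, key1, val1, TDB_REPLACE) != 0 ||
	    tdb_store(tdb, key2, val2, TDB_REPLACE) != 0) {
		tdb_transaction_cancel(tdb);
		return -1;
	}
	return tdb_transaction_commit(tdb);
*/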
1292
1293struct tdb_transaction_el {
1294	struct tdb_transaction_el *next, *prev;
1295	tdb_off_t offset;
1296	tdb_len_t length;
1297	unsigned char *data;
1298};
1299
1300/*
1301  hold the context of any current transaction
1302*/
1303struct tdb_transaction {
1304	/* we keep a mirrored copy of the tdb hash heads here so
1305	   tdb_next_hash_chain() can operate efficiently */
1306	u32 *hash_heads;
1307
1308	/* the original io methods - used to do IOs to the real db */
1309	const struct tdb_methods *io_methods;
1310
1311	/* the list of transaction elements. We use a doubly linked
1312	   list with a last pointer to allow us to keep the list
1313	   ordered, with first element at the front of the list. It
1314	   needs to be doubly linked as the read/write traversals need
1315	   to be backwards, while the commit needs to be forwards */
1316	struct tdb_transaction_el *elements, *elements_last;
1317
1318	/* non-zero when an internal transaction error has
1319	   occurred. All write operations will then fail until the
1320	   transaction is ended */
1321	int transaction_error;
1322
1323	/* when inside a transaction we need to keep track of any
1324	   nested tdb_transaction_start() calls, as these are allowed,
1325	   but don't create a new transaction */
1326	int nesting;
1327
1328	/* old file size before transaction */
1329	tdb_len_t old_map_size;
1330};
1331
1332
1333/*
1334  read while in a transaction. We need to check first if the data is in our list
1335  of transaction elements, then if not do a real read
1336*/
1337static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
1338			    tdb_len_t len, int cv)
1339{
1340	struct tdb_transaction_el *el;
1341
1342	/* we need to walk the list backwards to get the most recent data */
1343	for (el=tdb->transaction->elements_last;el;el=el->prev) {
1344		tdb_len_t partial;
1345
1346		if (off+len <= el->offset) {
1347			continue;
1348		}
1349		if (off >= el->offset + el->length) {
1350			continue;
1351		}
1352
1353		/* an overlapping read - needs to be split into up to
1354		   2 reads and a memcpy */
1355		if (off < el->offset) {
1356			partial = el->offset - off;
1357			if (transaction_read(tdb, off, buf, partial, cv) != 0) {
1358				goto fail;
1359			}
1360			len -= partial;
1361			off += partial;
1362			buf = (void *)(partial + (char *)buf);
1363		}
1364		if (off + len <= el->offset + el->length) {
1365			partial = len;
1366		} else {
1367			partial = el->offset + el->length - off;
1368		}
1369		memcpy(buf, el->data + (off - el->offset), partial);
1370		if (cv) {
1371			tdb_convert(buf, len);
1372		}
1373		len -= partial;
1374		off += partial;
1375		buf = (void *)(partial + (char *)buf);
1376
1377		if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
1378			goto fail;
1379		}
1380
1381		return 0;
1382	}
1383
1384	/* it's not in the transaction elements - do a real read */
1385	return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
1386
1387fail:
1388	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
1389	tdb->ecode = TDB_ERR_IO;
1390	tdb->transaction->transaction_error = 1;
1391	return -1;
1392}
1393
1394
1395/*
1396  write while in a transaction
1397*/
1398static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
1399			     const void *buf, tdb_len_t len)
1400{
1401	struct tdb_transaction_el *el, *best_el=NULL;
1402
1403	if (len == 0) {
1404		return 0;
1405	}
1406
1407	/* if the write is to a hash head, then update the transaction
1408	   hash heads */
1409	if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
1410	    off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
1411		u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
1412		memcpy(&tdb->transaction->hash_heads[chain], buf, len);
1413	}
1414
1415	/* first see if we can replace an existing entry */
1416	for (el=tdb->transaction->elements_last;el;el=el->prev) {
1417		tdb_len_t partial;
1418
1419		if (best_el == NULL && off == el->offset+el->length) {
1420			best_el = el;
1421		}
1422
1423		if (off+len <= el->offset) {
1424			continue;
1425		}
1426		if (off >= el->offset + el->length) {
1427			continue;
1428		}
1429
1430		/* an overlapping write - needs to be split into up to
1431		   2 writes and a memcpy */
1432		if (off < el->offset) {
1433			partial = el->offset - off;
1434			if (transaction_write(tdb, off, buf, partial) != 0) {
1435				goto fail;
1436			}
1437			len -= partial;
1438			off += partial;
1439			buf = (const void *)(partial + (const char *)buf);
1440		}
1441		if (off + len <= el->offset + el->length) {
1442			partial = len;
1443		} else {
1444			partial = el->offset + el->length - off;
1445		}
1446		memcpy(el->data + (off - el->offset), buf, partial);
1447		len -= partial;
1448		off += partial;
1449		buf = (const void *)(partial + (const char *)buf);
1450
1451		if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
1452			goto fail;
1453		}
1454
1455		return 0;
1456	}
1457
1458	/* see if we can append the new entry to an existing entry */
1459	if (best_el && best_el->offset + best_el->length == off &&
1460	    (off+len < tdb->transaction->old_map_size ||
1461	     off > tdb->transaction->old_map_size)) {
1462		unsigned char *data = best_el->data;
1463		el = best_el;
1464		el->data = (unsigned char *)realloc(el->data,
1465						    el->length + len);
1466		if (el->data == NULL) {
1467			tdb->ecode = TDB_ERR_OOM;
1468			tdb->transaction->transaction_error = 1;
1469			el->data = data;
1470			return -1;
1471		}
1472		if (buf) {
1473			memcpy(el->data + el->length, buf, len);
1474		} else {
1475			memset(el->data + el->length, TDB_PAD_BYTE, len);
1476		}
1477		el->length += len;
1478		return 0;
1479	}
1480
1481	/* add a new entry at the end of the list */
1482	el = (struct tdb_transaction_el *)malloc(sizeof(*el));
1483	if (el == NULL) {
1484		tdb->ecode = TDB_ERR_OOM;
1485		tdb->transaction->transaction_error = 1;
1486		return -1;
1487	}
1488	el->next = NULL;
1489	el->prev = tdb->transaction->elements_last;
1490	el->offset = off;
1491	el->length = len;
1492	el->data = (unsigned char *)malloc(len);
1493	if (el->data == NULL) {
1494		free(el);
1495		tdb->ecode = TDB_ERR_OOM;
1496		tdb->transaction->transaction_error = 1;
1497		return -1;
1498	}
1499	if (buf) {
1500		memcpy(el->data, buf, len);
1501	} else {
1502		memset(el->data, TDB_PAD_BYTE, len);
1503	}
1504	if (el->prev) {
1505		el->prev->next = el;
1506	} else {
1507		tdb->transaction->elements = el;
1508	}
1509	tdb->transaction->elements_last = el;
1510	return 0;
1511
1512fail:
1513	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
1514	tdb->ecode = TDB_ERR_IO;
1515	tdb->transaction->transaction_error = 1;
1516	return -1;
1517}
1518
1519/*
1520  accelerated hash chain head search, using the cached hash heads
1521*/
1522static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
1523{
1524	u32 h = *chain;
1525	for (;h < tdb->header.hash_size;h++) {
1526		/* the +1 takes account of the freelist */
1527		if (0 != tdb->transaction->hash_heads[h+1]) {
1528			break;
1529		}
1530	}
1531	(*chain) = h;
1532}
1533
1534/*
1535  out of bounds check during a transaction
1536*/
1537static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
1538{
1539	if (len <= tdb->map_size) {
1540		return 0;
1541	}
1542	return TDB_ERRCODE(TDB_ERR_IO, -1);
1543}
1544
1545/*
1546  transaction version of tdb_expand().
1547*/
1548static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
1549				   tdb_off_t addition)
1550{
1551	/* add a write to the transaction elements, so subsequent
1552	   reads see the zero data */
1553	if (transaction_write(tdb, size, NULL, addition) != 0) {
1554		return -1;
1555	}
1556
1557	return 0;
1558}
1559
1560/*
1561  brlock during a transaction - ignore them
1562*/
1563static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
1564			      int rw_type, int lck_type, int probe, size_t len)
1565{
1566	return 0;
1567}
1568
1569static const struct tdb_methods transaction_methods = {
1570	transaction_read,
1571	transaction_write,
1572	transaction_next_hash_chain,
1573	transaction_oob,
1574	transaction_expand_file,
1575	transaction_brlock
1576};
1577
1578
1579/*
1580  start a tdb transaction. No token is returned, as only a single
1581  transaction is allowed to be pending per tdb_context
1582*/
1583int tdb_transaction_start(struct tdb_context *tdb)
1584{
1585	/* some sanity checks */
1586	if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
1587		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
1588		tdb->ecode = TDB_ERR_EINVAL;
1589		return -1;
1590	}
1591
1592	/* cope with nested tdb_transaction_start() calls */
1593	if (tdb->transaction != NULL) {
1594		tdb->transaction->nesting++;
1595		TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
1596			 tdb->transaction->nesting));
1597		return 0;
1598	}
1599
1600	if (tdb->num_locks != 0 || tdb->global_lock.count) {
1601		/* the caller must not have any locks when starting a
1602		   transaction as otherwise we'll be screwed by lack
1603		   of nested locks in posix */
1604		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
1605		tdb->ecode = TDB_ERR_LOCK;
1606		return -1;
1607	}
1608
1609	if (tdb->travlocks.next != NULL) {
1610		/* you cannot use transactions inside a traverse (although you can use
1611		   traverse inside a transaction) as otherwise you can end up with
1612		   deadlock */
1613		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
1614		tdb->ecode = TDB_ERR_LOCK;
1615		return -1;
1616	}
1617
1618	tdb->transaction = (struct tdb_transaction *)
1619		calloc(sizeof(struct tdb_transaction), 1);
1620	if (tdb->transaction == NULL) {
1621		tdb->ecode = TDB_ERR_OOM;
1622		return -1;
1623	}
1624
1625	/* get the transaction write lock. This is a blocking lock. As
1626	   discussed with Volker, there are a number of ways we could
1627	   make this async, which we will probably do in the future */
1628	if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
1629		SAFE_FREE(tdb->transaction);
1630		return -1;
1631	}
1632
1633	/* get a read lock from the freelist to the end of file. This
1634	   is upgraded to a write lock during the commit */
1635	if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
1636		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
1637		tdb->ecode = TDB_ERR_LOCK;
1638		goto fail;
1639	}
1640
1641	/* setup a copy of the hash table heads so the hash scan in
1642	   traverse can be fast */
1643	tdb->transaction->hash_heads = (u32 *)
1644		calloc(tdb->header.hash_size+1, sizeof(u32));
1645	if (tdb->transaction->hash_heads == NULL) {
1646		tdb->ecode = TDB_ERR_OOM;
1647		goto fail;
1648	}
1649	if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1650				   TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
1651		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
1652		tdb->ecode = TDB_ERR_IO;
1653		goto fail;
1654	}
1655
1656	/* make sure we know about any file expansions already done by
1657	   anyone else */
1658	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1659	tdb->transaction->old_map_size = tdb->map_size;
1660
1661	/* finally hook the io methods, replacing them with
1662	   transaction specific methods */
1663	tdb->transaction->io_methods = tdb->methods;
1664	tdb->methods = &transaction_methods;
1665
1666	/* by calling this transaction write here, we ensure that we don't grow the
1667	   transaction linked list due to hash table updates */
1668	if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1669			      TDB_HASHTABLE_SIZE(tdb)) != 0) {
1670		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
1671		tdb->ecode = TDB_ERR_IO;
1672		tdb->methods = tdb->transaction->io_methods;
1673		goto fail;
1674	}
1675
1676	return 0;
1677
1678fail:
1679	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1680	tdb_transaction_unlock(tdb);
1681	SAFE_FREE(tdb->transaction->hash_heads);
1682	SAFE_FREE(tdb->transaction);
1683	return -1;
1684}
1685
1686
1687/*
1688  cancel the current transaction
1689*/
1690int tdb_transaction_cancel(struct tdb_context *tdb)
1691{
1692	if (tdb->transaction == NULL) {
1693		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
1694		return -1;
1695	}
1696
1697	if (tdb->transaction->nesting != 0) {
1698		tdb->transaction->transaction_error = 1;
1699		tdb->transaction->nesting--;
1700		return 0;
1701	}
1702
1703	tdb->map_size = tdb->transaction->old_map_size;
1704
1705	/* free all the transaction elements */
1706	while (tdb->transaction->elements) {
1707		struct tdb_transaction_el *el = tdb->transaction->elements;
1708		tdb->transaction->elements = el->next;
1709		free(el->data);
1710		free(el);
1711	}
1712
1713	/* remove any global lock created during the transaction */
1714	if (tdb->global_lock.count != 0) {
1715		tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
1716		tdb->global_lock.count = 0;
1717	}
1718
1719	/* remove any locks created during the transaction */
1720	if (tdb->num_locks != 0) {
1721		int i;
1722		for (i=0;i<tdb->num_lockrecs;i++) {
1723			tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
1724				   F_UNLCK,F_SETLKW, 0, 1);
1725		}
1726		tdb->num_locks = 0;
1727		tdb->num_lockrecs = 0;
1728		SAFE_FREE(tdb->lockrecs);
1729	}
1730
1731	/* restore the normal io methods */
1732	tdb->methods = tdb->transaction->io_methods;
1733
1734	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1735	tdb_transaction_unlock(tdb);
1736	SAFE_FREE(tdb->transaction->hash_heads);
1737	SAFE_FREE(tdb->transaction);
1738
1739	return 0;
1740}
1741
1742/*
1743  sync to disk
1744*/
1745static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
1746{
1747	if (fsync(tdb->fd) != 0) {
1748		tdb->ecode = TDB_ERR_IO;
1749		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
1750		return -1;
1751	}
1752#ifdef MS_SYNC
1753	if (tdb->map_ptr) {
1754		tdb_off_t moffset = offset & ~(tdb->page_size-1);
1755		if (msync(moffset + (char *)tdb->map_ptr,
1756			  length + (offset - moffset), MS_SYNC) != 0) {
1757			tdb->ecode = TDB_ERR_IO;
1758			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
1759				 strerror(errno)));
1760			return -1;
1761		}
1762	}
1763#endif
1764	return 0;
1765}
1766
1767
1768/*
1769  work out how much space the linearised recovery data will consume
1770*/
1771static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
1772{
1773	struct tdb_transaction_el *el;
1774	tdb_len_t recovery_size = 0;
1775
1776	recovery_size = sizeof(u32);
1777	for (el=tdb->transaction->elements;el;el=el->next) {
1778		if (el->offset >= tdb->transaction->old_map_size) {
1779			continue;
1780		}
1781		recovery_size += 2*sizeof(tdb_off_t) + el->length;
1782	}
1783
1784	return recovery_size;
1785}
1786
1787/*
1788  allocate the recovery area, or use an existing recovery area if it is
1789  large enough
1790*/
1791static int tdb_recovery_allocate(struct tdb_context *tdb,
1792				 tdb_len_t *recovery_size,
1793				 tdb_off_t *recovery_offset,
1794				 tdb_len_t *recovery_max_size)
1795{
1796	struct list_struct rec;
1797	const struct tdb_methods *methods = tdb->transaction->io_methods;
1798	tdb_off_t recovery_head;
1799
1800	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1801		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
1802		return -1;
1803	}
1804
1805	rec.rec_len = 0;
1806
1807	if (recovery_head != 0 &&
1808	    methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
1809		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
1810		return -1;
1811	}
1812
1813	*recovery_size = tdb_recovery_size(tdb);
1814
1815	if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
1816		/* it fits in the existing area */
1817		*recovery_max_size = rec.rec_len;
1818		*recovery_offset = recovery_head;
1819		return 0;
1820	}
1821
1822	/* we need to free up the old recovery area, then allocate a
1823	   new one at the end of the file. Note that we cannot use
1824	   tdb_allocate() to allocate the new one as that might return
1825   us an area that is currently being used (as of the start of
1826	   the transaction) */
1827	if (recovery_head != 0) {
1828		if (tdb_free(tdb, recovery_head, &rec) == -1) {
1829			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
1830			return -1;
1831		}
1832	}
1833
1834	/* the tdb_free() call might have increased the recovery size */
1835	*recovery_size = tdb_recovery_size(tdb);
1836
1837	/* round up to a multiple of page size */
1838	*recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
1839	*recovery_offset = tdb->map_size;
1840	recovery_head = *recovery_offset;
1841
1842	if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1843				     (tdb->map_size - tdb->transaction->old_map_size) +
1844				     sizeof(rec) + *recovery_max_size) == -1) {
1845		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
1846		return -1;
1847	}
1848
1849	/* remap the file (if using mmap) */
1850	methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1851
1852	/* we have to reset the old map size so that we don't try to expand the file
1853	   again in the transaction commit, which would destroy the recovery area */
1854	tdb->transaction->old_map_size = tdb->map_size;
1855
1856	/* write the recovery header offset and sync - we can sync without a race here
1857	   as the magic ptr in the recovery record has not been set */
1858	CONVERT(recovery_head);
1859	if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
1860			       &recovery_head, sizeof(tdb_off_t)) == -1) {
1861		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
1862		return -1;
1863	}
1864
1865	return 0;
1866}
1867
1868
1869/*
1870  setup the recovery data that will be used on a crash during commit
1871*/
1872static int transaction_setup_recovery(struct tdb_context *tdb,
1873				      tdb_off_t *magic_offset)
1874{
1875	struct tdb_transaction_el *el;
1876	tdb_len_t recovery_size;
1877	unsigned char *data, *p;
1878	const struct tdb_methods *methods = tdb->transaction->io_methods;
1879	struct list_struct *rec;
1880	tdb_off_t recovery_offset, recovery_max_size;
1881	tdb_off_t old_map_size = tdb->transaction->old_map_size;
1882	u32 magic, tailer;
1883
1884	/*
1885	  check that the recovery area has enough space
1886	*/
1887	if (tdb_recovery_allocate(tdb, &recovery_size,
1888				  &recovery_offset, &recovery_max_size) == -1) {
1889		return -1;
1890	}
1891
1892	data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
1893	if (data == NULL) {
1894		tdb->ecode = TDB_ERR_OOM;
1895		return -1;
1896	}
1897
1898	rec = (struct list_struct *)data;
1899	memset(rec, 0, sizeof(*rec));
1900
1901	rec->magic    = 0;
1902	rec->data_len = recovery_size;
1903	rec->rec_len  = recovery_max_size;
1904	rec->key_len  = old_map_size;
1905	CONVERT(*rec); /* convert the header contents, not the pointer itself */
1906
1907	/* build the recovery data into a single blob to allow us to do a single
1908	   large write, which should be more efficient */
1909	p = data + sizeof(*rec);
1910	for (el=tdb->transaction->elements;el;el=el->next) {
1911		if (el->offset >= old_map_size) {
1912			continue;
1913		}
1914		if (el->offset + el->length > tdb->transaction->old_map_size) {
1915			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
1916			free(data);
1917			tdb->ecode = TDB_ERR_CORRUPT;
1918			return -1;
1919		}
1920		memcpy(p, &el->offset, 4);
1921		memcpy(p+4, &el->length, 4);
1922		if (DOCONV()) {
1923			tdb_convert(p, 8);
1924		}
1925		/* the recovery area contains the old data, not the
1926		   new data, so we have to call the original tdb_read
1927		   method to get it */
1928		if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
1929			free(data);
1930			tdb->ecode = TDB_ERR_IO;
1931			return -1;
1932		}
1933		p += 8 + el->length;
1934	}
1935
1936	/* and the tailer */
1937	tailer = sizeof(*rec) + recovery_max_size;
1938	memcpy(p, &tailer, 4);
1939	if (DOCONV()) tdb_convert(p, 4); /* convert the tailer bytes, not the pointer */
1940
1941	/* write the recovery data to the recovery area */
1942	if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
1943		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
1944		free(data);
1945		tdb->ecode = TDB_ERR_IO;
1946		return -1;
1947	}
1948
1949	/* as we don't have ordered writes, we have to sync the recovery
1950	   data before we update the magic to indicate that the recovery
1951	   data is present */
1952	if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
1953		free(data);
1954		return -1;
1955	}
1956
1957	free(data);
1958
1959	magic = TDB_RECOVERY_MAGIC;
1960	CONVERT(magic);
1961
1962	*magic_offset = recovery_offset + offsetof(struct list_struct, magic);
1963
1964	if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
1965		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
1966		tdb->ecode = TDB_ERR_IO;
1967		return -1;
1968	}
1969
1970	/* ensure the recovery magic marker is on disk */
1971	if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
1972		return -1;
1973	}
1974
1975	return 0;
1976}
1977
1978/*
1979  commit the current transaction
1980*/
1981int tdb_transaction_commit(struct tdb_context *tdb)
1982{
1983	const struct tdb_methods *methods;
1984	tdb_off_t magic_offset = 0;
1985	u32 zero = 0;
1986
1987	if (tdb->transaction == NULL) {
1988		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1989		return -1;
1990	}
1991
1992	if (tdb->transaction->transaction_error) {
1993		tdb->ecode = TDB_ERR_IO;
1994		tdb_transaction_cancel(tdb);
1995		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1996		return -1;
1997	}
1998
1999	if (tdb->transaction->nesting != 0) {
2000		tdb->transaction->nesting--;
2001		return 0;
2002	}
2003
2004	/* check for a null transaction */
2005	if (tdb->transaction->elements == NULL) {
2006		tdb_transaction_cancel(tdb);
2007		return 0;
2008	}
2009
2010	methods = tdb->transaction->io_methods;
2011
2012	/* if there are any locks pending then the caller has not
2013	   nested their locks properly, so fail the transaction */
2014	if (tdb->num_locks || tdb->global_lock.count) {
2015		tdb->ecode = TDB_ERR_LOCK;
2016		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
2017		tdb_transaction_cancel(tdb);
2018		return -1;
2019	}
2020
2021	/* upgrade the main transaction lock region to a write lock */
2022	if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
2023		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to upgrade hash locks\n"));
2024		tdb->ecode = TDB_ERR_LOCK;
2025		tdb_transaction_cancel(tdb);
2026		return -1;
2027	}
2028
2029	/* get the global lock - this prevents new users attaching to the database
2030	   during the commit */
2031	if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
2032		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
2033		tdb->ecode = TDB_ERR_LOCK;
2034		tdb_transaction_cancel(tdb);
2035		return -1;
2036	}
2037
2038	if (!(tdb->flags & TDB_NOSYNC)) {
2039		/* write the recovery data to the end of the file */
2040		if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
2041			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
2042			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2043			tdb_transaction_cancel(tdb);
2044			return -1;
2045		}
2046	}
2047
2048	/* expand the file to the new size if needed */
2049	if (tdb->map_size != tdb->transaction->old_map_size) {
2050		if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
2051					     tdb->map_size -
2052					     tdb->transaction->old_map_size) == -1) {
2053			tdb->ecode = TDB_ERR_IO;
2054			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
2055			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2056			tdb_transaction_cancel(tdb);
2057			return -1;
2058		}
2059		tdb->map_size = tdb->transaction->old_map_size;
2060		methods->tdb_oob(tdb, tdb->map_size + 1, 1);
2061	}
2062
2063	/* perform all the writes */
2064	while (tdb->transaction->elements) {
2065		struct tdb_transaction_el *el = tdb->transaction->elements;
2066
2067		if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
2068			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
2069
2070			/* we've overwritten part of the data and
2071			   possibly expanded the file, so we need to
2072			   run the crash recovery code */
2073			tdb->methods = methods;
2074			tdb_transaction_recover(tdb);
2075
2076			tdb_transaction_cancel(tdb);
2077			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2078
2079			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
2080			return -1;
2081		}
2082		tdb->transaction->elements = el->next;
2083		free(el->data);
2084		free(el);
2085	}
2086
2087	if (!(tdb->flags & TDB_NOSYNC)) {
2088		/* ensure the new data is on disk */
2089		if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2090			return -1;
2091		}
2092
2093		/* remove the recovery marker */
2094		if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
2095			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
2096			return -1;
2097		}
2098
2099		/* ensure the recovery marker has been removed on disk */
2100		if (transaction_sync(tdb, magic_offset, 4) == -1) {
2101			return -1;
2102		}
2103	}
2104
2105	tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2106
2107	/*
2108	  TODO: maybe write to some dummy hdr field, or write to magic
2109	  offset without mmap, before the last sync, instead of the
2110	  utime() call
2111	*/
2112
2113	/* on some systems (like Linux 2.6.x) changes via mmap/msync
2114	   don't change the mtime of the file, this means the file may
2115	   not be backed up (as tdb rounding to block sizes means that
2116	   file size changes are quite rare too). The following forces
2117	   mtime changes when a transaction completes */
2118#ifdef HAVE_UTIME
2119	utime(tdb->name, NULL);
2120#endif
2121
2122	/* use a transaction cancel to free memory and remove the
2123	   transaction locks */
2124	tdb_transaction_cancel(tdb);
2125	return 0;
2126}
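/*
  Example usage of the transaction API (an illustrative sketch only: the
  helper name and key are made up, error handling is abbreviated). A caller
  groups related updates between tdb_transaction_start() and
  tdb_transaction_commit(), and cancels on error so that none of the updates
  become visible:
*/
#if 0
static int transaction_example(struct tdb_context *tdb)
{
	TDB_DATA key, val;
	u32 counter = 42;

	key.dptr  = (unsigned char *)"counter";
	key.dsize = strlen("counter");
	val.dptr  = (unsigned char *)&counter;
	val.dsize = sizeof(counter);

	if (tdb_transaction_start(tdb) == -1) {
		return -1;
	}
	if (tdb_store(tdb, key, val, TDB_REPLACE) == -1) {
		/* roll back - nothing reaches the database file */
		tdb_transaction_cancel(tdb);
		return -1;
	}
	/* writes the recovery area, then the data, then syncs */
	return tdb_transaction_commit(tdb);
}
#endif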
2127
2128
2129/*
2130  recover from an aborted transaction. Must be called with exclusive
2131  database write access already established (including the global
2132  lock to prevent new processes attaching)
2133*/
2134int tdb_transaction_recover(struct tdb_context *tdb)
2135{
2136	tdb_off_t recovery_head, recovery_eof;
2137	unsigned char *data, *p;
2138	u32 zero = 0;
2139	struct list_struct rec;
2140
2141	/* find the recovery area */
2142	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
2143		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
2144		tdb->ecode = TDB_ERR_IO;
2145		return -1;
2146	}
2147
2148	if (recovery_head == 0) {
2149		/* we have never allocated a recovery record */
2150		return 0;
2151	}
2152
2153	/* read the recovery record */
2154	if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
2155				   sizeof(rec), DOCONV()) == -1) {
2156		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
2157		tdb->ecode = TDB_ERR_IO;
2158		return -1;
2159	}
2160
2161	if (rec.magic != TDB_RECOVERY_MAGIC) {
2162		/* there is no valid recovery data */
2163		return 0;
2164	}
2165
2166	if (tdb->read_only) {
2167		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
2168		tdb->ecode = TDB_ERR_CORRUPT;
2169		return -1;
2170	}
2171
2172	recovery_eof = rec.key_len;
2173
2174	data = (unsigned char *)malloc(rec.data_len);
2175	if (data == NULL) {
2176		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
2177		tdb->ecode = TDB_ERR_OOM;
2178		return -1;
2179	}
2180
2181	/* read the full recovery data */
2182	if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
2183				   rec.data_len, 0) == -1) {
2184		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
2185		tdb->ecode = TDB_ERR_IO;
2186		return -1;
2187	}
2188
2189	/* recover the file data */
2190	p = data;
2191	while (p+8 < data + rec.data_len) {
2192		u32 ofs, len;
2193		if (DOCONV()) {
2194			tdb_convert(p, 8);
2195		}
2196		memcpy(&ofs, p, 4);
2197		memcpy(&len, p+4, 4);
2198
2199		if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
2200			free(data);
2201			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
2202			tdb->ecode = TDB_ERR_IO;
2203			return -1;
2204		}
2205		p += 8 + len;
2206	}
2207
2208	free(data);
2209
2210	if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2211		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
2212		tdb->ecode = TDB_ERR_IO;
2213		return -1;
2214	}
2215
2216	/* if the recovery area is after the recovered eof then remove it */
2217	if (recovery_eof <= recovery_head) {
2218		if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
2219			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
2220			tdb->ecode = TDB_ERR_IO;
2221			return -1;
2222		}
2223	}
2224
2225	/* remove the recovery magic */
2226	if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
2227			  &zero) == -1) {
2228		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
2229		tdb->ecode = TDB_ERR_IO;
2230		return -1;
2231	}
2232
2233	/* reduce the file size to the old size */
2234	tdb_munmap(tdb);
2235	if (ftruncate(tdb->fd, recovery_eof) != 0) {
2236		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
2237		tdb->ecode = TDB_ERR_IO;
2238		return -1;
2239	}
2240	tdb->map_size = recovery_eof;
2241	tdb_mmap(tdb);
2242
2243	if (transaction_sync(tdb, 0, recovery_eof) == -1) {
2244		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
2245		tdb->ecode = TDB_ERR_IO;
2246		return -1;
2247	}
2248
2249	TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
2250		 recovery_eof));
2251
2252	/* all done */
2253	return 0;
2254}
2255
2256/* file: freelist.c */
2257
2258/* read a freelist record and check for simple errors */
2259static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
2260{
2261	if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
2262		return -1;
2263
2264	if (rec->magic == TDB_MAGIC) {
2265		/* this happens when an app is shut down while deleting a record - we should
2266		   not completely fail when this happens */
2267		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
2268			 rec->magic, off));
2269		rec->magic = TDB_FREE_MAGIC;
2270		if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
2271			return -1;
2272	}
2273
2274	if (rec->magic != TDB_FREE_MAGIC) {
2275		/* Ensure ecode is set for log fn. */
2276		tdb->ecode = TDB_ERR_CORRUPT;
2277		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read bad magic 0x%x at offset=%d\n",
2278			   rec->magic, off));
2279		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2280	}
2281	if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
2282		return -1;
2283	return 0;
2284}
2285
2286
2287
2288/* Remove an element from the freelist.  Must have alloc lock. */
2289static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
2290{
2291	tdb_off_t last_ptr, i;
2292
2293	/* read in the freelist top */
2294	last_ptr = FREELIST_TOP;
2295	while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
2296		if (i == off) {
2297			/* We've found it! */
2298			return tdb_ofs_write(tdb, last_ptr, &next);
2299		}
2300		/* Follow chain (next offset is at start of record) */
2301		last_ptr = i;
2302	}
2303	TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
2304	return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2305}
2306
2307
2308/* update a record tailer (must hold allocation lock) */
2309static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
2310			 const struct list_struct *rec)
2311{
2312	tdb_off_t totalsize;
2313
2314	/* Offset of tailer from record header */
2315	totalsize = sizeof(*rec) + rec->rec_len;
2316	return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
2317			 &totalsize);
2318}
2319
2320/* Add an element into the freelist. Merge adjacent records if
2321   necessary. */
2322int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
2323{
2324	tdb_off_t right, left;
2325
2326	/* Allocation and tailer lock */
2327	if (tdb_lock(tdb, -1, F_WRLCK) != 0)
2328		return -1;
2329
2330	/* set an initial tailer, so if we fail we don't leave a bogus record */
2331	if (update_tailer(tdb, offset, rec) != 0) {
2332		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
2333		goto fail;
2334	}
2335
2336	/* Look right first (I'm an Australian, dammit) */
2337	right = offset + sizeof(*rec) + rec->rec_len;
2338	if (right + sizeof(*rec) <= tdb->map_size) {
2339		struct list_struct r;
2340
2341		if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
2342			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
2343			goto left;
2344		}
2345
2346		/* If it's free, expand to include it. */
2347		if (r.magic == TDB_FREE_MAGIC) {
2348			if (remove_from_freelist(tdb, right, r.next) == -1) {
2349				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
2350				goto left;
2351			}
2352			rec->rec_len += sizeof(r) + r.rec_len;
2353		}
2354	}
2355
2356left:
2357	/* Look left */
2358	left = offset - sizeof(tdb_off_t);
2359	if (left > TDB_DATA_START(tdb->header.hash_size)) {
2360		struct list_struct l;
2361		tdb_off_t leftsize;
2362
2363		/* Read in tailer and jump back to header */
2364		if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
2365			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
2366			goto update;
2367		}
2368
2369		/* it could be uninitialised data */
2370		if (leftsize == 0 || leftsize == TDB_PAD_U32) {
2371			goto update;
2372		}
2373
2374		left = offset - leftsize;
2375
2376		/* Now read in record */
2377		if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
2378			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
2379			goto update;
2380		}
2381
2382		/* If it's free, expand to include it. */
2383		if (l.magic == TDB_FREE_MAGIC) {
2384			if (remove_from_freelist(tdb, left, l.next) == -1) {
2385				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
2386				goto update;
2387			} else {
2388				offset = left;
2389				rec->rec_len += leftsize;
2390			}
2391		}
2392	}
2393
2394update:
2395	if (update_tailer(tdb, offset, rec) == -1) {
2396		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
2397		goto fail;
2398	}
2399
2400	/* Now, prepend to free list */
2401	rec->magic = TDB_FREE_MAGIC;
2402
2403	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
2404	    tdb_rec_write(tdb, offset, rec) == -1 ||
2405	    tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
2406		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
2407		goto fail;
2408	}
2409
2410	/* And we're done. */
2411	tdb_unlock(tdb, -1, F_WRLCK);
2412	return 0;
2413
2414 fail:
2415	tdb_unlock(tdb, -1, F_WRLCK);
2416	return -1;
2417}
2418
2419
2420/*
2421   the core of tdb_allocate - called when we have decided which
2422   free list entry to use
2423 */
2424static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
2425				struct list_struct *rec, tdb_off_t last_ptr)
2426{
2427	struct list_struct newrec;
2428	tdb_off_t newrec_ptr;
2429
2430	memset(&newrec, '\0', sizeof(newrec));
2431
2432	/* found it - now possibly split it up  */
2433	if (rec->rec_len > length + MIN_REC_SIZE) {
2434		/* Length of left piece */
2435		length = TDB_ALIGN(length, TDB_ALIGNMENT);
2436
2437		/* Right piece to go on free list */
2438		newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
2439		newrec_ptr = rec_ptr + sizeof(*rec) + length;
2440
2441		/* And left record is shortened */
2442		rec->rec_len = length;
2443	} else {
2444		newrec_ptr = 0;
2445	}
2446
2447	/* Remove allocated record from the free list */
2448	if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
2449		return 0;
2450	}
2451
2452	/* Update header: do this before we drop alloc
2453	   lock, otherwise tdb_free() might try to
2454	   merge with us, thinking we're free.
2455	   (Thanks Jeremy Allison). */
2456	rec->magic = TDB_MAGIC;
2457	if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
2458		return 0;
2459	}
2460
2461	/* Did we create new block? */
2462	if (newrec_ptr) {
2463		/* Update allocated record tailer (we
2464		   shortened it). */
2465		if (update_tailer(tdb, rec_ptr, rec) == -1) {
2466			return 0;
2467		}
2468
2469		/* Free new record */
2470		if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
2471			return 0;
2472		}
2473	}
2474
2475	/* all done - return the new record offset */
2476	return rec_ptr;
2477}
2478
2479/* allocate some space from the free list. The offset returned points
2480   to an unconnected list_struct within the database with room for at
2481   least length bytes of total data
2482
2483   0 is returned if the space could not be allocated
2484 */
2485tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
2486{
2487	tdb_off_t rec_ptr, last_ptr, newrec_ptr;
2488	struct {
2489		tdb_off_t rec_ptr, last_ptr;
2490		tdb_len_t rec_len;
2491	} bestfit;
2492
2493	if (tdb_lock(tdb, -1, F_WRLCK) == -1)
2494		return 0;
2495
2496	/* Extra bytes required for tailer */
2497	length += sizeof(tdb_off_t);
2498
2499 again:
2500	last_ptr = FREELIST_TOP;
2501
2502	/* read in the freelist top */
2503	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
2504		goto fail;
2505
2506	bestfit.rec_ptr = 0;
2507	bestfit.last_ptr = 0;
2508	bestfit.rec_len = 0;
2509
2510	/*
2511	   this is a best fit allocation strategy. Originally we used
2512	   a first fit strategy, but it suffered from massive fragmentation
2513	   issues when faced with a slowly increasing record size.
2514	 */
2515	while (rec_ptr) {
2516		if (tdb_rec_free_read(tdb, rec_ptr, rec) == -1) {
2517			goto fail;
2518		}
2519
2520		if (rec->rec_len >= length) {
2521			if (bestfit.rec_ptr == 0 ||
2522			    rec->rec_len < bestfit.rec_len) {
2523				bestfit.rec_len = rec->rec_len;
2524				bestfit.rec_ptr = rec_ptr;
2525				bestfit.last_ptr = last_ptr;
2526				/* consider a fit to be good enough if
2527				   we aren't wasting more than half
2528				   the space */
2529				if (bestfit.rec_len < 2*length) {
2530					break;
2531				}
2532			}
2533		}
2534
2535		/* move to the next record */
2536		last_ptr = rec_ptr;
2537		rec_ptr = rec->next;
2538	}
2539
2540	if (bestfit.rec_ptr != 0) {
2541		if (tdb_rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
2542			goto fail;
2543		}
2544
2545		newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
2546		tdb_unlock(tdb, -1, F_WRLCK);
2547		return newrec_ptr;
2548	}
2549
2550	/* we didn't find enough space. See if we can expand the
2551	   database and if we can then try again */
2552	if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
2553		goto again;
2554 fail:
2555	tdb_unlock(tdb, -1, F_WRLCK);
2556	return 0;
2557}
2558
2559/* file: freelistcheck.c */
2560
2561/* Check the freelist is good and contains no loops.
2562   Very memory intensive - only do this as a consistency
2563   checker. Heh heh - uses an in memory tdb as the storage
2564   for the "seen" record list. For some reason this strikes
2565   me as extremely clever as I don't have to write another tree
2566   data structure implementation :-).
2567 */
2568
2569static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
2570{
2571	TDB_DATA key, data;
2572
2573	memset(&data, '\0', sizeof(data));
2574	key.dptr = (unsigned char *)&rec_ptr;
2575	key.dsize = sizeof(rec_ptr);
2576	return tdb_store(mem_tdb, key, data, TDB_INSERT);
2577}
2578
2579int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
2580{
2581	struct tdb_context *mem_tdb = NULL;
2582	struct list_struct rec;
2583	tdb_off_t rec_ptr, last_ptr;
2584	int ret = -1;
2585
2586	*pnum_entries = 0;
2587
2588	mem_tdb = tdb_open("flval", tdb->header.hash_size,
2589				TDB_INTERNAL, O_RDWR, 0600);
2590	if (!mem_tdb) {
2591		return -1;
2592	}
2593
2594	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
2595		tdb_close(mem_tdb);
2596		return 0;
2597	}
2598
2599	last_ptr = FREELIST_TOP;
2600
2601	/* Store the FREELIST_TOP record. */
2602	if (seen_insert(mem_tdb, last_ptr) == -1) {
2603		ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2604		goto fail;
2605	}
2606
2607	/* read in the freelist top */
2608	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
2609		goto fail;
2610	}
2611
2612	while (rec_ptr) {
2613
2614		/* If we can't store this record (we've seen it
2615		   before) then the free list has a loop and must
2616		   be corrupt. */
2617
2618		if (seen_insert(mem_tdb, rec_ptr)) {
2619			ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2620			goto fail;
2621		}
2622
2623		if (tdb_rec_free_read(tdb, rec_ptr, &rec) == -1) {
2624			goto fail;
2625		}
2626
2627		/* move to the next record */
2628		last_ptr = rec_ptr;
2629		rec_ptr = rec.next;
2630		*pnum_entries += 1;
2631	}
2632
2633	ret = 0;
2634
2635  fail:
2636
2637	tdb_close(mem_tdb);
2638	tdb_unlock(tdb, -1, F_WRLCK);
2639	return ret;
2640}
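/*
  Example use of tdb_validate_freelist() (sketch; the helper name is made
  up). A return of -1 means the freelist is corrupt or contains a loop;
  otherwise *pnum_entries holds the number of free records found:
*/
#if 0
static void check_freelist_example(struct tdb_context *tdb)
{
	int nfree;

	if (tdb_validate_freelist(tdb, &nfree) == -1) {
		fprintf(stderr, "freelist is corrupt\n");
	} else {
		printf("freelist has %d entries\n", nfree);
	}
}
#endif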
2641
2642/* file: traverse.c */
2643
2644/* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
2645static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
2646			 struct list_struct *rec)
2647{
2648	int want_next = (tlock->off != 0);
2649
2650	/* Lock each chain from the start one. */
2651	for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
2652		if (!tlock->off && tlock->hash != 0) {
2653			/* this is an optimisation for the common case where
2654			   the hash chain is empty, which is particularly
2655			   common for the use of tdb with ldb, where large
2656			   hashes are used. In that case we spend most of our
2657			   time in tdb_brlock(), locking empty hash chains.
2658
2659			   To avoid this, we do an unlocked pre-check to see
2660			   if the hash chain is empty before starting to look
2661			   inside it. If it is empty then we can avoid that
2662			   hash chain. If it isn't empty then we can't believe
2663			   the value we get back, as we read it without a
2664			   lock, so instead we get the lock and re-fetch the
2665			   value below.
2666
2667			   Notice that not doing this optimisation on the
2668			   first hash chain is critical. We must guarantee
2669			   that we have done at least one fcntl lock at the
2670			   start of a search to guarantee that memory is
2671			   coherent on SMP systems. If records are added by
2672			   others during the search then that's OK, and we
2673			   could possibly miss those with this trick, but we
2674			   could miss them anyway without this trick, so the
2675			   semantics don't change.
2676
2677			   With a non-indexed ldb search this trick gains us a
2678			   factor of around 80 in speed on a linux 2.6.x
2679			   system (testing using ldbtest).
2680			*/
2681			tdb->methods->next_hash_chain(tdb, &tlock->hash);
2682			if (tlock->hash == tdb->header.hash_size) {
2683				continue;
2684			}
2685		}
2686
2687		if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
2688			return -1;
2689
2690		/* No previous record?  Start at top of chain. */
2691		if (!tlock->off) {
2692			if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
2693				     &tlock->off) == -1)
2694				goto fail;
2695		} else {
2696			/* Otherwise unlock the previous record. */
2697			if (tdb_unlock_record(tdb, tlock->off) != 0)
2698				goto fail;
2699		}
2700
2701		if (want_next) {
2702			/* We have offset of old record: grab next */
2703			if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2704				goto fail;
2705			tlock->off = rec->next;
2706		}
2707
2708		/* Iterate through chain */
2709		while( tlock->off) {
2710			tdb_off_t current;
2711			if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2712				goto fail;
2713
2714			/* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
2715			if (tlock->off == rec->next) {
2716				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
2717				goto fail;
2718			}
2719
2720			if (!TDB_DEAD(rec)) {
2721				/* Woohoo: we found one! */
2722				if (tdb_lock_record(tdb, tlock->off) != 0)
2723					goto fail;
2724				return tlock->off;
2725			}
2726
2727			/* Try to clean dead ones from old traverses */
2728			current = tlock->off;
2729			tlock->off = rec->next;
2730			if (!(tdb->read_only || tdb->traverse_read) &&
2731			    tdb_do_delete(tdb, current, rec) != 0)
2732				goto fail;
2733		}
2734		tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
2735		want_next = 0;
2736	}
2737	/* We finished iteration without finding anything */
2738	return TDB_ERRCODE(TDB_SUCCESS, 0);
2739
2740 fail:
2741	tlock->off = 0;
2742	if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
2743		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
2744	return -1;
2745}
2746
2747/* traverse the entire database - calling fn(tdb, key, data) on each element.
2748   return -1 on error or the record count traversed.
2749   if fn is NULL then it is not called.
2750   a non-zero return value from fn() indicates that the traversal should stop.
2751  */
2752static int tdb_traverse_internal(struct tdb_context *tdb,
2753				 tdb_traverse_func fn, void *private_data,
2754				 struct tdb_traverse_lock *tl)
2755{
2756	TDB_DATA key, dbuf;
2757	struct list_struct rec;
2758	int ret, count = 0;
2759
2760	/* This was in the initialization, above, but the IRIX compiler
2761	 * did not like it.  crh
2762	 */
2763	tl->next = tdb->travlocks.next;
2764
2765	/* fcntl locks don't stack: beware traverse inside traverse */
2766	tdb->travlocks.next = tl;
2767
2768	/* tdb_next_lock places locks on the record returned, and its chain */
2769	while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
2770		count++;
2771		/* now read the full record */
2772		key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
2773					  rec.key_len + rec.data_len);
2774		if (!key.dptr) {
2775			ret = -1;
2776			if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
2777				goto out;
2778			if (tdb_unlock_record(tdb, tl->off) != 0)
2779				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
2780			goto out;
2781		}
2782		key.dsize = rec.key_len;
2783		dbuf.dptr = key.dptr + rec.key_len;
2784		dbuf.dsize = rec.data_len;
2785
2786		/* Drop chain lock, call out */
2787		if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
2788			ret = -1;
2789			SAFE_FREE(key.dptr);
2790			goto out;
2791		}
2792		if (fn && fn(tdb, key, dbuf, private_data)) {
2793			/* They want us to terminate traversal */
2794			ret = count;
2795			if (tdb_unlock_record(tdb, tl->off) != 0) {
2796				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));
2797				ret = -1;
2798			}
2799			SAFE_FREE(key.dptr);
2800			goto out;
2801		}
2802		SAFE_FREE(key.dptr);
2803	}
2804out:
2805	tdb->travlocks.next = tl->next;
2806	if (ret < 0)
2807		return -1;
2808	else
2809		return count;
2810}
2811
2812
2813/*
2814  a read style traverse - temporarily marks the db read only
2815*/
2816int tdb_traverse_read(struct tdb_context *tdb,
2817		      tdb_traverse_func fn, void *private_data)
2818{
2819	struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
2820	int ret;
2821
2822	/* we need to get a read lock on the transaction lock here to
2823	   cope with the lock ordering semantics of solaris10 */
2824	if (tdb_transaction_lock(tdb, F_RDLCK)) {
2825		return -1;
2826	}
2827
2828	tdb->traverse_read++;
2829	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2830	tdb->traverse_read--;
2831
2832	tdb_transaction_unlock(tdb);
2833
2834	return ret;
2835}
2836
2837/*
2838  a write style traverse - needs to get the transaction lock to
2839  prevent deadlocks
2840*/
2841int tdb_traverse(struct tdb_context *tdb,
2842		 tdb_traverse_func fn, void *private_data)
2843{
2844	struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
2845	int ret;
2846
2847	if (tdb->read_only || tdb->traverse_read) {
2848		return tdb_traverse_read(tdb, fn, private_data);
2849	}
2850
2851	if (tdb_transaction_lock(tdb, F_WRLCK)) {
2852		return -1;
2853	}
2854
2855	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2856
2857	tdb_transaction_unlock(tdb);
2858
2859	return ret;
2860}
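/*
  Example of a traversal callback (sketch; the names and the 1024 byte
  threshold are made up). The callback returns 0 to continue and non-zero to
  stop early; tdb_traverse()/tdb_traverse_read() return the number of records
  visited, or -1 on error:
*/
#if 0
static int count_big_fn(struct tdb_context *the_tdb, TDB_DATA key,
			TDB_DATA data, void *private_data)
{
	int *count = (int *)private_data;
	if (data.dsize > 1024) {
		(*count)++;
	}
	return 0;	/* keep going */
}

static int count_big_example(struct tdb_context *tdb)
{
	int count = 0;
	if (tdb_traverse_read(tdb, count_big_fn, &count) == -1) {
		return -1;
	}
	return count;
}
#endif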
2861
2862
2863/* find the first entry in the database and return its key */
2864TDB_DATA tdb_firstkey(struct tdb_context *tdb)
2865{
2866	TDB_DATA key;
2867	struct list_struct rec;
2868
2869	/* release any old lock */
2870	if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
2871		return tdb_null;
2872	tdb->travlocks.off = tdb->travlocks.hash = 0;
2873	tdb->travlocks.lock_rw = F_RDLCK;
2874
2875	/* Grab first record: locks chain and returned record. */
2876	if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
2877		return tdb_null;
2878	/* now read the key */
2879	key.dsize = rec.key_len;
2880	key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off + sizeof(rec), key.dsize);
2881
2882	/* Unlock the hash chain of the record we just read. */
2883	if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2884		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
2885	return key;
2886}
2887
2888/* find the next entry in the database, returning its key */
2889TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
2890{
2891	u32 oldhash;
2892	TDB_DATA key = tdb_null;
2893	struct list_struct rec;
2894	unsigned char *k = NULL;
2895
2896	/* Is locked key the old key?  If so, traverse will be reliable. */
2897	if (tdb->travlocks.off) {
2898		if (tdb_lock(tdb,tdb->travlocks.hash,tdb->travlocks.lock_rw))
2899			return tdb_null;
2900		if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
2901		    || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
2902					    rec.key_len))
2903		    || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
2904			/* No, it wasn't: unlock it and start from scratch */
2905			if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
2906				SAFE_FREE(k);
2907				return tdb_null;
2908			}
2909			if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0) {
2910				SAFE_FREE(k);
2911				return tdb_null;
2912			}
2913			tdb->travlocks.off = 0;
2914		}
2915
2916		SAFE_FREE(k);
2917	}
2918
2919	if (!tdb->travlocks.off) {
2920		/* No previous element: do normal find, and lock record */
2921		tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
2922		if (!tdb->travlocks.off)
2923			return tdb_null;
2924		tdb->travlocks.hash = BUCKET(rec.full_hash);
2925		if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
2926			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
2927			return tdb_null;
2928		}
2929	}
2930	oldhash = tdb->travlocks.hash;
2931
2932	/* Grab next record: locks chain and returned record,
2933	   unlocks old record */
2934	if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
2935		key.dsize = rec.key_len;
2936		key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
2937					  key.dsize);
2938		/* Unlock the chain of this new record */
2939		if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2940			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2941	}
2942	/* Unlock the chain of old record */
2943	if (tdb_unlock(tdb, BUCKET(oldhash), tdb->travlocks.lock_rw) != 0)
2944		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2945	return key;
2946}
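/*
  Example key iteration with tdb_firstkey()/tdb_nextkey() (sketch; assumes
  the keys are printable text). Both calls return a malloc()ed key that the
  caller must free; note the old key is only freed after tdb_nextkey() has
  used it:
*/
#if 0
static void list_keys_example(struct tdb_context *tdb)
{
	TDB_DATA key, next;

	for (key = tdb_firstkey(tdb); key.dptr; key = next) {
		printf("%.*s\n", (int)key.dsize, (const char *)key.dptr);
		next = tdb_nextkey(tdb, key);
		free(key.dptr);
	}
}
#endif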
2947
2948/* file: dump.c */
2949
2950static tdb_off_t tdb_dump_record(struct tdb_context *tdb, int hash,
2951				 tdb_off_t offset)
2952{
2953	struct list_struct rec;
2954	tdb_off_t tailer_ofs, tailer;
2955
2956	if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
2957				   sizeof(rec), DOCONV()) == -1) {
2958		printf("ERROR: failed to read record at %u\n", offset);
2959		return 0;
2960	}
2961
2962	printf(" rec: hash=%d offset=0x%08x next=0x%08x rec_len=%d "
2963	       "key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
2964	       hash, offset, rec.next, rec.rec_len, rec.key_len, rec.data_len,
2965	       rec.full_hash, rec.magic);
2966
2967	tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
2968
2969	if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
2970		printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
2971		return rec.next;
2972	}
2973
2974	if (tailer != rec.rec_len + sizeof(rec)) {
2975		printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
2976				(unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
2977	}
2978	return rec.next;
2979}
2980
2981static int tdb_dump_chain(struct tdb_context *tdb, int i)
2982{
2983	tdb_off_t rec_ptr, top;
2984
2985	top = TDB_HASH_TOP(i);
2986
2987	if (tdb_lock(tdb, i, F_WRLCK) != 0)
2988		return -1;
2989
2990	if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
2991		return tdb_unlock(tdb, i, F_WRLCK);
2992
2993	if (rec_ptr)
2994		printf("hash=%d\n", i);
2995
2996	while (rec_ptr) {
2997		rec_ptr = tdb_dump_record(tdb, i, rec_ptr);
2998	}
2999
3000	return tdb_unlock(tdb, i, F_WRLCK);
3001}
3002
3003void tdb_dump_all(struct tdb_context *tdb)
3004{
3005	int i;
3006	for (i=0;i<tdb->header.hash_size;i++) {
3007		tdb_dump_chain(tdb, i);
3008	}
3009	printf("freelist:\n");
3010	tdb_dump_chain(tdb, -1);
3011}
3012
3013int tdb_printfreelist(struct tdb_context *tdb)
3014{
3015	int ret;
3016	long total_free = 0;
3017	tdb_off_t offset, rec_ptr;
3018	struct list_struct rec;
3019
3020	if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
3021		return ret;
3022
3023	offset = FREELIST_TOP;
3024
3025	/* read in the freelist top */
3026	if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
3027		tdb_unlock(tdb, -1, F_WRLCK);
3028		return 0;
3029	}
3030
3031	printf("freelist top=[0x%08x]\n", rec_ptr );
3032	while (rec_ptr) {
3033		if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
3034					   sizeof(rec), DOCONV()) == -1) {
3035			tdb_unlock(tdb, -1, F_WRLCK);
3036			return -1;
3037		}
3038
3039		if (rec.magic != TDB_FREE_MAGIC) {
3040			printf("bad magic 0x%08x in free list\n", rec.magic);
3041			tdb_unlock(tdb, -1, F_WRLCK);
3042			return -1;
3043		}
3044
3045		printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
3046		       rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
3047		total_free += rec.rec_len;
3048
3049		/* move to the next record */
3050		rec_ptr = rec.next;
3051	}
3052	printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
3053               (int)total_free);
3054
3055	return tdb_unlock(tdb, -1, F_WRLCK);
3056}
3057
3058/* file: tdb.c */
3059
3060TDB_DATA tdb_null;
3061
3062/*
3063  non-blocking increment of the tdb sequence number if the tdb has been opened using
3064  the TDB_SEQNUM flag
3065*/
3066void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
3067{
3068	tdb_off_t seqnum=0;
3069
3070	if (!(tdb->flags & TDB_SEQNUM)) {
3071		return;
3072	}
3073
3074	/* we ignore errors from this, as we have no sane way of
3075	   dealing with them.
3076	*/
3077	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3078	seqnum++;
3079	tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
3080}
3081
3082/*
3083  increment the tdb sequence number if the tdb has been opened using
3084  the TDB_SEQNUM flag
3085*/
3086static void tdb_increment_seqnum(struct tdb_context *tdb)
3087{
3088	if (!(tdb->flags & TDB_SEQNUM)) {
3089		return;
3090	}
3091
3092	if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
3093		return;
3094	}
3095
3096	tdb_increment_seqnum_nonblock(tdb);
3097
3098	tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
3099}
3100
3101static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
3102{
3103	return memcmp(data.dptr, key.dptr, data.dsize);
3104}
3105
3106/* Returns 0 on fail.  On success, return offset of record, and fills
3107   in rec */
3108static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
3109			struct list_struct *r)
3110{
3111	tdb_off_t rec_ptr;
3112
3113	/* read in the hash top */
3114	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3115		return 0;
3116
3117	/* keep looking until we find the right record */
3118	while (rec_ptr) {
3119		if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3120			return 0;
3121
3122		if (!TDB_DEAD(r) && hash==r->full_hash
3123		    && key.dsize==r->key_len
3124		    && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
3125				      r->key_len, tdb_key_compare,
3126				      NULL) == 0) {
3127			return rec_ptr;
3128		}
3129		rec_ptr = r->next;
3130	}
3131	return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3132}
3133
3134/* As tdb_find, but if you succeed, keep the lock */
3135tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
3136			   struct list_struct *rec)
3137{
3138	u32 rec_ptr;
3139
3140	if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
3141		return 0;
3142	if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
3143		tdb_unlock(tdb, BUCKET(hash), locktype);
3144	return rec_ptr;
3145}
3146
3147
3148/* update an entry in place - this only works if the new data size
3149   is <= the old data size and the key exists.
3150   on failure return -1.
3151*/
3152static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
3153{
3154	struct list_struct rec;
3155	tdb_off_t rec_ptr;
3156
3157	/* find entry */
3158	if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
3159		return -1;
3160
3161	/* must be long enough for key, data and tailer */
3162	if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
3163		tdb->ecode = TDB_SUCCESS; /* Not really an error */
3164		return -1;
3165	}
3166
3167	if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3168		      dbuf.dptr, dbuf.dsize) == -1)
3169		return -1;
3170
3171	if (dbuf.dsize != rec.data_len) {
3172		/* update size */
3173		rec.data_len = dbuf.dsize;
3174		return tdb_rec_write(tdb, rec_ptr, &rec);
3175	}
3176
3177	return 0;
3178}
3179
3180/* find an entry in the database given a key */
3181/* If an entry doesn't exist the tdb error code will be set to
3182 * TDB_ERR_NOEXIST. If a key has no data attached
3183 * then the TDB_DATA will have zero length but
3184 * a non-zero pointer
3185 */
3186TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
3187{
3188	tdb_off_t rec_ptr;
3189	struct list_struct rec;
3190	TDB_DATA ret;
3191	u32 hash;
3192
3193	/* find which hash bucket it is in */
3194	hash = tdb->hash_fn(&key);
3195	if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
3196		return tdb_null;
3197
3198	ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3199				  rec.data_len);
3200	ret.dsize = rec.data_len;
3201	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3202	return ret;
3203}
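/*
  Example fetch (sketch; the helper name is made up). The returned dptr is
  allocated with malloc() by tdb_alloc_read(), so the caller owns it and must
  free() it; a NULL dptr means the key was not found or an I/O error
  occurred:
*/
#if 0
static int fetch_example(struct tdb_context *tdb, TDB_DATA key)
{
	TDB_DATA val = tdb_fetch(tdb, key);

	if (val.dptr == NULL) {
		return -1;	/* not found, or error */
	}
	printf("got %u bytes of data\n", (unsigned int)val.dsize);
	free(val.dptr);
	return 0;
}
#endif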
3204
3205/*
3206 * Find an entry in the database and hand the record's data to a parsing
3207 * function. The parsing function is executed under the chain read lock, so it
3208 * should be fast and should not block on other syscalls.
3209 *
3210 * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
3211 *
3212 * For mmapped tdb's that do not have a transaction open it points the parsing
3213 * function directly at the mmap area, it avoids the malloc/memcpy in this
3214 * case. If a transaction is open or no mmap is available, it has to do
3215 * malloc/read/parse/free.
3216 *
3217 * This is interesting for all readers of potentially large data structures in
3218 * the tdb records, ldb indexes being one example.
3219 */
3220
3221int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
3222		     int (*parser)(TDB_DATA key, TDB_DATA data,
3223				   void *private_data),
3224		     void *private_data)
3225{
3226	tdb_off_t rec_ptr;
3227	struct list_struct rec;
3228	int ret;
3229	u32 hash;
3230
3231	/* find which hash bucket it is in */
3232	hash = tdb->hash_fn(&key);
3233
3234	if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
3235		return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3236	}
3237
3238	ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
3239			     rec.data_len, parser, private_data);
3240
3241	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3242
3243	return ret;
3244}
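/*
  Example parser for tdb_parse_record() (sketch; the names are made up). The
  parser runs under the chain read lock and may be pointed straight at the
  mmap'd record, so it must copy out anything it wants to keep and must not
  call back into tdb:
*/
#if 0
static int first_byte_parser(TDB_DATA key, TDB_DATA data, void *private_data)
{
	unsigned char *out = (unsigned char *)private_data;

	if (data.dsize == 0) {
		return -1;
	}
	*out = data.dptr[0];	/* copy while the lock is held */
	return 0;
}

static int parse_example(struct tdb_context *tdb, TDB_DATA key)
{
	unsigned char first;
	return tdb_parse_record(tdb, key, first_byte_parser, &first);
}
#endif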
3245
3246/* check if an entry in the database exists
3247
3248   note that 1 is returned if the key is found and 0 is returned if not found.
3249   this doesn't match the conventions in the rest of this module, but is
3250   compatible with gdbm
3251*/
3252static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3253{
3254	struct list_struct rec;
3255
3256	if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
3257		return 0;
3258	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3259	return 1;
3260}
3261
3262int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
3263{
3264	u32 hash = tdb->hash_fn(&key);
3265	return tdb_exists_hash(tdb, key, hash);
3266}
3267
3268/* actually delete an entry in the database given the offset */
3269int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct*rec)
3270{
3271	tdb_off_t last_ptr, i;
3272	struct list_struct lastrec;
3273
3274	if (tdb->read_only || tdb->traverse_read) return -1;
3275
3276	if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
3277		/* Someone traversing here: mark it as dead */
3278		rec->magic = TDB_DEAD_MAGIC;
3279		return tdb_rec_write(tdb, rec_ptr, rec);
3280	}
3281	if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
3282		return -1;
3283
3284	/* find previous record in hash chain */
3285	if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
3286		return -1;
3287	for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
3288		if (tdb_rec_read(tdb, i, &lastrec) == -1)
3289			return -1;
3290
3291	/* unlink it: next ptr is at start of record. */
3292	if (last_ptr == 0)
3293		last_ptr = TDB_HASH_TOP(rec->full_hash);
3294	if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
3295		return -1;
3296
3297	/* recover the space */
3298	if (tdb_free(tdb, rec_ptr, rec) == -1)
3299		return -1;
3300	return 0;
3301}
3302
3303static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
3304{
3305	int res = 0;
3306	tdb_off_t rec_ptr;
3307	struct list_struct rec;
3308
3309	/* read in the hash top */
3310	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3311		return 0;
3312
3313	while (rec_ptr) {
3314		if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
3315			return 0;
3316
3317		if (rec.magic == TDB_DEAD_MAGIC) {
3318			res += 1;
3319		}
3320		rec_ptr = rec.next;
3321	}
3322	return res;
3323}
3324
3325/*
3326 * Purge all DEAD records from a hash chain
3327 */
3328static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
3329{
3330	int res = -1;
3331	struct list_struct rec;
3332	tdb_off_t rec_ptr;
3333
3334	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3335		return -1;
3336	}
3337
3338	/* read in the hash top */
3339	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3340		goto fail;
3341
3342	while (rec_ptr) {
3343		tdb_off_t next;
3344
3345		if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
3346			goto fail;
3347		}
3348
3349		next = rec.next;
3350
3351		if (rec.magic == TDB_DEAD_MAGIC
3352		    && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
3353			goto fail;
3354		}
3355		rec_ptr = next;
3356	}
3357	res = 0;
3358 fail:
3359	tdb_unlock(tdb, -1, F_WRLCK);
3360	return res;
3361}
3362
3363/* delete an entry in the database given a key */
3364static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3365{
3366	tdb_off_t rec_ptr;
3367	struct list_struct rec;
3368	int ret;
3369
3370	if (tdb->max_dead_records != 0) {
3371
3372		/*
3373		 * Allow for some dead records per hash chain, mainly for
3374		 * tdb's with a very high create/delete rate like locking.tdb.
3375		 */
3376
3377		if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3378			return -1;
3379
3380		if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
3381			/*
3382			 * Don't let the per-chain freelist grow too large,
3383			 * delete all existing dead records
3384			 */
3385			tdb_purge_dead(tdb, hash);
3386		}
3387
3388		if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
3389			tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3390			return -1;
3391		}
3392
3393		/*
3394		 * Just mark the record as dead.
3395		 */
3396		rec.magic = TDB_DEAD_MAGIC;
3397		ret = tdb_rec_write(tdb, rec_ptr, &rec);
3398	}
3399	else {
3400		if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
3401						   &rec)))
3402			return -1;
3403
3404		ret = tdb_do_delete(tdb, rec_ptr, &rec);
3405	}
3406
3407	if (ret == 0) {
3408		tdb_increment_seqnum(tdb);
3409	}
3410
3411	if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
3412		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
3413	return ret;
3414}
3415
3416int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
3417{
3418	u32 hash = tdb->hash_fn(&key);
3419	return tdb_delete_hash(tdb, key, hash);
3420}
3421
3422/*
3423 * See if we have a dead record around with enough space
3424 */
3425static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
3426			       struct list_struct *r, tdb_len_t length)
3427{
3428	tdb_off_t rec_ptr;
3429
3430	/* read in the hash top */
3431	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3432		return 0;
3433
3434	/* keep looking until we find the right record */
3435	while (rec_ptr) {
3436		if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3437			return 0;
3438
3439		if (TDB_DEAD(r) && r->rec_len >= length) {
3440			/*
3441			 * First fit for simple coding, TODO: change to best
3442			 * fit
3443			 */
3444			return rec_ptr;
3445		}
3446		rec_ptr = r->next;
3447	}
3448	return 0;
3449}
3450
3451/* store an element in the database, replacing any existing element
3452   with the same key
3453
3454   return 0 on success, -1 on failure
3455*/
3456int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
3457{
3458	struct list_struct rec;
3459	u32 hash;
3460	tdb_off_t rec_ptr;
3461	char *p = NULL;
3462	int ret = -1;
3463
3464	if (tdb->read_only || tdb->traverse_read) {
3465		tdb->ecode = TDB_ERR_RDONLY;
3466		return -1;
3467	}
3468
3469	/* find which hash bucket it is in */
3470	hash = tdb->hash_fn(&key);
3471	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3472		return -1;
3473
3474	/* check for it existing, on insert. */
3475	if (flag == TDB_INSERT) {
3476		if (tdb_exists_hash(tdb, key, hash)) {
3477			tdb->ecode = TDB_ERR_EXISTS;
3478			goto fail;
3479		}
3480	} else {
3481		/* first try in-place update, on modify or replace. */
3482		if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
3483			goto done;
3484		}
3485		if (tdb->ecode == TDB_ERR_NOEXIST &&
3486		    flag == TDB_MODIFY) {
3487			/* if the record doesn't exist and we are in TDB_MODIFY mode then
3488			 we should fail the store */
3489			goto fail;
3490		}
3491	}
3492	/* reset the error code potentially set by tdb_update_hash() */
3493	tdb->ecode = TDB_SUCCESS;
3494
3495	/* delete any existing record - if it doesn't exist we don't
3496           care.  Doing this first reduces fragmentation, and avoids
3497           coalescing with `allocated' block before it's updated. */
3498	if (flag != TDB_INSERT)
3499		tdb_delete_hash(tdb, key, hash);
3500
3501	/* Copy key+value *before* allocating free space in case malloc
3502	   fails and we are left with a dead spot in the tdb. */
3503
3504	if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
3505		tdb->ecode = TDB_ERR_OOM;
3506		goto fail;
3507	}
3508
3509	memcpy(p, key.dptr, key.dsize);
3510	if (dbuf.dsize)
3511		memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
3512
3513	if (tdb->max_dead_records != 0) {
3514		/*
3515		 * Allow for some dead records per hash chain, and see if we can
3516		 * find one that can hold the new record. We need enough space
3517		 * for key, data and tailer. If we find one, we don't have to
3518		 * consult the central freelist.
3519		 */
3520		rec_ptr = tdb_find_dead(
3521			tdb, hash, &rec,
3522			key.dsize + dbuf.dsize + sizeof(tdb_off_t));
3523
3524		if (rec_ptr != 0) {
3525			rec.key_len = key.dsize;
3526			rec.data_len = dbuf.dsize;
3527			rec.full_hash = hash;
3528			rec.magic = TDB_MAGIC;
3529			if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3530			    || tdb->methods->tdb_write(
3531				    tdb, rec_ptr + sizeof(rec),
3532				    p, key.dsize + dbuf.dsize) == -1) {
3533				goto fail;
3534			}
3535			goto done;
3536		}
3537	}
3538
3539	/*
3540	 * We have to allocate some space from the freelist, so this means we
3541	 * have to lock it. Use the chance to purge all the DEAD records from
3542	 * the hash chain under the freelist lock.
3543	 */
3544
3545	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3546		goto fail;
3547	}
3548
3549	if ((tdb->max_dead_records != 0)
3550	    && (tdb_purge_dead(tdb, hash) == -1)) {
3551		tdb_unlock(tdb, -1, F_WRLCK);
3552		goto fail;
3553	}
3554
3555	/* we have to allocate some space */
3556	rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
3557
3558	tdb_unlock(tdb, -1, F_WRLCK);
3559
3560	if (rec_ptr == 0) {
3561		goto fail;
3562	}
3563
3564	/* Read hash top into next ptr */
3565	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
3566		goto fail;
3567
3568	rec.key_len = key.dsize;
3569	rec.data_len = dbuf.dsize;
3570	rec.full_hash = hash;
3571	rec.magic = TDB_MAGIC;
3572
3573	/* write out and point the top of the hash chain at it */
3574	if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3575	    || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
3576	    || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
3577		/* Need to tdb_unallocate() here */
3578		goto fail;
3579	}
3580
3581 done:
3582	ret = 0;
3583 fail:
3584	if (ret == 0) {
3585		tdb_increment_seqnum(tdb);
3586	}
3587
3588	SAFE_FREE(p);
3589	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3590	return ret;
3591}
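/*
  Example of the store flags (sketch; the helper name is made up).
  TDB_INSERT fails if the key already exists, TDB_MODIFY fails if it does
  not, and TDB_REPLACE stores unconditionally:
*/
#if 0
static int store_example(struct tdb_context *tdb, TDB_DATA key, TDB_DATA val)
{
	if (tdb_store(tdb, key, val, TDB_INSERT) == 0) {
		return 0;	/* created a new record */
	}
	if (tdb_error(tdb) == TDB_ERR_EXISTS) {
		/* already present - overwrite it instead */
		return tdb_store(tdb, key, val, TDB_REPLACE);
	}
	return -1;
}
#endif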
3592
3593
3594/* Append to an entry. Create if not exist. */
3595int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
3596{
3597	u32 hash;
3598	TDB_DATA dbuf;
3599	int ret = -1;
3600
3601	/* find which hash bucket it is in */
3602	hash = tdb->hash_fn(&key);
3603	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3604		return -1;
3605
3606	dbuf = tdb_fetch(tdb, key);
3607
3608	if (dbuf.dptr == NULL) {
3609		dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
3610	} else {
3611		unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
3612						     dbuf.dsize + new_dbuf.dsize);
3613		if (new_dptr == NULL) {
3614			free(dbuf.dptr);
3615		}
3616		dbuf.dptr = new_dptr;
3617	}
3618
3619	if (dbuf.dptr == NULL) {
3620		tdb->ecode = TDB_ERR_OOM;
3621		goto failed;
3622	}
3623
3624	memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
3625	dbuf.dsize += new_dbuf.dsize;
3626
3627	ret = tdb_store(tdb, key, dbuf, 0);
3628
3629failed:
3630	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3631	SAFE_FREE(dbuf.dptr);
3632	return ret;
3633}
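/*
  Example of building a record up incrementally with tdb_append() (sketch;
  the helper name is made up). If the key does not exist it is created,
  otherwise the new bytes are added to the end of the existing data:
*/
#if 0
static int append_line_example(struct tdb_context *tdb, TDB_DATA key,
			       const char *line)
{
	TDB_DATA buf;

	buf.dptr  = (unsigned char *)line;
	buf.dsize = strlen(line);
	return tdb_append(tdb, key, buf);
}
#endif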
3634
3635
3636/*
3637  return the name of the current tdb file
3638  useful for external logging functions
3639*/
3640const char *tdb_name(struct tdb_context *tdb)
3641{
3642	return tdb->name;
3643}
3644
3645/*
3646  return the underlying file descriptor being used by tdb, or -1
3647  useful for external routines that want to check the device/inode
3648  of the fd
3649*/
3650int tdb_fd(struct tdb_context *tdb)
3651{
3652	return tdb->fd;
3653}
3654
3655/*
3656  return the current logging function
3657  useful for external tdb routines that wish to log tdb errors
3658*/
3659tdb_log_func tdb_log_fn(struct tdb_context *tdb)
3660{
3661	return tdb->log.log_fn;
3662}
3663
3664
3665/*
3666  get the tdb sequence number. Only makes sense if the writers opened
3667  with TDB_SEQNUM set. Note that this sequence number will wrap quite
3668  quickly, so it should only be used for a 'has something changed'
3669  test, not for code that relies on the count of the number of changes
3670  made. If you want a counter then use a tdb record.
3671
3672  The aim of this sequence number is to allow for a very lightweight
3673  test of a possible tdb change.
3674*/
3675int tdb_get_seqnum(struct tdb_context *tdb)
3676{
3677	tdb_off_t seqnum=0;
3678
3679	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3680	return seqnum;
3681}
3682
3683int tdb_hash_size(struct tdb_context *tdb)
3684{
3685	return tdb->header.hash_size;
3686}
3687
3688size_t tdb_map_size(struct tdb_context *tdb)
3689{
3690	return tdb->map_size;
3691}
3692
3693int tdb_get_flags(struct tdb_context *tdb)
3694{
3695	return tdb->flags;
3696}
3697
3698
3699/*
3700  enable sequence number handling on an open tdb
3701*/
3702void tdb_enable_seqnum(struct tdb_context *tdb)
3703{
3704	tdb->flags |= TDB_SEQNUM;
3705}
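
/* A sketch of the "has something changed" pattern described above: a
 * reader caches the last seqnum it saw and treats any change as a hint
 * to refresh.  This assumes every writer opened the tdb with TDB_SEQNUM
 * (or called tdb_enable_seqnum()); cache_reload() is hypothetical. */
#if 0	/* usage sketch, not compiled */
static int maybe_reload(struct tdb_context *tdb, int *cached_seqnum)
{
	int now = tdb_get_seqnum(tdb);

	if (now == *cached_seqnum) {
		return 0;	/* nothing appears to have changed */
	}
	*cached_seqnum = now;
	return cache_reload(tdb);	/* hypothetical refresh routine */
}
#endif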
3706
3707/* file: open.c */
3708
3709/* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
3710static struct tdb_context *tdbs = NULL;
3711
3712
3713/* This is based on the hash algorithm from gdbm */
3714static unsigned int default_tdb_hash(TDB_DATA *key)
3715{
3716	u32 value;	/* Used to compute the hash value.  */
3717	u32   i;	/* Used to cycle through random values. */
3718
3719	/* Set the initial value from the key size. */
3720	for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
3721		value = (value + (key->dptr[i] << (i*5 % 24)));
3722
3723	return (1103515243 * value + 12345);
3724}
3725
3726
3727/* initialise a new database with a specified hash size */
3728static int tdb_new_database(struct tdb_context *tdb, int hash_size)
3729{
3730	struct tdb_header *newdb;
3731	int size, ret = -1;
3732
3733	/* We make it up in memory, then write it out if not internal */
3734	size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
3735	if (!(newdb = (struct tdb_header *)calloc(size, 1)))
3736		return TDB_ERRCODE(TDB_ERR_OOM, -1);
3737
3738	/* Fill in the header */
3739	newdb->version = TDB_VERSION;
3740	newdb->hash_size = hash_size;
3741	if (tdb->flags & TDB_INTERNAL) {
3742		tdb->map_size = size;
3743		tdb->map_ptr = (char *)newdb;
3744		memcpy(&tdb->header, newdb, sizeof(tdb->header));
3745		/* Convert the `ondisk' version if asked. */
3746		CONVERT(*newdb);
3747		return 0;
3748	}
3749	if (lseek(tdb->fd, 0, SEEK_SET) == -1)
3750		goto fail;
3751
3752	if (ftruncate(tdb->fd, 0) == -1)
3753		goto fail;
3754
3755	/* This creates an endian-converted header, as if read from disk */
3756	CONVERT(*newdb);
3757	memcpy(&tdb->header, newdb, sizeof(tdb->header));
3758	/* Don't endian-convert the magic food! */
3759	memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
3760	if (write(tdb->fd, newdb, size) != size) {
3761		ret = -1;
3762	} else {
3763		ret = 0;
3764	}
3765
3766  fail:
3767	SAFE_FREE(newdb);
3768	return ret;
3769}
3770
3771
3772
3773static int tdb_already_open(dev_t device,
3774			    ino_t ino)
3775{
3776	struct tdb_context *i;
3777
3778	for (i = tdbs; i; i = i->next) {
3779		if (i->device == device && i->inode == ino) {
3780			return 1;
3781		}
3782	}
3783
3784	return 0;
3785}
3786
3787/* open the database, creating it if necessary
3788
3789   The open_flags and mode are passed straight to the open call on the
3790   database file. A flags value of O_WRONLY is invalid. The hash size
3791   is advisory; use zero for a default value.
3792
3793   Returns NULL on error, in which case errno is also set.  Don't
3794   try to call tdb_error or tdb_errname, just do strerror(errno).
3795
3796   @param name may be NULL for internal databases. */
3797struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
3798		      int open_flags, mode_t mode)
3799{
3800	return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
3801}
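
/* A minimal open sketch; the path, mode and use of TDB_DEFAULT are
 * illustrative.  Per the comment above, on failure only errno is
 * meaningful, so report with strerror(errno). */
#if 0	/* usage sketch, not compiled */
static struct tdb_context *open_example(void)
{
	/* hash_size 0 selects the default; O_WRONLY would be rejected */
	struct tdb_context *tdb = tdb_open("example.tdb", 0, TDB_DEFAULT,
					   O_RDWR | O_CREAT, 0600);

	if (tdb == NULL) {
		fprintf(stderr, "tdb_open failed: %s\n", strerror(errno));
	}
	return tdb;
}
#endif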
3802
3803/* a default logging function */
3804static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
3805static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
3806{
3807}
3808
3809
3810struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
3811				int open_flags, mode_t mode,
3812				const struct tdb_logging_context *log_ctx,
3813				tdb_hash_func hash_fn)
3814{
3815	struct tdb_context *tdb;
3816	struct stat st;
3817	int rev = 0, locked = 0;
3818	unsigned char *vp;
3819	u32 vertest;
3820
3821	if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
3822		/* Can't log this */
3823		errno = ENOMEM;
3824		goto fail;
3825	}
3826	tdb_io_init(tdb);
3827	tdb->fd = -1;
3828	tdb->name = NULL;
3829	tdb->map_ptr = NULL;
3830	tdb->flags = tdb_flags;
3831	tdb->open_flags = open_flags;
3832	if (log_ctx) {
3833		tdb->log = *log_ctx;
3834	} else {
3835		tdb->log.log_fn = null_log_fn;
3836		tdb->log.log_private = NULL;
3837	}
3838	tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
3839
3840	/* cache the page size */
3841	tdb->page_size = getpagesize();
3842	if (tdb->page_size <= 0) {
3843		tdb->page_size = 0x2000;
3844	}
3845
3846	if ((open_flags & O_ACCMODE) == O_WRONLY) {
3847		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
3848			 name));
3849		errno = EINVAL;
3850		goto fail;
3851	}
3852
3853	if (hash_size == 0)
3854		hash_size = DEFAULT_HASH_SIZE;
3855	if ((open_flags & O_ACCMODE) == O_RDONLY) {
3856		tdb->read_only = 1;
3857		/* read-only databases don't do locking or clear-if-first */
3858		tdb->flags |= TDB_NOLOCK;
3859		tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3860	}
3861
3862	/* internal databases don't mmap or lock, and start off cleared */
3863	if (tdb->flags & TDB_INTERNAL) {
3864		tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
3865		tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3866		if (tdb_new_database(tdb, hash_size) != 0) {
3867			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
3868			goto fail;
3869		}
3870		goto internal;
3871	}
3872
3873	if ((tdb->fd = open(name, open_flags, mode)) == -1) {
3874		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
3875			 name, strerror(errno)));
3876		goto fail;	/* errno set by open(2) */
3877	}
3878
3879	/* ensure there is only one process initialising at once */
3880	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
3881		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
3882			 name, strerror(errno)));
3883		goto fail;	/* errno set by tdb_brlock */
3884	}
3885
3886	/* we need to zero the database if we are the only one with it open */
3887	if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
3888	    (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
3889		open_flags |= O_CREAT;
3890		if (ftruncate(tdb->fd, 0) == -1) {
3891			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
3892				 "failed to truncate %s: %s\n",
3893				 name, strerror(errno)));
3894			goto fail; /* errno set by ftruncate */
3895		}
3896	}
3897
3898	if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
3899	    || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
3900	    || (tdb->header.version != TDB_VERSION
3901		&& !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
3902		/* it's not a valid database - possibly initialise it */
3903		if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
3904			errno = EIO; /* ie bad format or something */
3905			goto fail;
3906		}
3907		rev = (tdb->flags & TDB_CONVERT);
3908	}
3909	vp = (unsigned char *)&tdb->header.version;
3910	vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
3911		  (((u32)vp[2]) << 8) | (u32)vp[3];
3912	tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
3913	if (!rev)
3914		tdb->flags &= ~TDB_CONVERT;
3915	else {
3916		tdb->flags |= TDB_CONVERT;
3917		tdb_convert(&tdb->header, sizeof(tdb->header));
3918	}
3919	if (fstat(tdb->fd, &st) == -1)
3920		goto fail;
3921
3922	if (tdb->header.rwlocks != 0) {
3923		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
3924		goto fail;
3925	}
3926
3927	/* Is it already in the open list?  If so, fail. */
3928	if (tdb_already_open(st.st_dev, st.st_ino)) {
3929		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3930			 "%s (%d,%d) is already open in this process\n",
3931			 name, (int)st.st_dev, (int)st.st_ino));
3932		errno = EBUSY;
3933		goto fail;
3934	}
3935
3936	if (!(tdb->name = (char *)strdup(name))) {
3937		errno = ENOMEM;
3938		goto fail;
3939	}
3940
3941	tdb->map_size = st.st_size;
3942	tdb->device = st.st_dev;
3943	tdb->inode = st.st_ino;
3944	tdb->max_dead_records = 0;
3945	tdb_mmap(tdb);
3946	if (locked) {
3947		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
3948			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3949				 "failed to release ACTIVE_LOCK on %s: %s\n",
3950				 name, strerror(errno)));
3951			goto fail;
3952		}
3953
3954	}
3955
3956	/* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
3957	   we didn't get the initial exclusive lock, as we need to let all other
3958	   users know we're using it. */
3959
3960	if (tdb_flags & TDB_CLEAR_IF_FIRST) {
3961		/* leave this lock in place to indicate it's in use */
3962		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
3963			goto fail;
3964	}
3965
3966	/* if needed, run recovery */
3967	if (tdb_transaction_recover(tdb) == -1) {
3968		goto fail;
3969	}
3970
3971 internal:
3972	/* Internal (memory-only) databases skip all the code above to
3973	 * do with disk files, and resume here by releasing their
3974	 * global lock and hooking into the active list. */
3975	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
3976		goto fail;
3977	tdb->next = tdbs;
3978	tdbs = tdb;
3979	return tdb;
3980
3981 fail:
3982	{ int save_errno = errno;
3983
3984	if (!tdb)
3985		return NULL;
3986
3987	if (tdb->map_ptr) {
3988		if (tdb->flags & TDB_INTERNAL)
3989			SAFE_FREE(tdb->map_ptr);
3990		else
3991			tdb_munmap(tdb);
3992	}
3993	SAFE_FREE(tdb->name);
3994	if (tdb->fd != -1)
3995		if (close(tdb->fd) != 0)
3996			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
3997	SAFE_FREE(tdb);
3998	errno = save_errno;
3999	return NULL;
4000	}
4001}
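
/* A sketch of tdb_open_ex() with a caller-supplied hash function.  The
 * FNV-style hash below is purely illustrative and is not part of this
 * library; note that every opener of a given file should use the same
 * hash function, since records are located by their stored hash. */
#if 0	/* usage sketch, not compiled */
static unsigned int example_hash(TDB_DATA *key)
{
	unsigned int h = 2166136261u;
	size_t i;

	for (i = 0; i < key->dsize; i++) {
		h = (h ^ key->dptr[i]) * 16777619u;
	}
	return h;
}

static struct tdb_context *open_with_hash_example(const char *path)
{
	/* a NULL logging context falls back to the silent default above */
	return tdb_open_ex(path, 0, TDB_DEFAULT, O_RDWR | O_CREAT, 0600,
			   NULL, example_hash);
}
#endif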
4002
4003/*
4004 * Set the maximum number of dead records per hash chain
4005 */
4006
4007void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
4008{
4009	tdb->max_dead_records = max_dead;
4010}
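
/* A hedged sketch: allowing a few dead records per chain can cut down
 * on freelist traffic for tdbs that see frequent delete/store cycles.
 * The value 5 is illustrative; 0 restores the default behaviour of
 * freeing records immediately. */
#if 0	/* usage sketch, not compiled */
	tdb_set_max_dead(tdb, 5);
#endif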
4011
4012/**
4013 * Close a database.
4014 *
4015 * @returns -1 for error; 0 for success.
4016 **/
4017int tdb_close(struct tdb_context *tdb)
4018{
4019	struct tdb_context **i;
4020	int ret = 0;
4021
4022	if (tdb->transaction) {
4023		tdb_transaction_cancel(tdb);
4024	}
4025
4026	if (tdb->map_ptr) {
4027		if (tdb->flags & TDB_INTERNAL)
4028			SAFE_FREE(tdb->map_ptr);
4029		else
4030			tdb_munmap(tdb);
4031	}
4032	SAFE_FREE(tdb->name);
4033	if (tdb->fd != -1)
4034		ret = close(tdb->fd);
4035	SAFE_FREE(tdb->lockrecs);
4036
4037	/* Remove from contexts list */
4038	for (i = &tdbs; *i; i = &(*i)->next) {
4039		if (*i == tdb) {
4040			*i = tdb->next;
4041			break;
4042		}
4043	}
4044
4045	memset(tdb, 0, sizeof(*tdb));
4046	SAFE_FREE(tdb);
4047
4048	return ret;
4049}
4050
4051/* register a logging function */
4052void tdb_set_logging_function(struct tdb_context *tdb,
4053                              const struct tdb_logging_context *log_ctx)
4054{
4055        tdb->log = *log_ctx;
4056}
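
/* A sketch of installing a caller-supplied log function.  The function
 * must match tdb_log_func; the stderr prefix and the decision to ignore
 * the debug level are illustrative choices, not library requirements. */
#if 0	/* usage sketch, not compiled */
static void example_log_fn(struct tdb_context *tdb, enum tdb_debug_level level,
			   const char *fmt, ...)
{
	va_list ap;

	fprintf(stderr, "tdb(%s): ", tdb_name(tdb) ? tdb_name(tdb) : "internal");
	va_start(ap, fmt);
	vfprintf(stderr, fmt, ap);
	va_end(ap);
}

static void install_logging_example(struct tdb_context *tdb)
{
	struct tdb_logging_context log_ctx;

	log_ctx.log_fn = example_log_fn;
	log_ctx.log_private = NULL;
	tdb_set_logging_function(tdb, &log_ctx);	/* the context is copied */
}
#endif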
4057
4058void *tdb_get_logging_private(struct tdb_context *tdb)
4059{
4060	return tdb->log.log_private;
4061}
4062
4063/* reopen a tdb - this can be used after a fork to ensure that we have an independent
4064   seek pointer from our parent and to re-establish locks */
4065int tdb_reopen(struct tdb_context *tdb)
4066{
4067	struct stat st;
4068
4069	if (tdb->flags & TDB_INTERNAL) {
4070		return 0; /* Nothing to do. */
4071	}
4072
4073	if (tdb->num_locks != 0 || tdb->global_lock.count) {
4074		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
4075		goto fail;
4076	}
4077
4078	if (tdb->transaction != 0) {
4079		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
4080		goto fail;
4081	}
4082
4083	if (tdb_munmap(tdb) != 0) {
4084		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
4085		goto fail;
4086	}
4087	if (close(tdb->fd) != 0)
4088		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
4089	tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
4090	if (tdb->fd == -1) {
4091		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
4092		goto fail;
4093	}
4094	if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
4095	    (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
4096		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
4097		goto fail;
4098	}
4099	if (fstat(tdb->fd, &st) != 0) {
4100		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
4101		goto fail;
4102	}
4103	if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
4104		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
4105		goto fail;
4106	}
4107	tdb_mmap(tdb);
4108
4109	return 0;
4110
4111fail:
4112	tdb_close(tdb);
4113	return -1;
4114}
4115
4116/* reopen all tdb's */
4117int tdb_reopen_all(int parent_longlived)
4118{
4119	struct tdb_context *tdb;
4120
4121	for (tdb=tdbs; tdb; tdb = tdb->next) {
4122		/*
4123		 * If the parent is long-lived (i.e. a
4124		 * parent daemon architecture), we know
4125		 * it will keep its active lock on a
4126		 * tdb opened with CLEAR_IF_FIRST. Thus
4127		 * for child processes we don't have to
4128		 * add an active lock. This is essential
4129		 * to improve performance on systems that
4130		 * keep POSIX locks as a non-scalable data
4131		 * structure in the kernel.
4132		 */
4133		if (parent_longlived) {
4134			/* Ensure no clear-if-first. */
4135			tdb->flags &= ~TDB_CLEAR_IF_FIRST;
4136		}
4137
4138		if (tdb_reopen(tdb) != 0)
4139			return -1;
4140	}
4141
4142	return 0;
4143}
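
/* A fork sketch: the child shares the parent's file offset but must
 * re-establish its own fcntl locks, so it reopens every tdb before
 * doing any work.  Passing 1 assumes a long-lived parent daemon that
 * keeps its CLEAR_IF_FIRST locks; do_child_work() is hypothetical. */
#if 0	/* usage sketch, not compiled */
static void fork_worker_example(void)
{
	pid_t pid = fork();

	if (pid == 0) {
		if (tdb_reopen_all(1 /* parent_longlived */) != 0) {
			_exit(1);
		}
		do_child_work();
		_exit(0);
	}
}
#endif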
4144