tdb.c revision 24d364fc43601ea3d2e01cc506633302fa091d8f
1/*
2URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb/common
3Rev: 23590
4Last Changed Date: 2007-06-22 13:36:10 -0400 (Fri, 22 Jun 2007)
5*/
6 /*
7   trivial database library - standalone version
8
9   Copyright (C) Andrew Tridgell              1999-2005
10   Copyright (C) Jeremy Allison               2000-2006
11   Copyright (C) Paul `Rusty' Russell         2000
12
13     ** NOTE! The following LGPL license applies to the tdb
14     ** library. This does NOT imply that all of Samba is released
15     ** under the LGPL
16
17   This library is free software; you can redistribute it and/or
18   modify it under the terms of the GNU Lesser General Public
19   License as published by the Free Software Foundation; either
20   version 2 of the License, or (at your option) any later version.
21
22   This library is distributed in the hope that it will be useful,
23   but WITHOUT ANY WARRANTY; without even the implied warranty of
24   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
25   Lesser General Public License for more details.
26
27   You should have received a copy of the GNU Lesser General Public
28   License along with this library; if not, write to the Free Software
29   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
30*/
31
32#ifdef CONFIG_STAND_ALONE
33#define HAVE_MMAP
34#define HAVE_STRDUP
35#define HAVE_SYS_MMAN_H
36#define HAVE_UTIME_H
37#define HAVE_UTIME
38#endif
39#define _XOPEN_SOURCE 600
40
41#include "config.h"
42#include <unistd.h>
43#include <stdio.h>
44#include <stdlib.h>
45#include <stdarg.h>
46#include <stddef.h>
47#include <errno.h>
48#include <string.h>
49#ifdef HAVE_SYS_SELECT_H
50#include <sys/select.h>
51#endif
52#include <sys/time.h>
53#include <sys/types.h>
54#include <time.h>
55#ifdef HAVE_UTIME_H
56#include <utime.h>
57#endif
58#include <sys/stat.h>
59#include <sys/file.h>
60#include <fcntl.h>
61
62#ifdef HAVE_SYS_MMAN_H
63#include <sys/mman.h>
64#endif
65
66#ifndef MAP_FILE
67#define MAP_FILE 0
68#endif
69
70#ifndef MAP_FAILED
71#define MAP_FAILED ((void *)-1)
72#endif
73
74#ifndef HAVE_STRDUP
75#define strdup rep_strdup
76static char *rep_strdup(const char *s)
77{
78	char *ret;
79	int length;
80	if (!s)
81		return NULL;
82
83	/* compute the length of the source string */
84	length = strlen(s);
85
86	ret = malloc(length + 1);
87	if (ret) {
88		strncpy(ret, s, length);
89		ret[length] = '\0';
90	}
91	return ret;
92}
93#endif
94
95#ifndef PRINTF_ATTRIBUTE
96#if (__GNUC__ > 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ >= 1))
97/** Use gcc attribute to check printf fns.  a1 is the 1-based index of
98 * the parameter containing the format, and a2 the index of the first
99 * argument. Note that some gcc 2.x versions don't handle this
100 * properly **/
101#define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
102#else
103#define PRINTF_ATTRIBUTE(a1, a2)
104#endif
105#endif
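
/* Illustrative sketch, not part of the original source: a declaration such as

       void example_log(struct tdb_context *tdb, const char *fmt, ...) PRINTF_ATTRIBUTE(2,3);

   asks gcc to type-check the arguments following "fmt" against the format
   string (the format string is parameter 2, the first checked argument is 3).
   The function name above is hypothetical. */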
106
107typedef int bool;
108
109#include "tdb.h"
110
111static TDB_DATA tdb_null;
112
113#ifndef u32
114#define u32 unsigned
115#endif
116
117typedef u32 tdb_len_t;
118typedef u32 tdb_off_t;
119
120#ifndef offsetof
121#define offsetof(t,f) ((unsigned int)&((t *)0)->f)
122#endif
123
124#define TDB_MAGIC_FOOD "TDB file\n"
125#define TDB_VERSION (0x26011967 + 6)
126#define TDB_MAGIC (0x26011999U)
127#define TDB_FREE_MAGIC (~TDB_MAGIC)
128#define TDB_DEAD_MAGIC (0xFEE1DEAD)
129#define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
130#define TDB_ALIGNMENT 4
131#define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
132#define DEFAULT_HASH_SIZE 131
133#define FREELIST_TOP (sizeof(struct tdb_header))
134#define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
135#define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
136#define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
137#define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
138#define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
139#define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
140#define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
141#define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
142#define TDB_SEQNUM_OFS    offsetof(struct tdb_header, sequence_number)
143#define TDB_PAD_BYTE 0x42
144#define TDB_PAD_U32  0x42424242
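
/* Worked examples, added for clarity (not in the original source):
   TDB_ALIGN(13, TDB_ALIGNMENT) == 16 and TDB_ALIGN(16, TDB_ALIGNMENT) == 16;
   TDB_BYTEREV(0x12345678) == 0x78563412. */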
145
146/* NB: assumes there is a local variable called "tdb" holding the
147 * current context; the macro takes a doubly-parenthesized printf-style
148 * argument list. */
149#define TDB_LOG(x) tdb->log.log_fn x
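
/* Illustrative sketch, not part of the original source: with a local
   "struct tdb_context *tdb" in scope the macro is invoked with an extra
   set of parentheses around the whole argument list, e.g.

       TDB_LOG((tdb, TDB_DEBUG_ERROR, "open failed (%s)\n", strerror(errno)));

   which expands to tdb->log.log_fn(tdb, TDB_DEBUG_ERROR, "open failed (%s)\n",
   strerror(errno)). */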
150
151/* lock offsets */
152#define GLOBAL_LOCK      0
153#define ACTIVE_LOCK      4
154#define TRANSACTION_LOCK 8
155
156/* free memory if the pointer is valid and zero the pointer */
157#ifndef SAFE_FREE
158#define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
159#endif
160
161#define BUCKET(hash) ((hash) % tdb->header.hash_size)
162
163#define DOCONV() (tdb->flags & TDB_CONVERT)
164#define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
165
166
167/* the body of the database is made of one list_struct for the free space
168   plus a separate data list for each hash value */
169struct list_struct {
170	tdb_off_t next; /* offset of the next record in the list */
171	tdb_len_t rec_len; /* total byte length of record */
172	tdb_len_t key_len; /* byte length of key */
173	tdb_len_t data_len; /* byte length of data */
174	u32 full_hash; /* the full 32 bit hash of the key */
175	u32 magic;   /* try to catch errors */
176	/* the following union is implied:
177		union {
178			char record[rec_len];
179			struct {
180				char key[key_len];
181				char data[data_len];
182			}
183			u32 totalsize; (tailer)
184		}
185	*/
186};
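
/* Illustrative sketch of the on-disk layout implied by the union above
   (added for clarity, not part of the original source): for a live record
   at file offset "off",

       key bytes  start at off + sizeof(struct list_struct)
       data bytes start at off + sizeof(struct list_struct) + key_len

   and rec_len counts the bytes that follow the header (key, data and any
   padding). */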
187
188
189/* this is stored at the front of every database */
190struct tdb_header {
191	char magic_food[32]; /* for /etc/magic */
192	u32 version; /* version of the code */
193	u32 hash_size; /* number of hash entries */
194	tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
195	tdb_off_t recovery_start; /* offset of transaction recovery region */
196	tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
197	tdb_off_t reserved[29];
198};
199
200struct tdb_lock_type {
201	int list;
202	u32 count;
203	u32 ltype;
204};
205
206struct tdb_traverse_lock {
207	struct tdb_traverse_lock *next;
208	u32 off;
209	u32 hash;
210	int lock_rw;
211};
212
213
214struct tdb_methods {
215	int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
216	int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
217	void (*next_hash_chain)(struct tdb_context *, u32 *);
218	int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
219	int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
220	int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
221};
222
223struct tdb_context {
224	char *name; /* the name of the database */
225	void *map_ptr; /* where it is currently mapped */
226	int fd; /* open file descriptor for the database */
227	tdb_len_t map_size; /* how much space has been mapped */
228	int read_only; /* opened read-only */
229	int traverse_read; /* read-only traversal */
230	struct tdb_lock_type global_lock;
231	int num_lockrecs;
232	struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
233	enum TDB_ERROR ecode; /* error code for last tdb error */
234	struct tdb_header header; /* a cached copy of the header */
235	u32 flags; /* the flags passed to tdb_open */
236	struct tdb_traverse_lock travlocks; /* current traversal locks */
237	struct tdb_context *next; /* all tdbs to avoid multiple opens */
238	dev_t device;	/* uniquely identifies this tdb */
239	ino_t inode;	/* uniquely identifies this tdb */
240	struct tdb_logging_context log;
241	unsigned int (*hash_fn)(TDB_DATA *key);
242	int open_flags; /* flags used in the open - needed by reopen */
243	unsigned int num_locks; /* number of chain locks held */
244	const struct tdb_methods *methods;
245	struct tdb_transaction *transaction;
246	int page_size;
247	int max_dead_records;
248	bool have_transaction_lock;
249};
250
251
252/*
253  internal prototypes
254*/
255static int tdb_munmap(struct tdb_context *tdb);
256static void tdb_mmap(struct tdb_context *tdb);
257static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
258static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
259static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
260static int tdb_transaction_lock(struct tdb_context *tdb, int ltype);
261static int tdb_transaction_unlock(struct tdb_context *tdb);
262static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
263static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
264static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
265static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
266static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
267static void *tdb_convert(void *buf, u32 size);
268static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
269static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
270static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
271static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
272static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
273static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
274static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
275static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
276static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
277static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
278static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
279		   tdb_off_t offset, tdb_len_t len,
280		   int (*parser)(TDB_DATA key, TDB_DATA data,
281				 void *private_data),
282		   void *private_data);
283static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
284			   struct list_struct *rec);
285static void tdb_io_init(struct tdb_context *tdb);
286static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
287static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off,
288		      struct list_struct *rec);
289
290
291/* file: error.c */
292
293enum TDB_ERROR tdb_error(struct tdb_context *tdb)
294{
295	return tdb->ecode;
296}
297
298static struct tdb_errname {
299	enum TDB_ERROR ecode; const char *estring;
300} emap[] = { {TDB_SUCCESS, "Success"},
301	     {TDB_ERR_CORRUPT, "Corrupt database"},
302	     {TDB_ERR_IO, "IO Error"},
303	     {TDB_ERR_LOCK, "Locking error"},
304	     {TDB_ERR_OOM, "Out of memory"},
305	     {TDB_ERR_EXISTS, "Record exists"},
306	     {TDB_ERR_NOLOCK, "Lock exists on other keys"},
307	     {TDB_ERR_EINVAL, "Invalid parameter"},
308	     {TDB_ERR_NOEXIST, "Record does not exist"},
309	     {TDB_ERR_RDONLY, "write not permitted"} };
310
311/* Error string for the last tdb error */
312const char *tdb_errorstr(struct tdb_context *tdb)
313{
314	u32 i;
315	for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
316		if (tdb->ecode == emap[i].ecode)
317			return emap[i].estring;
318	return "Invalid error code";
319}
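
/* Illustrative usage sketch, not part of the original source: callers
   typically report failures with the two helpers above, e.g.

       if (tdb_store(tdb, key, data, TDB_REPLACE) == -1) {
               fprintf(stderr, "store failed: %s\n", tdb_errorstr(tdb));
       }

   (tdb_store() is declared in tdb.h). */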
320
321/* file: lock.c */
322
323#define TDB_MARK_LOCK 0x80000000
324
325/* a byte range locking function - return 0 on success
326   this functions locks/unlocks 1 byte at the specified offset.
327
328   On error, errno is also set so that errors are passed back properly
329   through tdb_open().
330
331   note that a len of zero means lock to end of file
332*/
333int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
334	       int rw_type, int lck_type, int probe, size_t len)
335{
336	struct flock fl;
337	int ret;
338
339	if (tdb->flags & TDB_NOLOCK) {
340		return 0;
341	}
342
343	if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
344		tdb->ecode = TDB_ERR_RDONLY;
345		return -1;
346	}
347
348	fl.l_type = rw_type;
349	fl.l_whence = SEEK_SET;
350	fl.l_start = offset;
351	fl.l_len = len;
352	fl.l_pid = 0;
353
354	do {
355		ret = fcntl(tdb->fd,lck_type,&fl);
356	} while (ret == -1 && errno == EINTR);
357
358	if (ret == -1) {
359		/* Generic lock error. errno set by fcntl.
360		 * EAGAIN is an expected return from non-blocking
361		 * locks. */
362		if (!probe && lck_type != F_SETLK) {
363			/* Ensure error code is set for the log fn to examine. */
364			tdb->ecode = TDB_ERR_LOCK;
365			TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
366				 tdb->fd, offset, rw_type, lck_type, (int)len));
367		}
368		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
369	}
370	return 0;
371}
372
373
374/*
375  upgrade a read lock to a write lock. This needs to be handled in a
376  special way as some OSes (such as Solaris) have overly conservative
377  deadlock detection and claim a deadlock when progress can be
378  made. For those OSes we may loop for a while.
379*/
380int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
381{
382	int count = 1000;
383	while (count--) {
384		struct timeval tv;
385		if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
386			return 0;
387		}
388		if (errno != EDEADLK) {
389			break;
390		}
391		/* sleep for as short a time as we can - more portable than usleep() */
392		tv.tv_sec = 0;
393		tv.tv_usec = 1;
394		select(0, NULL, NULL, NULL, &tv);
395	}
396	TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
397	return -1;
398}
399
400
401/* lock a list in the database. list -1 is the alloc list */
402static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
403{
404	struct tdb_lock_type *new_lck;
405	int i;
406	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
407
408	ltype &= ~TDB_MARK_LOCK;
409
410	/* a global lock allows us to avoid per chain locks */
411	if (tdb->global_lock.count &&
412	    (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
413		return 0;
414	}
415
416	if (tdb->global_lock.count) {
417		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
418	}
419
420	if (list < -1 || list >= (int)tdb->header.hash_size) {
421		TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
422			   list, ltype));
423		return -1;
424	}
425	if (tdb->flags & TDB_NOLOCK)
426		return 0;
427
428	for (i=0; i<tdb->num_lockrecs; i++) {
429		if (tdb->lockrecs[i].list == list) {
430			if (tdb->lockrecs[i].count == 0) {
431				/*
432				 * Can't happen, see tdb_unlock(). It should
433				 * be an assert.
434				 */
435				TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
436					 "lck->count == 0 for list %d", list));
437			}
438			/*
439			 * Just increment the in-memory struct, posix locks
440			 * don't stack.
441			 */
442			tdb->lockrecs[i].count++;
443			return 0;
444		}
445	}
446
447	new_lck = (struct tdb_lock_type *)realloc(
448		tdb->lockrecs,
449		sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
450	if (new_lck == NULL) {
451		errno = ENOMEM;
452		return -1;
453	}
454	tdb->lockrecs = new_lck;
455
456	/* Since fcntl locks don't nest, we do a lock for the first one,
457	   and simply bump the count for future ones */
458	if (!mark_lock &&
459	    tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
460				     0, 1)) {
461		return -1;
462	}
463
464	tdb->num_locks++;
465
466	tdb->lockrecs[tdb->num_lockrecs].list = list;
467	tdb->lockrecs[tdb->num_lockrecs].count = 1;
468	tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
469	tdb->num_lockrecs += 1;
470
471	return 0;
472}
473
474/* lock a list in the database. list -1 is the alloc list */
475int tdb_lock(struct tdb_context *tdb, int list, int ltype)
476{
477	int ret;
478	ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
479	if (ret) {
480		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
481			 "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
482	}
483	return ret;
484}
485
486/* lock a list in the database. list -1 is the alloc list. non-blocking lock */
487int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
488{
489	return _tdb_lock(tdb, list, ltype, F_SETLK);
490}
491
492
493/* unlock the database: returns int rather than void because it may be
494	   interesting to know that an error occurred while unlocking, even
495	   though it is too late to do anything about it  --simo */
496int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
497{
498	int ret = -1;
499	int i;
500	struct tdb_lock_type *lck = NULL;
501	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
502
503	ltype &= ~TDB_MARK_LOCK;
504
505	/* a global lock allows us to avoid per chain locks */
506	if (tdb->global_lock.count &&
507	    (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
508		return 0;
509	}
510
511	if (tdb->global_lock.count) {
512		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
513	}
514
515	if (tdb->flags & TDB_NOLOCK)
516		return 0;
517
518	/* Sanity checks */
519	if (list < -1 || list >= (int)tdb->header.hash_size) {
520		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
521		return ret;
522	}
523
524	for (i=0; i<tdb->num_lockrecs; i++) {
525		if (tdb->lockrecs[i].list == list) {
526			lck = &tdb->lockrecs[i];
527			break;
528		}
529	}
530
531	if ((lck == NULL) || (lck->count == 0)) {
532		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
533		return -1;
534	}
535
536	if (lck->count > 1) {
537		lck->count--;
538		return 0;
539	}
540
541	/*
542	 * This lock has count==1 left, so we need to unlock it in the
543	 * kernel. We don't bother with decrementing the in-memory array
544	 * element, we're about to overwrite it with the last array element
545	 * anyway.
546	 */
547
548	if (mark_lock) {
549		ret = 0;
550	} else {
551		ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
552					       F_SETLKW, 0, 1);
553	}
554	tdb->num_locks--;
555
556	/*
557	 * Shrink the array by overwriting the element just unlocked with the
558	 * last array element.
559	 */
560
561	if (tdb->num_lockrecs > 1) {
562		*lck = tdb->lockrecs[tdb->num_lockrecs-1];
563	}
564	tdb->num_lockrecs -= 1;
565
566	/*
567	 * We don't bother with realloc when the array shrinks, but if we have
568	 * a completely idle tdb we should get rid of the locked array.
569	 */
570
571	if (tdb->num_lockrecs == 0) {
572		SAFE_FREE(tdb->lockrecs);
573	}
574
575	if (ret)
576		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
577	return ret;
578}
579
580/*
581  get the transaction lock
582 */
583int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
584{
585	if (tdb->have_transaction_lock || tdb->global_lock.count) {
586		return 0;
587	}
588	if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype,
589				     F_SETLKW, 0, 1) == -1) {
590		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
591		tdb->ecode = TDB_ERR_LOCK;
592		return -1;
593	}
594	tdb->have_transaction_lock = 1;
595	return 0;
596}
597
598/*
599  release the transaction lock
600 */
601int tdb_transaction_unlock(struct tdb_context *tdb)
602{
603	int ret;
604	if (!tdb->have_transaction_lock) {
605		return 0;
606	}
607	ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
608	if (ret == 0) {
609		tdb->have_transaction_lock = 0;
610	}
611	return ret;
612}
613
614
615
616
617/* lock/unlock entire database */
618static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
619{
620	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
621
622	ltype &= ~TDB_MARK_LOCK;
623
624	/* There are no locks on read-only dbs */
625	if (tdb->read_only || tdb->traverse_read)
626		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
627
628	if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
629		tdb->global_lock.count++;
630		return 0;
631	}
632
633	if (tdb->global_lock.count) {
634		/* a global lock of a different type exists */
635		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
636	}
637
638	if (tdb->num_locks != 0) {
639		/* can't combine global and chain locks */
640		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
641	}
642
643	if (!mark_lock &&
644	    tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
645				     0, 4*tdb->header.hash_size)) {
646		if (op == F_SETLKW) {
647			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
648		}
649		return -1;
650	}
651
652	tdb->global_lock.count = 1;
653	tdb->global_lock.ltype = ltype;
654
655	return 0;
656}
657
658
659
660/* unlock entire db */
661static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
662{
663	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
664
665	ltype &= ~TDB_MARK_LOCK;
666
667	/* There are no locks on read-only dbs */
668	if (tdb->read_only || tdb->traverse_read) {
669		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
670	}
671
672	if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
673		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
674	}
675
676	if (tdb->global_lock.count > 1) {
677		tdb->global_lock.count--;
678		return 0;
679	}
680
681	if (!mark_lock &&
682	    tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
683				     0, 4*tdb->header.hash_size)) {
684		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
685		return -1;
686	}
687
688	tdb->global_lock.count = 0;
689	tdb->global_lock.ltype = 0;
690
691	return 0;
692}
693
694/* lock entire database with write lock */
695int tdb_lockall(struct tdb_context *tdb)
696{
697	return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
698}
699
700/* lock entire database with write lock - mark only */
701int tdb_lockall_mark(struct tdb_context *tdb)
702{
703	return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
704}
705
706/* unlock entire database with write lock - unmark only */
707int tdb_lockall_unmark(struct tdb_context *tdb)
708{
709	return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
710}
711
712/* lock entire database with write lock - non-blocking variant */
713int tdb_lockall_nonblock(struct tdb_context *tdb)
714{
715	return _tdb_lockall(tdb, F_WRLCK, F_SETLK);
716}
717
718/* unlock entire database with write lock */
719int tdb_unlockall(struct tdb_context *tdb)
720{
721	return _tdb_unlockall(tdb, F_WRLCK);
722}
723
724/* lock entire database with read lock */
725int tdb_lockall_read(struct tdb_context *tdb)
726{
727	return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
728}
729
730/* lock entire database with read lock - non-blocking variant */
731int tdb_lockall_read_nonblock(struct tdb_context *tdb)
732{
733	return _tdb_lockall(tdb, F_RDLCK, F_SETLK);
734}
735
736/* unlock entire database with read lock */
737int tdb_unlockall_read(struct tdb_context *tdb)
738{
739	return _tdb_unlockall(tdb, F_RDLCK);
740}
741
742/* lock/unlock one hash chain. This is meant to be used to reduce
743   contention - it cannot guarantee how many records will be locked */
744int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
745{
746	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
747}
748
749/* lock/unlock one hash chain, non-blocking. This is meant to be used
750   to reduce contention - it cannot guarantee how many records will be
751   locked */
752int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
753{
754	return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
755}
756
757/* mark a chain as locked without actually locking it. Warning! use with great caution! */
758int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
759{
760	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
761}
762
763/* unmark a chain as locked without actually unlocking it. Warning! use with great caution! */
764int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
765{
766	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
767}
768
769int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
770{
771	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
772}
773
774int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
775{
776	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
777}
778
779int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
780{
781	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
782}
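
/* Illustrative usage sketch, not part of the original source: the chain lock
   calls above are intended to bracket a read-modify-write of a single key so
   that other writers to the same hash chain are held off.  The helper below
   is hypothetical; tdb_fetch() and tdb_store() are declared in tdb.h. */
#if 0
static int example_counter_increment(struct tdb_context *tdb, TDB_DATA key)
{
	TDB_DATA val, newval;
	u32 n = 0;

	if (tdb_chainlock(tdb, key) == -1)
		return -1;

	val = tdb_fetch(tdb, key);
	if (val.dptr != NULL && val.dsize == sizeof(n))
		memcpy(&n, val.dptr, sizeof(n));
	free(val.dptr);

	n++;
	newval.dptr = (unsigned char *)&n;
	newval.dsize = sizeof(n);

	if (tdb_store(tdb, key, newval, TDB_REPLACE) == -1) {
		tdb_chainunlock(tdb, key);
		return -1;
	}
	return tdb_chainunlock(tdb, key);
}
#endif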
783
784
785
786/* record lock stops delete underneath */
787int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
788{
789	return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
790}
791
792/*
793  Write locks override our own fcntl readlocks, so check it here.
794  Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
795  an error to fail to get the lock here.
796*/
797int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
798{
799	struct tdb_traverse_lock *i;
800	for (i = &tdb->travlocks; i; i = i->next)
801		if (i->off == off)
802			return -1;
803	return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
804}
805
806/*
807  Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
808  an error to fail to get the lock here.
809*/
810int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
811{
812	return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
813}
814
815/* fcntl locks don't stack: avoid unlocking someone else's */
816int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
817{
818	struct tdb_traverse_lock *i;
819	u32 count = 0;
820
821	if (off == 0)
822		return 0;
823	for (i = &tdb->travlocks; i; i = i->next)
824		if (i->off == off)
825			count++;
826	return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
827}
828
829/* file: io.c */
830
831/* check for an out of bounds access - if it is out of bounds then
832   see if the database has been expanded by someone else and expand
833   if necessary
834   note that "len" is the minimum length needed for the db
835*/
836static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
837{
838	struct stat st;
839	if (len <= tdb->map_size)
840		return 0;
841	if (tdb->flags & TDB_INTERNAL) {
842		if (!probe) {
843			/* Ensure ecode is set for log fn. */
844			tdb->ecode = TDB_ERR_IO;
845			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
846				 (int)len, (int)tdb->map_size));
847		}
848		return TDB_ERRCODE(TDB_ERR_IO, -1);
849	}
850
851	if (fstat(tdb->fd, &st) == -1) {
852		return TDB_ERRCODE(TDB_ERR_IO, -1);
853	}
854
855	if (st.st_size < (size_t)len) {
856		if (!probe) {
857			/* Ensure ecode is set for log fn. */
858			tdb->ecode = TDB_ERR_IO;
859			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
860				 (int)len, (int)st.st_size));
861		}
862		return TDB_ERRCODE(TDB_ERR_IO, -1);
863	}
864
865	/* Unmap, update size, remap */
866	if (tdb_munmap(tdb) == -1)
867		return TDB_ERRCODE(TDB_ERR_IO, -1);
868	tdb->map_size = st.st_size;
869	tdb_mmap(tdb);
870	return 0;
871}
872
873/* write a lump of data at a specified offset */
874static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
875		     const void *buf, tdb_len_t len)
876{
877	if (len == 0) {
878		return 0;
879	}
880
881	if (tdb->read_only || tdb->traverse_read) {
882		tdb->ecode = TDB_ERR_RDONLY;
883		return -1;
884	}
885
886	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
887		return -1;
888
889	if (tdb->map_ptr) {
890		memcpy(off + (char *)tdb->map_ptr, buf, len);
891	} else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
892		/* Ensure ecode is set for log fn. */
893		tdb->ecode = TDB_ERR_IO;
894		TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
895			   off, len, strerror(errno)));
896		return TDB_ERRCODE(TDB_ERR_IO, -1);
897	}
898	return 0;
899}
900
901/* Endian conversion: we only ever deal with 4 byte quantities */
902void *tdb_convert(void *buf, u32 size)
903{
904	u32 i, *p = (u32 *)buf;
905	for (i = 0; i < size / 4; i++)
906		p[i] = TDB_BYTEREV(p[i]);
907	return buf;
908}
909
910
911/* read a lump of data at a specified offset, maybe convert */
912static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
913		    tdb_len_t len, int cv)
914{
915	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
916		return -1;
917	}
918
919	if (tdb->map_ptr) {
920		memcpy(buf, off + (char *)tdb->map_ptr, len);
921	} else {
922		ssize_t ret = pread(tdb->fd, buf, len, off);
923		if (ret != (ssize_t)len) {
924			/* Ensure ecode is set for log fn. */
925			tdb->ecode = TDB_ERR_IO;
926			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
927				 "len=%d ret=%d (%s) map_size=%d\n",
928				 (int)off, (int)len, (int)ret, strerror(errno),
929				 (int)tdb->map_size));
930			return TDB_ERRCODE(TDB_ERR_IO, -1);
931		}
932	}
933	if (cv) {
934		tdb_convert(buf, len);
935	}
936	return 0;
937}
938
939
940
941/*
942  do an unlocked scan of the hash table heads to find the next non-zero head. The value
943  will then be confirmed with the lock held
944*/
945static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
946{
947	u32 h = *chain;
948	if (tdb->map_ptr) {
949		for (;h < tdb->header.hash_size;h++) {
950			if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
951				break;
952			}
953		}
954	} else {
955		u32 off=0;
956		for (;h < tdb->header.hash_size;h++) {
957			if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
958				break;
959			}
960		}
961	}
962	(*chain) = h;
963}
964
965
966int tdb_munmap(struct tdb_context *tdb)
967{
968	if (tdb->flags & TDB_INTERNAL)
969		return 0;
970
971#ifdef HAVE_MMAP
972	if (tdb->map_ptr) {
973		int ret = munmap(tdb->map_ptr, tdb->map_size);
974		if (ret != 0)
975			return ret;
976	}
977#endif
978	tdb->map_ptr = NULL;
979	return 0;
980}
981
982void tdb_mmap(struct tdb_context *tdb)
983{
984	if (tdb->flags & TDB_INTERNAL)
985		return;
986
987#ifdef HAVE_MMAP
988	if (!(tdb->flags & TDB_NOMMAP)) {
989		tdb->map_ptr = mmap(NULL, tdb->map_size,
990				    PROT_READ|(tdb->read_only? 0:PROT_WRITE),
991				    MAP_SHARED|MAP_FILE, tdb->fd, 0);
992
993		/*
994		 * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
995		 */
996
997		if (tdb->map_ptr == MAP_FAILED) {
998			tdb->map_ptr = NULL;
999			TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
1000				 tdb->map_size, strerror(errno)));
1001		}
1002	} else {
1003		tdb->map_ptr = NULL;
1004	}
1005#else
1006	tdb->map_ptr = NULL;
1007#endif
1008}
1009
1010/* expand a file.  we prefer to use ftruncate, as that is what posix
1011  says to use for mmap expansion */
1012static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
1013{
1014	char buf[1024];
1015
1016	if (tdb->read_only || tdb->traverse_read) {
1017		tdb->ecode = TDB_ERR_RDONLY;
1018		return -1;
1019	}
1020
1021	if (ftruncate(tdb->fd, size+addition) == -1) {
1022		char b = 0;
1023		if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
1024			TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
1025				 size+addition, strerror(errno)));
1026			return -1;
1027		}
1028	}
1029
1030	/* now fill the file with something. This ensures that the
1031	   file isn't sparse, which would be very bad if we ran out of
1032	   disk. This must be done with write, not via mmap */
1033	memset(buf, TDB_PAD_BYTE, sizeof(buf));
1034	while (addition) {
1035		int n = addition>sizeof(buf)?sizeof(buf):addition;
1036		int ret = pwrite(tdb->fd, buf, n, size);
1037		if (ret != n) {
1038			TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
1039				   n, strerror(errno)));
1040			return -1;
1041		}
1042		addition -= n;
1043		size += n;
1044	}
1045	return 0;
1046}
1047
1048
1049/* expand the database at least size bytes by expanding the underlying
1050   file and doing the mmap again if necessary */
1051int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
1052{
1053	struct list_struct rec;
1054	tdb_off_t offset;
1055
1056	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
1057		TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
1058		return -1;
1059	}
1060
1061	/* must know about any previous expansions by another process */
1062	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1063
1064	/* always make room for at least 10 more records, and round
1065           the database up to a multiple of the page size */
1066	size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
1067
1068	if (!(tdb->flags & TDB_INTERNAL))
1069		tdb_munmap(tdb);
1070
1071	/*
1072	 * We must ensure the file is unmapped before doing this
1073	 * to ensure consistency with systems like OpenBSD where
1074	 * writes and mmaps are not consistent.
1075	 */
1076
1077	/* expand the file itself */
1078	if (!(tdb->flags & TDB_INTERNAL)) {
1079		if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
1080			goto fail;
1081	}
1082
1083	tdb->map_size += size;
1084
1085	if (tdb->flags & TDB_INTERNAL) {
1086		char *new_map_ptr = (char *)realloc(tdb->map_ptr,
1087						    tdb->map_size);
1088		if (!new_map_ptr) {
1089			tdb->map_size -= size;
1090			goto fail;
1091		}
1092		tdb->map_ptr = new_map_ptr;
1093	} else {
1094		/*
1095		 * We must ensure the file is remapped before adding the space
1096		 * to ensure consistency with systems like OpenBSD where
1097		 * writes and mmaps are not consistent.
1098		 */
1099
1100		/* We're ok if the mmap fails as we'll fall back to read/write */
1101		tdb_mmap(tdb);
1102	}
1103
1104	/* form a new freelist record */
1105	memset(&rec,'\0',sizeof(rec));
1106	rec.rec_len = size - sizeof(rec);
1107
1108	/* link it into the free list */
1109	offset = tdb->map_size - size;
1110	if (tdb_free(tdb, offset, &rec) == -1)
1111		goto fail;
1112
1113	tdb_unlock(tdb, -1, F_WRLCK);
1114	return 0;
1115 fail:
1116	tdb_unlock(tdb, -1, F_WRLCK);
1117	return -1;
1118}
1119
1120/* read/write a tdb_off_t */
1121int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1122{
1123	return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
1124}
1125
1126int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1127{
1128	tdb_off_t off = *d;
1129	return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
1130}
1131
1132
1133/* read a lump of data, allocating the space for it */
1134unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
1135{
1136	unsigned char *buf;
1137
1138	/* some systems don't like zero length malloc */
1139	if (len == 0) {
1140		len = 1;
1141	}
1142
1143	if (!(buf = (unsigned char *)malloc(len))) {
1144		/* Ensure ecode is set for log fn. */
1145		tdb->ecode = TDB_ERR_OOM;
1146		TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
1147			   len, strerror(errno)));
1148		return TDB_ERRCODE(TDB_ERR_OOM, buf);
1149	}
1150	if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
1151		SAFE_FREE(buf);
1152		return NULL;
1153	}
1154	return buf;
1155}
1156
1157/* Give a piece of tdb data to a parser */
1158
1159int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
1160		   tdb_off_t offset, tdb_len_t len,
1161		   int (*parser)(TDB_DATA key, TDB_DATA data,
1162				 void *private_data),
1163		   void *private_data)
1164{
1165	TDB_DATA data;
1166	int result;
1167
1168	data.dsize = len;
1169
1170	if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
1171		/*
1172		 * Optimize by avoiding the malloc/memcpy/free, point the
1173		 * parser directly at the mmap area.
1174		 */
1175		if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
1176			return -1;
1177		}
1178		data.dptr = offset + (unsigned char *)tdb->map_ptr;
1179		return parser(key, data, private_data);
1180	}
1181
1182	if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
1183		return -1;
1184	}
1185
1186	result = parser(key, data, private_data);
1187	free(data.dptr);
1188	return result;
1189}
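
/* Illustrative sketch, not part of the original source: a parser callback
   simply receives the key and the data, and its return value is handed
   straight back to the caller.  Because data.dptr may point into the mmap
   area, the callback should not keep the pointer after it returns.  The
   names below are hypothetical. */
#if 0
static int example_first_byte_parser(TDB_DATA key, TDB_DATA data, void *private_data)
{
	unsigned char *out = (unsigned char *)private_data;

	if (data.dsize < 1)
		return -1;
	*out = data.dptr[0];
	return 0;
}
#endif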
1190
1191/* read/write a record */
1192int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1193{
1194	if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
1195		return -1;
1196	if (TDB_BAD_MAGIC(rec)) {
1197		/* Ensure ecode is set for log fn. */
1198		tdb->ecode = TDB_ERR_CORRUPT;
1199		TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
1200		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1201	}
1202	return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
1203}
1204
1205int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1206{
1207	struct list_struct r = *rec;
1208	return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
1209}
1210
1211static const struct tdb_methods io_methods = {
1212	tdb_read,
1213	tdb_write,
1214	tdb_next_hash_chain,
1215	tdb_oob,
1216	tdb_expand_file,
1217	tdb_brlock
1218};
1219
1220/*
1221  initialise the default methods table
1222*/
1223void tdb_io_init(struct tdb_context *tdb)
1224{
1225	tdb->methods = &io_methods;
1226}
1227
1228/* file: transaction.c */
1229
1230/*
1231  transaction design:
1232
1233  - only allow a single transaction at a time per database. This makes
1234    using the transaction API simpler, as otherwise the caller would
1235    have to cope with temporary failures in transactions that conflict
1236    with other current transactions
1237
1238  - keep the transaction recovery information in the same file as the
1239    database, using a special 'transaction recovery' record pointed at
1240    by the header. This removes the need for extra journal files as
1241    used by some other databases
1242
1243  - dynamically allocate the transaction recovery record, re-using it
1244    for subsequent transactions. If a larger record is needed then
1245    tdb_free() the old record to place it on the normal tdb freelist
1246    before allocating the new record
1247
1248  - during transactions, keep a linked list of all writes that have
1249    been performed by intercepting all tdb_write() calls. The hooked
1250    transaction versions of tdb_read() and tdb_write() check this
1251    linked list and try to use the elements of the list in preference
1252    to the real database.
1253
1254  - don't allow any locks to be held when a transaction starts,
1255    otherwise we can end up with deadlock (plus lack of lock nesting
1256    in posix locks would mean the lock is lost)
1257
1258  - if the caller gains a lock during the transaction but doesn't
1259    release it then fail the commit
1260
1261  - allow for nested calls to tdb_transaction_start(), re-using the
1262    existing transaction record. If the inner transaction is cancelled
1263    then a subsequent commit will fail
1264
1265  - keep a mirrored copy of the tdb hash chain heads to allow for the
1266    fast hash heads scan on traverse, updating the mirrored copy in
1267    the transaction version of tdb_write
1268
1269  - allow callers to mix transaction and non-transaction use of tdb,
1270    although once a transaction is started then an exclusive lock is
1271    gained until the transaction is committed or cancelled
1272
1273  - the commit strategy involves first saving away all modified data
1274    into a linearised buffer in the transaction recovery area, then
1275    marking the transaction recovery area with a magic value to
1276    indicate a valid recovery record. In total 4 fsync/msync calls are
1277    needed per commit to prevent race conditions. It might be possible
1278    to reduce this to 3 or even 2 with some more work.
1279
1280  - check for a valid recovery record on open of the tdb, while the
1281    global lock is held. Automatically recover from the transaction
1282    recovery area if needed, then continue with the open as
1283    usual. This allows for smooth crash recovery with no administrator
1284    intervention.
1285
1286  - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
1287    still available, but no transaction recovery area is used and no
1288    fsync/msync calls are made.
1289
1290*/
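
/* Illustrative usage sketch, not part of the original source: from a caller's
   point of view the design above boils down to the pattern below.  The helper
   name is hypothetical; tdb_transaction_commit() is declared in tdb.h. */
#if 0
static int example_atomic_update(struct tdb_context *tdb, TDB_DATA key, TDB_DATA val)
{
	if (tdb_transaction_start(tdb) == -1)
		return -1;

	if (tdb_store(tdb, key, val, TDB_REPLACE) == -1) {
		tdb_transaction_cancel(tdb);
		return -1;
	}

	/* the commit writes the recovery record, syncs, then applies the changes */
	return tdb_transaction_commit(tdb);
}
#endif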
1291
1292struct tdb_transaction_el {
1293	struct tdb_transaction_el *next, *prev;
1294	tdb_off_t offset;
1295	tdb_len_t length;
1296	unsigned char *data;
1297};
1298
1299/*
1300  hold the context of any current transaction
1301*/
1302struct tdb_transaction {
1303	/* we keep a mirrored copy of the tdb hash heads here so
1304	   tdb_next_hash_chain() can operate efficiently */
1305	u32 *hash_heads;
1306
1307	/* the original io methods - used to do IOs to the real db */
1308	const struct tdb_methods *io_methods;
1309
1310	/* the list of transaction elements. We use a doubly linked
1311	   list with a last pointer to allow us to keep the list
1312	   ordered, with first element at the front of the list. It
1313	   needs to be doubly linked as the read/write traversals need
1314	   to be backwards, while the commit needs to be forwards */
1315	struct tdb_transaction_el *elements, *elements_last;
1316
1317	/* non-zero when an internal transaction error has
1318	   occurred. All write operations will then fail until the
1319	   transaction is ended */
1320	int transaction_error;
1321
1322	/* when inside a transaction we need to keep track of any
1323	   nested tdb_transaction_start() calls, as these are allowed,
1324	   but don't create a new transaction */
1325	int nesting;
1326
1327	/* old file size before transaction */
1328	tdb_len_t old_map_size;
1329};
1330
1331
1332/*
1333  read while in a transaction. We need to check first if the data is in our list
1334  of transaction elements, then if not do a real read
1335*/
1336static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
1337			    tdb_len_t len, int cv)
1338{
1339	struct tdb_transaction_el *el;
1340
1341	/* we need to walk the list backwards to get the most recent data */
1342	for (el=tdb->transaction->elements_last;el;el=el->prev) {
1343		tdb_len_t partial;
1344
1345		if (off+len <= el->offset) {
1346			continue;
1347		}
1348		if (off >= el->offset + el->length) {
1349			continue;
1350		}
1351
1352		/* an overlapping read - needs to be split into up to
1353		   2 reads and a memcpy */
1354		if (off < el->offset) {
1355			partial = el->offset - off;
1356			if (transaction_read(tdb, off, buf, partial, cv) != 0) {
1357				goto fail;
1358			}
1359			len -= partial;
1360			off += partial;
1361			buf = (void *)(partial + (char *)buf);
1362		}
1363		if (off + len <= el->offset + el->length) {
1364			partial = len;
1365		} else {
1366			partial = el->offset + el->length - off;
1367		}
1368		memcpy(buf, el->data + (off - el->offset), partial);
1369		if (cv) {
1370			tdb_convert(buf, len);
1371		}
1372		len -= partial;
1373		off += partial;
1374		buf = (void *)(partial + (char *)buf);
1375
1376		if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
1377			goto fail;
1378		}
1379
1380		return 0;
1381	}
1382
1383	/* it's not in the transaction elements - do a real read */
1384	return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
1385
1386fail:
1387	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
1388	tdb->ecode = TDB_ERR_IO;
1389	tdb->transaction->transaction_error = 1;
1390	return -1;
1391}
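
/* Worked example, added for clarity (not part of the original source): with a
   single buffered element at offset=100 length=50, a transaction_read() of
   off=80 len=100 is handled as
       - a recursive read of bytes [80,100) from the underlying database,
       - a memcpy of bytes [100,150) out of the element's data,
       - a recursive read of bytes [150,180) for the remainder. */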
1392
1393
1394/*
1395  write while in a transaction
1396*/
1397static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
1398			     const void *buf, tdb_len_t len)
1399{
1400	struct tdb_transaction_el *el, *best_el=NULL;
1401
1402	if (len == 0) {
1403		return 0;
1404	}
1405
1406	/* if the write is to a hash head, then update the transaction
1407	   hash heads */
1408	if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
1409	    off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
1410		u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
1411		memcpy(&tdb->transaction->hash_heads[chain], buf, len);
1412	}
1413
1414	/* first see if we can replace an existing entry */
1415	for (el=tdb->transaction->elements_last;el;el=el->prev) {
1416		tdb_len_t partial;
1417
1418		if (best_el == NULL && off == el->offset+el->length) {
1419			best_el = el;
1420		}
1421
1422		if (off+len <= el->offset) {
1423			continue;
1424		}
1425		if (off >= el->offset + el->length) {
1426			continue;
1427		}
1428
1429		/* an overlapping write - needs to be split into up to
1430		   2 writes and a memcpy */
1431		if (off < el->offset) {
1432			partial = el->offset - off;
1433			if (transaction_write(tdb, off, buf, partial) != 0) {
1434				goto fail;
1435			}
1436			len -= partial;
1437			off += partial;
1438			buf = (const void *)(partial + (const char *)buf);
1439		}
1440		if (off + len <= el->offset + el->length) {
1441			partial = len;
1442		} else {
1443			partial = el->offset + el->length - off;
1444		}
1445		memcpy(el->data + (off - el->offset), buf, partial);
1446		len -= partial;
1447		off += partial;
1448		buf = (const void *)(partial + (const char *)buf);
1449
1450		if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
1451			goto fail;
1452		}
1453
1454		return 0;
1455	}
1456
1457	/* see if we can append the new entry to an existing entry */
1458	if (best_el && best_el->offset + best_el->length == off &&
1459	    (off+len < tdb->transaction->old_map_size ||
1460	     off > tdb->transaction->old_map_size)) {
1461		unsigned char *data = best_el->data;
1462		el = best_el;
1463		el->data = (unsigned char *)realloc(el->data,
1464						    el->length + len);
1465		if (el->data == NULL) {
1466			tdb->ecode = TDB_ERR_OOM;
1467			tdb->transaction->transaction_error = 1;
1468			el->data = data;
1469			return -1;
1470		}
1471		if (buf) {
1472			memcpy(el->data + el->length, buf, len);
1473		} else {
1474			memset(el->data + el->length, TDB_PAD_BYTE, len);
1475		}
1476		el->length += len;
1477		return 0;
1478	}
1479
1480	/* add a new entry at the end of the list */
1481	el = (struct tdb_transaction_el *)malloc(sizeof(*el));
1482	if (el == NULL) {
1483		tdb->ecode = TDB_ERR_OOM;
1484		tdb->transaction->transaction_error = 1;
1485		return -1;
1486	}
1487	el->next = NULL;
1488	el->prev = tdb->transaction->elements_last;
1489	el->offset = off;
1490	el->length = len;
1491	el->data = (unsigned char *)malloc(len);
1492	if (el->data == NULL) {
1493		free(el);
1494		tdb->ecode = TDB_ERR_OOM;
1495		tdb->transaction->transaction_error = 1;
1496		return -1;
1497	}
1498	if (buf) {
1499		memcpy(el->data, buf, len);
1500	} else {
1501		memset(el->data, TDB_PAD_BYTE, len);
1502	}
1503	if (el->prev) {
1504		el->prev->next = el;
1505	} else {
1506		tdb->transaction->elements = el;
1507	}
1508	tdb->transaction->elements_last = el;
1509	return 0;
1510
1511fail:
1512	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
1513	tdb->ecode = TDB_ERR_IO;
1514	tdb->transaction->transaction_error = 1;
1515	return -1;
1516}
1517
1518/*
1519  accelerated hash chain head search, using the cached hash heads
1520*/
1521static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
1522{
1523	u32 h = *chain;
1524	for (;h < tdb->header.hash_size;h++) {
1525		/* the +1 takes account of the freelist */
1526		if (0 != tdb->transaction->hash_heads[h+1]) {
1527			break;
1528		}
1529	}
1530	(*chain) = h;
1531}
1532
1533/*
1534  out of bounds check during a transaction
1535*/
1536static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
1537{
1538	if (len <= tdb->map_size) {
1539		return 0;
1540	}
1541	return TDB_ERRCODE(TDB_ERR_IO, -1);
1542}
1543
1544/*
1545  transaction version of tdb_expand().
1546*/
1547static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
1548				   tdb_off_t addition)
1549{
1550	/* add a write to the transaction elements, so subsequent
1551	   reads see the zero data */
1552	if (transaction_write(tdb, size, NULL, addition) != 0) {
1553		return -1;
1554	}
1555
1556	return 0;
1557}
1558
1559/*
1560  brlock during a transaction - ignore them
1561*/
1562static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
1563			      int rw_type, int lck_type, int probe, size_t len)
1564{
1565	return 0;
1566}
1567
1568static const struct tdb_methods transaction_methods = {
1569	transaction_read,
1570	transaction_write,
1571	transaction_next_hash_chain,
1572	transaction_oob,
1573	transaction_expand_file,
1574	transaction_brlock
1575};
1576
1577
1578/*
1579  start a tdb transaction. No token is returned, as only a single
1580  transaction is allowed to be pending per tdb_context
1581*/
1582int tdb_transaction_start(struct tdb_context *tdb)
1583{
1584	/* some sanity checks */
1585	if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
1586		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
1587		tdb->ecode = TDB_ERR_EINVAL;
1588		return -1;
1589	}
1590
1591	/* cope with nested tdb_transaction_start() calls */
1592	if (tdb->transaction != NULL) {
1593		tdb->transaction->nesting++;
1594		TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
1595			 tdb->transaction->nesting));
1596		return 0;
1597	}
1598
1599	if (tdb->num_locks != 0 || tdb->global_lock.count) {
1600		/* the caller must not have any locks when starting a
1601		   transaction as otherwise we'll be screwed by lack
1602		   of nested locks in posix */
1603		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
1604		tdb->ecode = TDB_ERR_LOCK;
1605		return -1;
1606	}
1607
1608	if (tdb->travlocks.next != NULL) {
1609		/* you cannot use transactions inside a traverse (although you can use
1610		   traverse inside a transaction) as otherwise you can end up with
1611		   deadlock */
1612		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
1613		tdb->ecode = TDB_ERR_LOCK;
1614		return -1;
1615	}
1616
1617	tdb->transaction = (struct tdb_transaction *)
1618		calloc(sizeof(struct tdb_transaction), 1);
1619	if (tdb->transaction == NULL) {
1620		tdb->ecode = TDB_ERR_OOM;
1621		return -1;
1622	}
1623
1624	/* get the transaction write lock. This is a blocking lock. As
1625	   discussed with Volker, there are a number of ways we could
1626	   make this async, which we will probably do in the future */
1627	if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
1628		SAFE_FREE(tdb->transaction);
1629		return -1;
1630	}
1631
1632	/* get a read lock from the freelist to the end of file. This
1633	   is upgraded to a write lock during the commit */
1634	if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
1635		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
1636		tdb->ecode = TDB_ERR_LOCK;
1637		goto fail;
1638	}
1639
1640	/* setup a copy of the hash table heads so the hash scan in
1641	   traverse can be fast */
1642	tdb->transaction->hash_heads = (u32 *)
1643		calloc(tdb->header.hash_size+1, sizeof(u32));
1644	if (tdb->transaction->hash_heads == NULL) {
1645		tdb->ecode = TDB_ERR_OOM;
1646		goto fail;
1647	}
1648	if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1649				   TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
1650		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
1651		tdb->ecode = TDB_ERR_IO;
1652		goto fail;
1653	}
1654
1655	/* make sure we know about any file expansions already done by
1656	   anyone else */
1657	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1658	tdb->transaction->old_map_size = tdb->map_size;
1659
1660	/* finally hook the io methods, replacing them with
1661	   transaction specific methods */
1662	tdb->transaction->io_methods = tdb->methods;
1663	tdb->methods = &transaction_methods;
1664
1665	/* by calling this transaction write here, we ensure that we don't grow the
1666	   transaction linked list due to hash table updates */
1667	if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1668			      TDB_HASHTABLE_SIZE(tdb)) != 0) {
1669		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
1670		tdb->ecode = TDB_ERR_IO;
1671		tdb->methods = tdb->transaction->io_methods;
1672		goto fail;
1673	}
1674
1675	return 0;
1676
1677fail:
1678	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1679	tdb_transaction_unlock(tdb);
1680	SAFE_FREE(tdb->transaction->hash_heads);
1681	SAFE_FREE(tdb->transaction);
1682	return -1;
1683}
1684
1685
1686/*
1687  cancel the current transaction
1688*/
1689int tdb_transaction_cancel(struct tdb_context *tdb)
1690{
1691	if (tdb->transaction == NULL) {
1692		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
1693		return -1;
1694	}
1695
1696	if (tdb->transaction->nesting != 0) {
1697		tdb->transaction->transaction_error = 1;
1698		tdb->transaction->nesting--;
1699		return 0;
1700	}
1701
1702	tdb->map_size = tdb->transaction->old_map_size;
1703
1704	/* free all the transaction elements */
1705	while (tdb->transaction->elements) {
1706		struct tdb_transaction_el *el = tdb->transaction->elements;
1707		tdb->transaction->elements = el->next;
1708		free(el->data);
1709		free(el);
1710	}
1711
1712	/* remove any global lock created during the transaction */
1713	if (tdb->global_lock.count != 0) {
1714		tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
1715		tdb->global_lock.count = 0;
1716	}
1717
1718	/* remove any locks created during the transaction */
1719	if (tdb->num_locks != 0) {
1720		int i;
1721		for (i=0;i<tdb->num_lockrecs;i++) {
1722			tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
1723				   F_UNLCK,F_SETLKW, 0, 1);
1724		}
1725		tdb->num_locks = 0;
1726		tdb->num_lockrecs = 0;
1727		SAFE_FREE(tdb->lockrecs);
1728	}
1729
1730	/* restore the normal io methods */
1731	tdb->methods = tdb->transaction->io_methods;
1732
1733	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1734	tdb_transaction_unlock(tdb);
1735	SAFE_FREE(tdb->transaction->hash_heads);
1736	SAFE_FREE(tdb->transaction);
1737
1738	return 0;
1739}
1740
1741/*
1742  sync to disk
1743*/
1744static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
1745{
1746	if (fsync(tdb->fd) != 0) {
1747		tdb->ecode = TDB_ERR_IO;
1748		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
1749		return -1;
1750	}
1751#if defined(HAVE_MSYNC) && defined(MS_SYNC)
1752	if (tdb->map_ptr) {
1753		tdb_off_t moffset = offset & ~(tdb->page_size-1);
1754		if (msync(moffset + (char *)tdb->map_ptr,
1755			  length + (offset - moffset), MS_SYNC) != 0) {
1756			tdb->ecode = TDB_ERR_IO;
1757			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
1758				 strerror(errno)));
1759			return -1;
1760		}
1761	}
1762#endif
1763	return 0;
1764}
1765
1766
1767/*
1768  work out how much space the linearised recovery data will consume
1769*/
1770static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
1771{
1772	struct tdb_transaction_el *el;
1773	tdb_len_t recovery_size = 0;
1774
1775	recovery_size = sizeof(u32);
1776	for (el=tdb->transaction->elements;el;el=el->next) {
1777		if (el->offset >= tdb->transaction->old_map_size) {
1778			continue;
1779		}
1780		recovery_size += 2*sizeof(tdb_off_t) + el->length;
1781	}
1782
1783	return recovery_size;
1784}
1785
1786/*
1787  allocate the recovery area, or use an existing recovery area if it is
1788  large enough
1789*/
1790static int tdb_recovery_allocate(struct tdb_context *tdb,
1791				 tdb_len_t *recovery_size,
1792				 tdb_off_t *recovery_offset,
1793				 tdb_len_t *recovery_max_size)
1794{
1795	struct list_struct rec;
1796	const struct tdb_methods *methods = tdb->transaction->io_methods;
1797	tdb_off_t recovery_head;
1798
1799	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1800		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
1801		return -1;
1802	}
1803
1804	rec.rec_len = 0;
1805
1806	if (recovery_head != 0 &&
1807	    methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
1808		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
1809		return -1;
1810	}
1811
1812	*recovery_size = tdb_recovery_size(tdb);
1813
1814	if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
1815		/* it fits in the existing area */
1816		*recovery_max_size = rec.rec_len;
1817		*recovery_offset = recovery_head;
1818		return 0;
1819	}
1820
1821	/* we need to free up the old recovery area, then allocate a
1822	   new one at the end of the file. Note that we cannot use
1823	   tdb_allocate() to allocate the new one as that might return
1824	   us an area that is being currently used (as of the start of
1825	   the transaction) */
1826	if (recovery_head != 0) {
1827		if (tdb_free(tdb, recovery_head, &rec) == -1) {
1828			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
1829			return -1;
1830		}
1831	}
1832
1833	/* the tdb_free() call might have increased the recovery size */
1834	*recovery_size = tdb_recovery_size(tdb);
1835
1836	/* round up to a multiple of page size */
1837	*recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
1838	*recovery_offset = tdb->map_size;
1839	recovery_head = *recovery_offset;
1840
1841	if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1842				     (tdb->map_size - tdb->transaction->old_map_size) +
1843				     sizeof(rec) + *recovery_max_size) == -1) {
1844		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
1845		return -1;
1846	}
1847
1848	/* remap the file (if using mmap) */
1849	methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1850
1851	/* we have to reset the old map size so that we don't try to expand the file
1852	   again in the transaction commit, which would destroy the recovery area */
1853	tdb->transaction->old_map_size = tdb->map_size;
1854
1855	/* write the recovery header offset and sync - we can sync without a race here
1856	   as the magic ptr in the recovery record has not been set */
1857	CONVERT(recovery_head);
1858	if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
1859			       &recovery_head, sizeof(tdb_off_t)) == -1) {
1860		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
1861		return -1;
1862	}
1863
1864	return 0;
1865}
1866
1867
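/*
  On-disk layout of the recovery area written by the code below (a
  reader's aid derived from transaction_setup_recovery(), not a new
  format definition):

    [list_struct header]   magic written last; data_len = recovery_size,
                           rec_len = recovery_max_size, key_len = old file size
    then, for each transaction element that touches the old file range:
      [4 byte offset][4 byte length][length bytes of the OLD data]
    [4 byte tailer]        sizeof(list_struct) + recovery_max_size

  tdb_transaction_recover() replays the old data blocks to undo a
  partially applied commit.
*/
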
1868/*
1869  setup the recovery data that will be used on a crash during commit
1870*/
1871static int transaction_setup_recovery(struct tdb_context *tdb,
1872				      tdb_off_t *magic_offset)
1873{
1874	struct tdb_transaction_el *el;
1875	tdb_len_t recovery_size;
1876	unsigned char *data, *p;
1877	const struct tdb_methods *methods = tdb->transaction->io_methods;
1878	struct list_struct *rec;
1879	tdb_off_t recovery_offset, recovery_max_size;
1880	tdb_off_t old_map_size = tdb->transaction->old_map_size;
1881	u32 magic, tailer;
1882
1883	/*
1884	  check that the recovery area has enough space
1885	*/
1886	if (tdb_recovery_allocate(tdb, &recovery_size,
1887				  &recovery_offset, &recovery_max_size) == -1) {
1888		return -1;
1889	}
1890
1891	data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
1892	if (data == NULL) {
1893		tdb->ecode = TDB_ERR_OOM;
1894		return -1;
1895	}
1896
1897	rec = (struct list_struct *)data;
1898	memset(rec, 0, sizeof(*rec));
1899
1900	rec->magic    = 0;
1901	rec->data_len = recovery_size;
1902	rec->rec_len  = recovery_max_size;
1903	rec->key_len  = old_map_size;
1904	CONVERT(rec);
1905
1906	/* build the recovery data into a single blob to allow us to do a single
1907	   large write, which should be more efficient */
1908	p = data + sizeof(*rec);
1909	for (el=tdb->transaction->elements;el;el=el->next) {
1910		if (el->offset >= old_map_size) {
1911			continue;
1912		}
1913		if (el->offset + el->length > tdb->transaction->old_map_size) {
1914			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
1915			free(data);
1916			tdb->ecode = TDB_ERR_CORRUPT;
1917			return -1;
1918		}
1919		memcpy(p, &el->offset, 4);
1920		memcpy(p+4, &el->length, 4);
1921		if (DOCONV()) {
1922			tdb_convert(p, 8);
1923		}
1924		/* the recovery area contains the old data, not the
1925		   new data, so we have to call the original tdb_read
1926		   method to get it */
1927		if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
1928			free(data);
1929			tdb->ecode = TDB_ERR_IO;
1930			return -1;
1931		}
1932		p += 8 + el->length;
1933	}
1934
1935	/* and the tailer */
1936	tailer = sizeof(*rec) + recovery_max_size;
1937	CONVERT(tailer);
1938	memcpy(p, &tailer, 4);
1939
1940	/* write the recovery data to the recovery area */
1941	if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
1942		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
1943		free(data);
1944		tdb->ecode = TDB_ERR_IO;
1945		return -1;
1946	}
1947
1948	/* as we don't have ordered writes, we have to sync the recovery
1949	   data before we update the magic to indicate that the recovery
1950	   data is present */
1951	if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
1952		free(data);
1953		return -1;
1954	}
1955
1956	free(data);
1957
1958	magic = TDB_RECOVERY_MAGIC;
1959	CONVERT(magic);
1960
1961	*magic_offset = recovery_offset + offsetof(struct list_struct, magic);
1962
1963	if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
1964		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
1965		tdb->ecode = TDB_ERR_IO;
1966		return -1;
1967	}
1968
1969	/* ensure the recovery magic marker is on disk */
1970	if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
1971		return -1;
1972	}
1973
1974	return 0;
1975}
1976
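/*
  Commit ordering used below (syncs are skipped when TDB_NOSYNC is set):
  write the recovery area and sync, set the recovery magic and sync,
  apply the queued writes to the file, sync, then clear the magic and
  sync again. A crash at any point should either leave the old data
  intact or leave a recovery record that tdb_transaction_recover() can
  replay.

  Illustrative caller sketch (not part of the library; error handling is
  simplified and db is assumed to be an already open struct tdb_context):

    if (tdb_transaction_start(db) == 0) {
        ... tdb_store() and tdb_delete() calls ...
        if (tdb_transaction_commit(db) != 0) {
            ... handle the error ...
        }
    }
*/
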
1977/*
1978  commit the current transaction
1979*/
1980int tdb_transaction_commit(struct tdb_context *tdb)
1981{
1982	const struct tdb_methods *methods;
1983	tdb_off_t magic_offset = 0;
1984	u32 zero = 0;
1985
1986	if (tdb->transaction == NULL) {
1987		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1988		return -1;
1989	}
1990
1991	if (tdb->transaction->transaction_error) {
1992		tdb->ecode = TDB_ERR_IO;
1993		tdb_transaction_cancel(tdb);
1994		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1995		return -1;
1996	}
1997
1998	if (tdb->transaction->nesting != 0) {
1999		tdb->transaction->nesting--;
2000		return 0;
2001	}
2002
2003	/* check for a null transaction */
2004	if (tdb->transaction->elements == NULL) {
2005		tdb_transaction_cancel(tdb);
2006		return 0;
2007	}
2008
2009	methods = tdb->transaction->io_methods;
2010
2011	/* if there are any locks pending then the caller has not
2012	   nested their locks properly, so fail the transaction */
2013	if (tdb->num_locks || tdb->global_lock.count) {
2014		tdb->ecode = TDB_ERR_LOCK;
2015		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
2016		tdb_transaction_cancel(tdb);
2017		return -1;
2018	}
2019
2020	/* upgrade the main transaction lock region to a write lock */
2021	if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
2022		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to upgrade hash locks\n"));
2023		tdb->ecode = TDB_ERR_LOCK;
2024		tdb_transaction_cancel(tdb);
2025		return -1;
2026	}
2027
2028	/* get the global lock - this prevents new users attaching to the database
2029	   during the commit */
2030	if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
2031		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
2032		tdb->ecode = TDB_ERR_LOCK;
2033		tdb_transaction_cancel(tdb);
2034		return -1;
2035	}
2036
2037	if (!(tdb->flags & TDB_NOSYNC)) {
2038		/* write the recovery data to the end of the file */
2039		if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
2040			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
2041			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2042			tdb_transaction_cancel(tdb);
2043			return -1;
2044		}
2045	}
2046
2047	/* expand the file to the new size if needed */
2048	if (tdb->map_size != tdb->transaction->old_map_size) {
2049		if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
2050					     tdb->map_size -
2051					     tdb->transaction->old_map_size) == -1) {
2052			tdb->ecode = TDB_ERR_IO;
2053			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
2054			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2055			tdb_transaction_cancel(tdb);
2056			return -1;
2057		}
2058		tdb->map_size = tdb->transaction->old_map_size;
2059		methods->tdb_oob(tdb, tdb->map_size + 1, 1);
2060	}
2061
2062	/* perform all the writes */
2063	while (tdb->transaction->elements) {
2064		struct tdb_transaction_el *el = tdb->transaction->elements;
2065
2066		if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
2067			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
2068
2069			/* we've overwritten part of the data and
2070			   possibly expanded the file, so we need to
2071			   run the crash recovery code */
2072			tdb->methods = methods;
2073			tdb_transaction_recover(tdb);
2074
2075			tdb_transaction_cancel(tdb);
2076			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2077
2078			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
2079			return -1;
2080		}
2081		tdb->transaction->elements = el->next;
2082		free(el->data);
2083		free(el);
2084	}
2085
2086	if (!(tdb->flags & TDB_NOSYNC)) {
2087		/* ensure the new data is on disk */
2088		if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2089			return -1;
2090		}
2091
2092		/* remove the recovery marker */
2093		if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
2094			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
2095			return -1;
2096		}
2097
2098		/* ensure the recovery marker has been removed on disk */
2099		if (transaction_sync(tdb, magic_offset, 4) == -1) {
2100			return -1;
2101		}
2102	}
2103
2104	tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2105
2106	/*
2107	  TODO: maybe write to some dummy hdr field, or write to magic
2108	  offset without mmap, before the last sync, instead of the
2109	  utime() call
2110	*/
2111
2112	/* on some systems (like Linux 2.6.x) changes via mmap/msync
2113	   don't change the mtime of the file, this means the file may
2114	   not be backed up (as tdb rounding to block sizes means that
2115	   file size changes are quite rare too). The following forces
2116	   mtime changes when a transaction completes */
2117#ifdef HAVE_UTIME
2118	utime(tdb->name, NULL);
2119#endif
2120
2121	/* use a transaction cancel to free memory and remove the
2122	   transaction locks */
2123	tdb_transaction_cancel(tdb);
2124	return 0;
2125}
2126
2127
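/*
  Overview of the recovery path below: read the recovery head pointer;
  if there is no recovery record, or its magic is not set, there is
  nothing to do. Otherwise the recovery blob is read and each
  (offset, length, data) triple is written back, restoring the
  pre-transaction contents. The file is then synced, the recovery magic
  cleared, and the file truncated back to its pre-transaction size.
*/
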
2128/*
2129  recover from an aborted transaction. Must be called with exclusive
2130  database write access already established (including the global
2131  lock to prevent new processes attaching)
2132*/
2133int tdb_transaction_recover(struct tdb_context *tdb)
2134{
2135	tdb_off_t recovery_head, recovery_eof;
2136	unsigned char *data, *p;
2137	u32 zero = 0;
2138	struct list_struct rec;
2139
2140	/* find the recovery area */
2141	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
2142		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
2143		tdb->ecode = TDB_ERR_IO;
2144		return -1;
2145	}
2146
2147	if (recovery_head == 0) {
2148		/* we have never allocated a recovery record */
2149		return 0;
2150	}
2151
2152	/* read the recovery record */
2153	if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
2154				   sizeof(rec), DOCONV()) == -1) {
2155		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
2156		tdb->ecode = TDB_ERR_IO;
2157		return -1;
2158	}
2159
2160	if (rec.magic != TDB_RECOVERY_MAGIC) {
2161		/* there is no valid recovery data */
2162		return 0;
2163	}
2164
2165	if (tdb->read_only) {
2166		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
2167		tdb->ecode = TDB_ERR_CORRUPT;
2168		return -1;
2169	}
2170
2171	recovery_eof = rec.key_len;
2172
2173	data = (unsigned char *)malloc(rec.data_len);
2174	if (data == NULL) {
2175		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
2176		tdb->ecode = TDB_ERR_OOM;
2177		return -1;
2178	}
2179
2180	/* read the full recovery data */
2181	if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
2182				   rec.data_len, 0) == -1) {
2183		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
2184		tdb->ecode = TDB_ERR_IO;
2185		return -1;
2186	}
2187
2188	/* recover the file data */
2189	p = data;
2190	while (p+8 < data + rec.data_len) {
2191		u32 ofs, len;
2192		if (DOCONV()) {
2193			tdb_convert(p, 8);
2194		}
2195		memcpy(&ofs, p, 4);
2196		memcpy(&len, p+4, 4);
2197
2198		if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
2199			free(data);
2200			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
2201			tdb->ecode = TDB_ERR_IO;
2202			return -1;
2203		}
2204		p += 8 + len;
2205	}
2206
2207	free(data);
2208
2209	if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2210		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
2211		tdb->ecode = TDB_ERR_IO;
2212		return -1;
2213	}
2214
2215	/* if the recovery area is after the recovered eof then remove it */
2216	if (recovery_eof <= recovery_head) {
2217		if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
2218			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
2219			tdb->ecode = TDB_ERR_IO;
2220			return -1;
2221		}
2222	}
2223
2224	/* remove the recovery magic */
2225	if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
2226			  &zero) == -1) {
2227		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
2228		tdb->ecode = TDB_ERR_IO;
2229		return -1;
2230	}
2231
2232	/* reduce the file size to the old size */
2233	tdb_munmap(tdb);
2234	if (ftruncate(tdb->fd, recovery_eof) != 0) {
2235		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
2236		tdb->ecode = TDB_ERR_IO;
2237		return -1;
2238	}
2239	tdb->map_size = recovery_eof;
2240	tdb_mmap(tdb);
2241
2242	if (transaction_sync(tdb, 0, recovery_eof) == -1) {
2243		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
2244		tdb->ecode = TDB_ERR_IO;
2245		return -1;
2246	}
2247
2248	TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
2249		 recovery_eof));
2250
2251	/* all done */
2252	return 0;
2253}
2254
2255/* file: freelist.c */
2256
2257/* read a freelist record and check for simple errors */
2258static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
2259{
2260	if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
2261		return -1;
2262
2263	if (rec->magic == TDB_MAGIC) {
2264		/* this happens when an app is shut down while deleting a record - we should
2265		   not completely fail when this happens */
2266		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
2267			 rec->magic, off));
2268		rec->magic = TDB_FREE_MAGIC;
2269		if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
2270			return -1;
2271	}
2272
2273	if (rec->magic != TDB_FREE_MAGIC) {
2274		/* Ensure ecode is set for log fn. */
2275		tdb->ecode = TDB_ERR_CORRUPT;
2276		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read bad magic 0x%x at offset=%d\n",
2277			   rec->magic, off));
2278		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2279	}
2280	if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
2281		return -1;
2282	return 0;
2283}
2284
2285
2286
2287/* Remove an element from the freelist.  Must have alloc lock. */
2288static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
2289{
2290	tdb_off_t last_ptr, i;
2291
2292	/* read in the freelist top */
2293	last_ptr = FREELIST_TOP;
2294	while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
2295		if (i == off) {
2296			/* We've found it! */
2297			return tdb_ofs_write(tdb, last_ptr, &next);
2298		}
2299		/* Follow chain (next offset is at start of record) */
2300		last_ptr = i;
2301	}
2302	TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
2303	return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2304}
2305
2306
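/*
  On-disk layout of a record, as assumed by the freelist code below:

    [list_struct header][key][data][padding up to rec_len][tailer]

  The tailer is a tdb_off_t stored in the last 4 bytes of the record and
  holds sizeof(header) + rec_len, i.e. the total record size. tdb_free()
  reads the tailer of the record immediately to the left of a freed
  record so that adjacent free records can be merged.
*/
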
2307/* update a record tailer (must hold allocation lock) */
2308static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
2309			 const struct list_struct *rec)
2310{
2311	tdb_off_t totalsize;
2312
2313	/* Offset of tailer from record header */
2314	totalsize = sizeof(*rec) + rec->rec_len;
2315	return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
2316			 &totalsize);
2317}
2318
2319/* Add an element into the freelist. Merge adjacent records if
2320   necessary. */
2321int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
2322{
2323	tdb_off_t right, left;
2324
2325	/* Allocation and tailer lock */
2326	if (tdb_lock(tdb, -1, F_WRLCK) != 0)
2327		return -1;
2328
2329	/* set an initial tailer, so if we fail we don't leave a bogus record */
2330	if (update_tailer(tdb, offset, rec) != 0) {
2331		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
2332		goto fail;
2333	}
2334
2335	/* Look right first (I'm an Australian, dammit) */
2336	right = offset + sizeof(*rec) + rec->rec_len;
2337	if (right + sizeof(*rec) <= tdb->map_size) {
2338		struct list_struct r;
2339
2340		if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
2341			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
2342			goto left;
2343		}
2344
2345		/* If it's free, expand to include it. */
2346		if (r.magic == TDB_FREE_MAGIC) {
2347			if (remove_from_freelist(tdb, right, r.next) == -1) {
2348				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
2349				goto left;
2350			}
2351			rec->rec_len += sizeof(r) + r.rec_len;
2352		}
2353	}
2354
2355left:
2356	/* Look left */
2357	left = offset - sizeof(tdb_off_t);
2358	if (left > TDB_DATA_START(tdb->header.hash_size)) {
2359		struct list_struct l;
2360		tdb_off_t leftsize;
2361
2362		/* Read in tailer and jump back to header */
2363		if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
2364			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
2365			goto update;
2366		}
2367
2368		/* it could be uninitialised data */
2369		if (leftsize == 0 || leftsize == TDB_PAD_U32) {
2370			goto update;
2371		}
2372
2373		left = offset - leftsize;
2374
2375		/* Now read in record */
2376		if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
2377			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
2378			goto update;
2379		}
2380
2381		/* If it's free, expand to include it. */
2382		if (l.magic == TDB_FREE_MAGIC) {
2383			if (remove_from_freelist(tdb, left, l.next) == -1) {
2384				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
2385				goto update;
2386			} else {
2387				offset = left;
2388				rec->rec_len += leftsize;
2389			}
2390		}
2391	}
2392
2393update:
2394	if (update_tailer(tdb, offset, rec) == -1) {
2395		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
2396		goto fail;
2397	}
2398
2399	/* Now, prepend to free list */
2400	rec->magic = TDB_FREE_MAGIC;
2401
2402	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
2403	    tdb_rec_write(tdb, offset, rec) == -1 ||
2404	    tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
2405		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
2406		goto fail;
2407	}
2408
2409	/* And we're done. */
2410	tdb_unlock(tdb, -1, F_WRLCK);
2411	return 0;
2412
2413 fail:
2414	tdb_unlock(tdb, -1, F_WRLCK);
2415	return -1;
2416}
2417
2418
2419/*
2420   the core of tdb_allocate - called when we have decided which
2421   free list entry to use
2422 */
2423static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
2424				struct list_struct *rec, tdb_off_t last_ptr)
2425{
2426	struct list_struct newrec;
2427	tdb_off_t newrec_ptr;
2428
2429	memset(&newrec, '\0', sizeof(newrec));
2430
2431	/* found it - now possibly split it up  */
2432	if (rec->rec_len > length + MIN_REC_SIZE) {
2433		/* Length of left piece */
2434		length = TDB_ALIGN(length, TDB_ALIGNMENT);
2435
2436		/* Right piece to go on free list */
2437		newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
2438		newrec_ptr = rec_ptr + sizeof(*rec) + length;
2439
2440		/* And left record is shortened */
2441		rec->rec_len = length;
2442	} else {
2443		newrec_ptr = 0;
2444	}
2445
2446	/* Remove allocated record from the free list */
2447	if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
2448		return 0;
2449	}
2450
2451	/* Update header: do this before we drop alloc
2452	   lock, otherwise tdb_free() might try to
2453	   merge with us, thinking we're free.
2454	   (Thanks Jeremy Allison). */
2455	rec->magic = TDB_MAGIC;
2456	if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
2457		return 0;
2458	}
2459
2460	/* Did we create new block? */
2461	if (newrec_ptr) {
2462		/* Update allocated record tailer (we
2463		   shortened it). */
2464		if (update_tailer(tdb, rec_ptr, rec) == -1) {
2465			return 0;
2466		}
2467
2468		/* Free new record */
2469		if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
2470			return 0;
2471		}
2472	}
2473
2474	/* all done - return the new record offset */
2475	return rec_ptr;
2476}
2477
2478/* allocate some space from the free list. The offset returned points
2479   to an unconnected list_struct within the database with room for at
2480   least length bytes of total data
2481
2482   0 is returned if the space could not be allocated
2483 */
2484tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
2485{
2486	tdb_off_t rec_ptr, last_ptr, newrec_ptr;
2487	struct {
2488		tdb_off_t rec_ptr, last_ptr;
2489		tdb_len_t rec_len;
2490	} bestfit;
2491
2492	if (tdb_lock(tdb, -1, F_WRLCK) == -1)
2493		return 0;
2494
2495	/* Extra bytes required for tailer */
2496	length += sizeof(tdb_off_t);
2497
2498 again:
2499	last_ptr = FREELIST_TOP;
2500
2501	/* read in the freelist top */
2502	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
2503		goto fail;
2504
2505	bestfit.rec_ptr = 0;
2506	bestfit.last_ptr = 0;
2507	bestfit.rec_len = 0;
2508
2509	/*
2510	   this is a best fit allocation strategy. Originally we used
2511	   a first fit strategy, but it suffered from massive fragmentation
2512	   issues when faced with a slowly increasing record size.
2513	 */
2514	while (rec_ptr) {
2515		if (tdb_rec_free_read(tdb, rec_ptr, rec) == -1) {
2516			goto fail;
2517		}
2518
2519		if (rec->rec_len >= length) {
2520			if (bestfit.rec_ptr == 0 ||
2521			    rec->rec_len < bestfit.rec_len) {
2522				bestfit.rec_len = rec->rec_len;
2523				bestfit.rec_ptr = rec_ptr;
2524				bestfit.last_ptr = last_ptr;
2525				/* consider a fit to be good enough if
2526				   we aren't wasting more than half
2527				   the space */
2528				if (bestfit.rec_len < 2*length) {
2529					break;
2530				}
2531			}
2532		}
2533
2534		/* move to the next record */
2535		last_ptr = rec_ptr;
2536		rec_ptr = rec->next;
2537	}
2538
2539	if (bestfit.rec_ptr != 0) {
2540		if (tdb_rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
2541			goto fail;
2542		}
2543
2544		newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
2545		tdb_unlock(tdb, -1, F_WRLCK);
2546		return newrec_ptr;
2547	}
2548
2549	/* we didn't find enough space. See if we can expand the
2550	   database and if we can then try again */
2551	if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
2552		goto again;
2553 fail:
2554	tdb_unlock(tdb, -1, F_WRLCK);
2555	return 0;
2556}
2557
2558/* file: freelistcheck.c */
2559
2560/* Check the freelist is good and contains no loops.
2561   Very memory intensive - only do this as a consistency
2562   checker. Heh heh - uses an in memory tdb as the storage
2563   for the "seen" record list. For some reason this strikes
2564   me as extremely clever as I don't have to write another tree
2565   data structure implementation :-).
2566 */
2567
2568static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
2569{
2570	TDB_DATA key, data;
2571
2572	memset(&data, '\0', sizeof(data));
2573	key.dptr = (unsigned char *)&rec_ptr;
2574	key.dsize = sizeof(rec_ptr);
2575	return tdb_store(mem_tdb, key, data, TDB_INSERT);
2576}
2577
2578int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
2579{
2580	struct tdb_context *mem_tdb = NULL;
2581	struct list_struct rec;
2582	tdb_off_t rec_ptr, last_ptr;
2583	int ret = -1;
2584
2585	*pnum_entries = 0;
2586
2587	mem_tdb = tdb_open("flval", tdb->header.hash_size,
2588				TDB_INTERNAL, O_RDWR, 0600);
2589	if (!mem_tdb) {
2590		return -1;
2591	}
2592
2593	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
2594		tdb_close(mem_tdb);
2595		return 0;
2596	}
2597
2598	last_ptr = FREELIST_TOP;
2599
2600	/* Store the FREELIST_TOP record. */
2601	if (seen_insert(mem_tdb, last_ptr) == -1) {
2602		ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2603		goto fail;
2604	}
2605
2606	/* read in the freelist top */
2607	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
2608		goto fail;
2609	}
2610
2611	while (rec_ptr) {
2612
2613		/* If we can't store this record (we've seen it
2614		   before) then the free list has a loop and must
2615		   be corrupt. */
2616
2617		if (seen_insert(mem_tdb, rec_ptr)) {
2618			ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2619			goto fail;
2620		}
2621
2622		if (tdb_rec_free_read(tdb, rec_ptr, &rec) == -1) {
2623			goto fail;
2624		}
2625
2626		/* move to the next record */
2627		last_ptr = rec_ptr;
2628		rec_ptr = rec.next;
2629		*pnum_entries += 1;
2630	}
2631
2632	ret = 0;
2633
2634  fail:
2635
2636	tdb_close(mem_tdb);
2637	tdb_unlock(tdb, -1, F_WRLCK);
2638	return ret;
2639}
2640
2641/* file: traverse.c */
2642
2643/* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
2644static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
2645			 struct list_struct *rec)
2646{
2647	int want_next = (tlock->off != 0);
2648
2649	/* Lock each chain from the start one. */
2650	for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
2651		if (!tlock->off && tlock->hash != 0) {
2652			/* this is an optimisation for the common case where
2653			   the hash chain is empty, which is particularly
2654			   common for the use of tdb with ldb, where large
2655			   hashes are used. In that case we spend most of our
2656			   time in tdb_brlock(), locking empty hash chains.
2657
2658			   To avoid this, we do an unlocked pre-check to see
2659			   if the hash chain is empty before starting to look
2660			   inside it. If it is empty then we can avoid that
2661			   hash chain. If it isn't empty then we can't believe
2662			   the value we get back, as we read it without a
2663			   lock, so instead we get the lock and re-fetch the
2664			   value below.
2665
2666			   Notice that not doing this optimisation on the
2667			   first hash chain is critical. We must guarantee
2668			   that we have done at least one fcntl lock at the
2669			   start of a search to guarantee that memory is
2670			   coherent on SMP systems. If records are added by
2671			   others during the search then that's OK, and we
2672			   could possibly miss those with this trick, but we
2673			   could miss them anyway without this trick, so the
2674			   semantics don't change.
2675
2676			   With a non-indexed ldb search this trick gains us a
2677			   factor of around 80 in speed on a linux 2.6.x
2678			   system (testing using ldbtest).
2679			*/
2680			tdb->methods->next_hash_chain(tdb, &tlock->hash);
2681			if (tlock->hash == tdb->header.hash_size) {
2682				continue;
2683			}
2684		}
2685
2686		if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
2687			return -1;
2688
2689		/* No previous record?  Start at top of chain. */
2690		if (!tlock->off) {
2691			if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
2692				     &tlock->off) == -1)
2693				goto fail;
2694		} else {
2695			/* Otherwise unlock the previous record. */
2696			if (tdb_unlock_record(tdb, tlock->off) != 0)
2697				goto fail;
2698		}
2699
2700		if (want_next) {
2701			/* We have offset of old record: grab next */
2702			if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2703				goto fail;
2704			tlock->off = rec->next;
2705		}
2706
2707		/* Iterate through chain */
2708		while (tlock->off) {
2709			tdb_off_t current;
2710			if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2711				goto fail;
2712
2713			/* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
2714			if (tlock->off == rec->next) {
2715				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
2716				goto fail;
2717			}
2718
2719			if (!TDB_DEAD(rec)) {
2720				/* Woohoo: we found one! */
2721				if (tdb_lock_record(tdb, tlock->off) != 0)
2722					goto fail;
2723				return tlock->off;
2724			}
2725
2726			/* Try to clean dead ones from old traverses */
2727			current = tlock->off;
2728			tlock->off = rec->next;
2729			if (!(tdb->read_only || tdb->traverse_read) &&
2730			    tdb_do_delete(tdb, current, rec) != 0)
2731				goto fail;
2732		}
2733		tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
2734		want_next = 0;
2735	}
2736	/* We finished iteration without finding anything */
2737	return TDB_ERRCODE(TDB_SUCCESS, 0);
2738
2739 fail:
2740	tlock->off = 0;
2741	if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
2742		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
2743	return -1;
2744}
2745
2746/* traverse the entire database - calling fn(tdb, key, data) on each element.
2747   return -1 on error or the record count traversed
2748   if fn is NULL then it is not called
2749   a non-zero return value from fn() indicates that the traversal should stop
2750  */
2751static int tdb_traverse_internal(struct tdb_context *tdb,
2752				 tdb_traverse_func fn, void *private_data,
2753				 struct tdb_traverse_lock *tl)
2754{
2755	TDB_DATA key, dbuf;
2756	struct list_struct rec;
2757	int ret, count = 0;
2758
2759	/* This was in the initialization, above, but the IRIX compiler
2760	 * did not like it.  crh
2761	 */
2762	tl->next = tdb->travlocks.next;
2763
2764	/* fcntl locks don't stack: beware traverse inside traverse */
2765	tdb->travlocks.next = tl;
2766
2767	/* tdb_next_lock places locks on the record returned, and its chain */
2768	while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
2769		count++;
2770		/* now read the full record */
2771		key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
2772					  rec.key_len + rec.data_len);
2773		if (!key.dptr) {
2774			ret = -1;
2775			if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
2776				goto out;
2777			if (tdb_unlock_record(tdb, tl->off) != 0)
2778				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
2779			goto out;
2780		}
2781		key.dsize = rec.key_len;
2782		dbuf.dptr = key.dptr + rec.key_len;
2783		dbuf.dsize = rec.data_len;
2784
2785		/* Drop chain lock, call out */
2786		if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
2787			ret = -1;
2788			SAFE_FREE(key.dptr);
2789			goto out;
2790		}
2791		if (fn && fn(tdb, key, dbuf, private_data)) {
2792			/* They want us to terminate traversal */
2793			ret = count;
2794			if (tdb_unlock_record(tdb, tl->off) != 0) {
2795				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));
2796				ret = -1;
2797			}
2798			SAFE_FREE(key.dptr);
2799			goto out;
2800		}
2801		SAFE_FREE(key.dptr);
2802	}
2803out:
2804	tdb->travlocks.next = tl->next;
2805	if (ret < 0)
2806		return -1;
2807	else
2808		return count;
2809}
2810
2811
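/*
  Illustrative traversal callback sketch (hypothetical names, not part of
  the library). A non-zero return from the callback stops the traverse;
  tdb_traverse_read() returns the number of records visited, or -1 on
  error:

    static int count_fn(struct tdb_context *the_tdb, TDB_DATA key,
                        TDB_DATA data, void *private_data)
    {
        int *count = (int *)private_data;
        (*count)++;
        return 0;
    }

    int n = 0;
    tdb_traverse_read(db, count_fn, &n);
*/
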
2812/*
2813  a read style traverse - temporarily marks the db read only
2814*/
2815int tdb_traverse_read(struct tdb_context *tdb,
2816		      tdb_traverse_func fn, void *private_data)
2817{
2818	struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
2819	int ret;
2820
2821	/* we need to get a read lock on the transaction lock here to
2822	   cope with the lock ordering semantics of solaris10 */
2823	if (tdb_transaction_lock(tdb, F_RDLCK)) {
2824		return -1;
2825	}
2826
2827	tdb->traverse_read++;
2828	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2829	tdb->traverse_read--;
2830
2831	tdb_transaction_unlock(tdb);
2832
2833	return ret;
2834}
2835
2836/*
2837  a write style traverse - needs to get the transaction lock to
2838  prevent deadlocks
2839*/
2840int tdb_traverse(struct tdb_context *tdb,
2841		 tdb_traverse_func fn, void *private_data)
2842{
2843	struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
2844	int ret;
2845
2846	if (tdb->read_only || tdb->traverse_read) {
2847		return tdb_traverse_read(tdb, fn, private_data);
2848	}
2849
2850	if (tdb_transaction_lock(tdb, F_WRLCK)) {
2851		return -1;
2852	}
2853
2854	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2855
2856	tdb_transaction_unlock(tdb);
2857
2858	return ret;
2859}
2860
2861
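/*
  Illustrative key-iteration sketch using tdb_firstkey()/tdb_nextkey()
  (hypothetical; the caller owns and must free each returned dptr):

    TDB_DATA k, next;
    for (k = tdb_firstkey(db); k.dptr != NULL; k = next) {
        next = tdb_nextkey(db, k);
        free(k.dptr);
    }
*/
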
2862/* find the first entry in the database and return its key */
2863TDB_DATA tdb_firstkey(struct tdb_context *tdb)
2864{
2865	TDB_DATA key;
2866	struct list_struct rec;
2867
2868	/* release any old lock */
2869	if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
2870		return tdb_null;
2871	tdb->travlocks.off = tdb->travlocks.hash = 0;
2872	tdb->travlocks.lock_rw = F_RDLCK;
2873
2874	/* Grab first record: locks chain and returned record. */
2875	if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
2876		return tdb_null;
2877	/* now read the key */
2878	key.dsize = rec.key_len;
2879	key.dptr = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
2880
2881	/* Unlock the hash chain of the record we just read. */
2882	if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2883		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
2884	return key;
2885}
2886
2887/* find the next entry in the database, returning its key */
2888TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
2889{
2890	u32 oldhash;
2891	TDB_DATA key = tdb_null;
2892	struct list_struct rec;
2893	unsigned char *k = NULL;
2894
2895	/* Is locked key the old key?  If so, traverse will be reliable. */
2896	if (tdb->travlocks.off) {
2897		if (tdb_lock(tdb,tdb->travlocks.hash,tdb->travlocks.lock_rw))
2898			return tdb_null;
2899		if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
2900		    || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
2901					    rec.key_len))
2902		    || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
2903			/* No, it wasn't: unlock it and start from scratch */
2904			if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
2905				SAFE_FREE(k);
2906				return tdb_null;
2907			}
2908			if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0) {
2909				SAFE_FREE(k);
2910				return tdb_null;
2911			}
2912			tdb->travlocks.off = 0;
2913		}
2914
2915		SAFE_FREE(k);
2916	}
2917
2918	if (!tdb->travlocks.off) {
2919		/* No previous element: do normal find, and lock record */
2920		tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
2921		if (!tdb->travlocks.off)
2922			return tdb_null;
2923		tdb->travlocks.hash = BUCKET(rec.full_hash);
2924		if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
2925			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
2926			return tdb_null;
2927		}
2928	}
2929	oldhash = tdb->travlocks.hash;
2930
2931	/* Grab next record: locks chain and returned record,
2932	   unlocks old record */
2933	if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
2934		key.dsize = rec.key_len;
2935		key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
2936					  key.dsize);
2937		/* Unlock the chain of this new record */
2938		if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2939			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2940	}
2941	/* Unlock the chain of old record */
2942	if (tdb_unlock(tdb, BUCKET(oldhash), tdb->travlocks.lock_rw) != 0)
2943		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2944	return key;
2945}
2946
2947/* file: dump.c */
2948
2949static tdb_off_t tdb_dump_record(struct tdb_context *tdb, int hash,
2950				 tdb_off_t offset)
2951{
2952	struct list_struct rec;
2953	tdb_off_t tailer_ofs, tailer;
2954
2955	if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
2956				   sizeof(rec), DOCONV()) == -1) {
2957		printf("ERROR: failed to read record at %u\n", offset);
2958		return 0;
2959	}
2960
2961	printf(" rec: hash=%d offset=0x%08x next=0x%08x rec_len=%d "
2962	       "key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
2963	       hash, offset, rec.next, rec.rec_len, rec.key_len, rec.data_len,
2964	       rec.full_hash, rec.magic);
2965
2966	tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
2967
2968	if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
2969		printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
2970		return rec.next;
2971	}
2972
2973	if (tailer != rec.rec_len + sizeof(rec)) {
2974		printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
2975				(unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
2976	}
2977	return rec.next;
2978}
2979
2980static int tdb_dump_chain(struct tdb_context *tdb, int i)
2981{
2982	tdb_off_t rec_ptr, top;
2983
2984	top = TDB_HASH_TOP(i);
2985
2986	if (tdb_lock(tdb, i, F_WRLCK) != 0)
2987		return -1;
2988
2989	if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
2990		return tdb_unlock(tdb, i, F_WRLCK);
2991
2992	if (rec_ptr)
2993		printf("hash=%d\n", i);
2994
2995	while (rec_ptr) {
2996		rec_ptr = tdb_dump_record(tdb, i, rec_ptr);
2997	}
2998
2999	return tdb_unlock(tdb, i, F_WRLCK);
3000}
3001
3002void tdb_dump_all(struct tdb_context *tdb)
3003{
3004	int i;
3005	for (i=0;i<tdb->header.hash_size;i++) {
3006		tdb_dump_chain(tdb, i);
3007	}
3008	printf("freelist:\n");
3009	tdb_dump_chain(tdb, -1);
3010}
3011
3012int tdb_printfreelist(struct tdb_context *tdb)
3013{
3014	int ret;
3015	long total_free = 0;
3016	tdb_off_t offset, rec_ptr;
3017	struct list_struct rec;
3018
3019	if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
3020		return ret;
3021
3022	offset = FREELIST_TOP;
3023
3024	/* read in the freelist top */
3025	if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
3026		tdb_unlock(tdb, -1, F_WRLCK);
3027		return 0;
3028	}
3029
3030	printf("freelist top=[0x%08x]\n", rec_ptr );
3031	while (rec_ptr) {
3032		if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
3033					   sizeof(rec), DOCONV()) == -1) {
3034			tdb_unlock(tdb, -1, F_WRLCK);
3035			return -1;
3036		}
3037
3038		if (rec.magic != TDB_FREE_MAGIC) {
3039			printf("bad magic 0x%08x in free list\n", rec.magic);
3040			tdb_unlock(tdb, -1, F_WRLCK);
3041			return -1;
3042		}
3043
3044		printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
3045		       rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
3046		total_free += rec.rec_len;
3047
3048		/* move to the next record */
3049		rec_ptr = rec.next;
3050	}
3051	printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
3052               (int)total_free);
3053
3054	return tdb_unlock(tdb, -1, F_WRLCK);
3055}
3056
3057/* file: tdb.c */
3058
3059/*
3060  non-blocking increment of the tdb sequence number if the tdb has been opened using
3061  the TDB_SEQNUM flag
3062*/
3063void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
3064{
3065	tdb_off_t seqnum=0;
3066
3067	if (!(tdb->flags & TDB_SEQNUM)) {
3068		return;
3069	}
3070
3071	/* we ignore errors from this, as we have no sane way of
3072	   dealing with them.
3073	*/
3074	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3075	seqnum++;
3076	tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
3077}
3078
3079/*
3080  increment the tdb sequence number if the tdb has been opened using
3081  the TDB_SEQNUM flag
3082*/
3083static void tdb_increment_seqnum(struct tdb_context *tdb)
3084{
3085	if (!(tdb->flags & TDB_SEQNUM)) {
3086		return;
3087	}
3088
3089	if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
3090		return;
3091	}
3092
3093	tdb_increment_seqnum_nonblock(tdb);
3094
3095	tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
3096}
3097
3098static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
3099{
3100	return memcmp(data.dptr, key.dptr, data.dsize);
3101}
3102
3103/* Returns 0 on fail.  On success, return offset of record, and fills
3104   in rec */
3105static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
3106			struct list_struct *r)
3107{
3108	tdb_off_t rec_ptr;
3109
3110	/* read in the hash top */
3111	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3112		return 0;
3113
3114	/* keep looking until we find the right record */
3115	while (rec_ptr) {
3116		if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3117			return 0;
3118
3119		if (!TDB_DEAD(r) && hash==r->full_hash
3120		    && key.dsize==r->key_len
3121		    && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
3122				      r->key_len, tdb_key_compare,
3123				      NULL) == 0) {
3124			return rec_ptr;
3125		}
3126		rec_ptr = r->next;
3127	}
3128	return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3129}
3130
3131/* As tdb_find, but if you succeed, keep the lock */
3132tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
3133			   struct list_struct *rec)
3134{
3135	u32 rec_ptr;
3136
3137	if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
3138		return 0;
3139	if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
3140		tdb_unlock(tdb, BUCKET(hash), locktype);
3141	return rec_ptr;
3142}
3143
3144
3145/* update an entry in place - this only works if the new data size
3146   is <= the old data size and the key exists.
3147   on failure return -1.
3148*/
3149static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
3150{
3151	struct list_struct rec;
3152	tdb_off_t rec_ptr;
3153
3154	/* find entry */
3155	if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
3156		return -1;
3157
3158	/* the record must be long enough for key, data and tailer */
3159	if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
3160		tdb->ecode = TDB_SUCCESS; /* Not really an error */
3161		return -1;
3162	}
3163
3164	if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3165		      dbuf.dptr, dbuf.dsize) == -1)
3166		return -1;
3167
3168	if (dbuf.dsize != rec.data_len) {
3169		/* update size */
3170		rec.data_len = dbuf.dsize;
3171		return tdb_rec_write(tdb, rec_ptr, &rec);
3172	}
3173
3174	return 0;
3175}
3176
3177/* find an entry in the database given a key */
3178/* If an entry doesn't exist tdb_err will be set to
3179/* If an entry doesn't exist the tdb error code will be set to
3180 * then the TDB_DATA will have zero length but
3181 * a non-zero pointer
3182 */
3183TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
3184{
3185	tdb_off_t rec_ptr;
3186	struct list_struct rec;
3187	TDB_DATA ret;
3188	u32 hash;
3189
3190	/* find which hash bucket it is in */
3191	hash = tdb->hash_fn(&key);
3192	if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
3193		return tdb_null;
3194
3195	ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3196				  rec.data_len);
3197	ret.dsize = rec.data_len;
3198	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3199	return ret;
3200}
3201
3202/*
3203 * Find an entry in the database and hand the record's data to a parsing
3204 * function. The parsing function is executed under the chain read lock, so it
3205 * should be fast and should not block on other syscalls.
3206 *
3207 * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
3208 *
3209 * For mmapped tdb's that do not have a transaction open it points the parsing
3210 * function directly at the mmap area, avoiding the malloc/memcpy in this
3211 * case. If a transaction is open or no mmap is available, it has to do
3212 * malloc/read/parse/free.
3213 *
3214 * This is interesting for all readers of potentially large data structures in
3215 * the tdb records, ldb indexes being one example.
3216 */
3217
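/*
  Illustrative parser sketch (hypothetical; the parser must not call back
  into tdb and must not keep the data pointer after it returns):

    static int first_byte_parser(TDB_DATA key, TDB_DATA data, void *priv)
    {
        unsigned char *out = (unsigned char *)priv;
        if (data.dsize == 0) {
            return -1;
        }
        *out = data.dptr[0];
        return 0;
    }

    unsigned char b;
    tdb_parse_record(db, key, first_byte_parser, &b);
*/
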
3218int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
3219		     int (*parser)(TDB_DATA key, TDB_DATA data,
3220				   void *private_data),
3221		     void *private_data)
3222{
3223	tdb_off_t rec_ptr;
3224	struct list_struct rec;
3225	int ret;
3226	u32 hash;
3227
3228	/* find which hash bucket it is in */
3229	hash = tdb->hash_fn(&key);
3230
3231	if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
3232		return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3233	}
3234
3235	ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
3236			     rec.data_len, parser, private_data);
3237
3238	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3239
3240	return ret;
3241}
3242
3243/* check if an entry in the database exists
3244
3245   note that 1 is returned if the key is found and 0 is returned if not found
3246   this doesn't match the conventions in the rest of this module, but is
3247   compatible with gdbm
3248*/
3249static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3250{
3251	struct list_struct rec;
3252
3253	if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
3254		return 0;
3255	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3256	return 1;
3257}
3258
3259int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
3260{
3261	u32 hash = tdb->hash_fn(&key);
3262	return tdb_exists_hash(tdb, key, hash);
3263}
3264
3265/* actually delete an entry in the database given the offset */
3266int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec)
3267{
3268	tdb_off_t last_ptr, i;
3269	struct list_struct lastrec;
3270
3271	if (tdb->read_only || tdb->traverse_read) return -1;
3272
3273	if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
3274		/* Someone traversing here: mark it as dead */
3275		rec->magic = TDB_DEAD_MAGIC;
3276		return tdb_rec_write(tdb, rec_ptr, rec);
3277	}
3278	if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
3279		return -1;
3280
3281	/* find previous record in hash chain */
3282	if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
3283		return -1;
3284	for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
3285		if (tdb_rec_read(tdb, i, &lastrec) == -1)
3286			return -1;
3287
3288	/* unlink it: next ptr is at start of record. */
3289	if (last_ptr == 0)
3290		last_ptr = TDB_HASH_TOP(rec->full_hash);
3291	if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
3292		return -1;
3293
3294	/* recover the space */
3295	if (tdb_free(tdb, rec_ptr, rec) == -1)
3296		return -1;
3297	return 0;
3298}
3299
3300static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
3301{
3302	int res = 0;
3303	tdb_off_t rec_ptr;
3304	struct list_struct rec;
3305
3306	/* read in the hash top */
3307	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3308		return 0;
3309
3310	while (rec_ptr) {
3311		if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
3312			return 0;
3313
3314		if (rec.magic == TDB_DEAD_MAGIC) {
3315			res += 1;
3316		}
3317		rec_ptr = rec.next;
3318	}
3319	return res;
3320}
3321
3322/*
3323 * Purge all DEAD records from a hash chain
3324 */
3325static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
3326{
3327	int res = -1;
3328	struct list_struct rec;
3329	tdb_off_t rec_ptr;
3330
3331	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3332		return -1;
3333	}
3334
3335	/* read in the hash top */
3336	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3337		goto fail;
3338
3339	while (rec_ptr) {
3340		tdb_off_t next;
3341
3342		if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
3343			goto fail;
3344		}
3345
3346		next = rec.next;
3347
3348		if (rec.magic == TDB_DEAD_MAGIC
3349		    && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
3350			goto fail;
3351		}
3352		rec_ptr = next;
3353	}
3354	res = 0;
3355 fail:
3356	tdb_unlock(tdb, -1, F_WRLCK);
3357	return res;
3358}
3359
3360/* delete an entry in the database given a key */
3361static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3362{
3363	tdb_off_t rec_ptr;
3364	struct list_struct rec;
3365	int ret;
3366
3367	if (tdb->max_dead_records != 0) {
3368
3369		/*
3370		 * Allow for some dead records per hash chain, mainly for
3371		 * tdb's with a very high create/delete rate like locking.tdb.
3372		 */
3373
3374		if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3375			return -1;
3376
3377		if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
3378			/*
3379			 * Don't let the per-chain freelist grow too large,
3380			 * delete all existing dead records
3381			 */
3382			tdb_purge_dead(tdb, hash);
3383		}
3384
3385		if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
3386			tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3387			return -1;
3388		}
3389
3390		/*
3391		 * Just mark the record as dead.
3392		 */
3393		rec.magic = TDB_DEAD_MAGIC;
3394		ret = tdb_rec_write(tdb, rec_ptr, &rec);
3395	}
3396	else {
3397		if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
3398						   &rec)))
3399			return -1;
3400
3401		ret = tdb_do_delete(tdb, rec_ptr, &rec);
3402	}
3403
3404	if (ret == 0) {
3405		tdb_increment_seqnum(tdb);
3406	}
3407
3408	if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
3409		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
3410	return ret;
3411}
3412
3413int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
3414{
3415	u32 hash = tdb->hash_fn(&key);
3416	return tdb_delete_hash(tdb, key, hash);
3417}
3418
3419/*
3420 * See if we have a dead record around with enough space
3421 */
3422static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
3423			       struct list_struct *r, tdb_len_t length)
3424{
3425	tdb_off_t rec_ptr;
3426
3427	/* read in the hash top */
3428	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3429		return 0;
3430
3431	/* keep looking until we find the right record */
3432	while (rec_ptr) {
3433		if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3434			return 0;
3435
3436		if (TDB_DEAD(r) && r->rec_len >= length) {
3437			/*
3438			 * First fit for simple coding, TODO: change to best
3439			 * fit
3440			 */
3441			return rec_ptr;
3442		}
3443		rec_ptr = r->next;
3444	}
3445	return 0;
3446}
3447
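/*
  Illustrative store/fetch sketch (hypothetical values, error handling
  elided; the caller must free the dptr returned by tdb_fetch()):

    TDB_DATA key, val, out;
    key.dptr  = (unsigned char *)"hello";
    key.dsize = 5;
    val.dptr  = (unsigned char *)"world";
    val.dsize = 5;

    if (tdb_store(db, key, val, TDB_REPLACE) == 0) {
        out = tdb_fetch(db, key);
        if (out.dptr != NULL) {
            ... use out.dptr and out.dsize ...
            free(out.dptr);
        }
    }
*/
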
3448/* store an element in the database, replacing any existing element
3449   with the same key
3450
3451   return 0 on success, -1 on failure
3452*/
3453int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
3454{
3455	struct list_struct rec;
3456	u32 hash;
3457	tdb_off_t rec_ptr;
3458	char *p = NULL;
3459	int ret = -1;
3460
3461	if (tdb->read_only || tdb->traverse_read) {
3462		tdb->ecode = TDB_ERR_RDONLY;
3463		return -1;
3464	}
3465
3466	/* find which hash bucket it is in */
3467	hash = tdb->hash_fn(&key);
3468	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3469		return -1;
3470
3471	/* check for it existing, on insert. */
3472	if (flag == TDB_INSERT) {
3473		if (tdb_exists_hash(tdb, key, hash)) {
3474			tdb->ecode = TDB_ERR_EXISTS;
3475			goto fail;
3476		}
3477	} else {
3478		/* first try in-place update, on modify or replace. */
3479		if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
3480			goto done;
3481		}
3482		if (tdb->ecode == TDB_ERR_NOEXIST &&
3483		    flag == TDB_MODIFY) {
3484			/* if the record doesn't exist and we are in TDB_MODIFY mode then
3485			 we should fail the store */
3486			goto fail;
3487		}
3488	}
3489	/* reset the error code potentially set by tdb_update_hash() */
3490	tdb->ecode = TDB_SUCCESS;
3491
3492	/* delete any existing record - if it doesn't exist we don't
3493           care.  Doing this first reduces fragmentation, and avoids
3494           coalescing with `allocated' block before it's updated. */
3495	if (flag != TDB_INSERT)
3496		tdb_delete_hash(tdb, key, hash);
3497
3498	/* Copy key+value *before* allocating free space in case malloc
3499	   fails and we are left with a dead spot in the tdb. */
3500
3501	if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
3502		tdb->ecode = TDB_ERR_OOM;
3503		goto fail;
3504	}
3505
3506	memcpy(p, key.dptr, key.dsize);
3507	if (dbuf.dsize)
3508		memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
3509
3510	if (tdb->max_dead_records != 0) {
3511		/*
3512		 * Allow for some dead records per hash chain, look if we can
3513		 * find one that can hold the new record. We need enough space
3514		 * for key, data and tailer. If we find one, we don't have to
3515		 * consult the central freelist.
3516		 */
3517		rec_ptr = tdb_find_dead(
3518			tdb, hash, &rec,
3519			key.dsize + dbuf.dsize + sizeof(tdb_off_t));
3520
3521		if (rec_ptr != 0) {
3522			rec.key_len = key.dsize;
3523			rec.data_len = dbuf.dsize;
3524			rec.full_hash = hash;
3525			rec.magic = TDB_MAGIC;
3526			if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3527			    || tdb->methods->tdb_write(
3528				    tdb, rec_ptr + sizeof(rec),
3529				    p, key.dsize + dbuf.dsize) == -1) {
3530				goto fail;
3531			}
3532			goto done;
3533		}
3534	}
3535
3536	/*
3537	 * We have to allocate some space from the freelist, so this means we
3538	 * have to lock it. Use the chance to purge all the DEAD records from
3539	 * the hash chain under the freelist lock.
3540	 */
3541
3542	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3543		goto fail;
3544	}
3545
3546	if ((tdb->max_dead_records != 0)
3547	    && (tdb_purge_dead(tdb, hash) == -1)) {
3548		tdb_unlock(tdb, -1, F_WRLCK);
3549		goto fail;
3550	}
3551
3552	/* we have to allocate some space */
3553	rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
3554
3555	tdb_unlock(tdb, -1, F_WRLCK);
3556
3557	if (rec_ptr == 0) {
3558		goto fail;
3559	}
3560
3561	/* Read hash top into next ptr */
3562	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
3563		goto fail;
3564
3565	rec.key_len = key.dsize;
3566	rec.data_len = dbuf.dsize;
3567	rec.full_hash = hash;
3568	rec.magic = TDB_MAGIC;
3569
3570	/* write out and point the top of the hash chain at it */
3571	if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3572	    || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
3573	    || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
3574		/* Need to tdb_unallocate() here */
3575		goto fail;
3576	}
3577
3578 done:
3579	ret = 0;
3580 fail:
3581	if (ret == 0) {
3582		tdb_increment_seqnum(tdb);
3583	}
3584
3585	SAFE_FREE(p);
3586	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3587	return ret;
3588}
3589
3590
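/*
  Illustrative append sketch (hypothetical, error handling elided). Each
  call grows the value stored under key, creating the record on first
  use, so it can be used to accumulate a simple log:

    TDB_DATA chunk;
    chunk.dptr  = (unsigned char *)"entry\n";
    chunk.dsize = 6;
    tdb_append(db, key, chunk);
*/
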
3591/* Append to an entry. Create if not exist. */
3592int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
3593{
3594	u32 hash;
3595	TDB_DATA dbuf;
3596	int ret = -1;
3597
3598	/* find which hash bucket it is in */
3599	hash = tdb->hash_fn(&key);
3600	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3601		return -1;
3602
3603	dbuf = tdb_fetch(tdb, key);
3604
3605	if (dbuf.dptr == NULL) {
3606		dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
3607	} else {
3608		unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
3609						     dbuf.dsize + new_dbuf.dsize);
3610		if (new_dptr == NULL) {
3611			free(dbuf.dptr);
3612		}
3613		dbuf.dptr = new_dptr;
3614	}
3615
3616	if (dbuf.dptr == NULL) {
3617		tdb->ecode = TDB_ERR_OOM;
3618		goto failed;
3619	}
3620
3621	memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
3622	dbuf.dsize += new_dbuf.dsize;
3623
3624	ret = tdb_store(tdb, key, dbuf, 0);
3625
3626failed:
3627	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3628	SAFE_FREE(dbuf.dptr);
3629	return ret;
3630}
3631
3632
3633/*
3634  return the name of the current tdb file
3635  useful for external logging functions
3636*/
3637const char *tdb_name(struct tdb_context *tdb)
3638{
3639	return tdb->name;
3640}
3641
3642/*
3643  return the underlying file descriptor being used by tdb, or -1
3644  useful for external routines that want to check the device/inode
3645  of the fd
3646*/
3647int tdb_fd(struct tdb_context *tdb)
3648{
3649	return tdb->fd;
3650}
3651
3652/*
3653  return the current logging function
3654  useful for external tdb routines that wish to log tdb errors
3655*/
3656tdb_log_func tdb_log_fn(struct tdb_context *tdb)
3657{
3658	return tdb->log.log_fn;
3659}
3660
3661
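/*
  Illustrative change-detection sketch using the sequence number
  (hypothetical; only meaningful if the tdb was opened with, or later
  enabled for, TDB_SEQNUM):

    int before = tdb_get_seqnum(db);
    ... some time later ...
    if (tdb_get_seqnum(db) != before) {
        ... something in the database changed, rescan ...
    }
*/
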
3662/*
3663  get the tdb sequence number. Only makes sense if the writers opened
3664  with TDB_SEQNUM set. Note that this sequence number will wrap quite
3665  quickly, so it should only be used for a 'has something changed'
3666  test, not for code that relies on the count of the number of changes
3667  made. If you want a counter then use a tdb record.
3668
3669  The aim of this sequence number is to allow for a very lightweight
3670  test of a possible tdb change.
3671*/
3672int tdb_get_seqnum(struct tdb_context *tdb)
3673{
3674	tdb_off_t seqnum=0;
3675
3676	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3677	return seqnum;
3678}
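/*
  Illustrative sketch (same hypothetical TDB_EXAMPLE_SNIPPETS guard as above):
  the lightweight "has something changed" test described in the comment above.
  Writers are assumed to have opened the database with TDB_SEQNUM set.
*/
#ifdef TDB_EXAMPLE_SNIPPETS
static int example_cache_is_stale(struct tdb_context *db, int *cached_seqnum)
{
	int now = tdb_get_seqnum(db);

	if (now == *cached_seqnum) {
		return 0;	/* nothing has changed since the last check */
	}
	*cached_seqnum = now;	/* remember the new value; caller refreshes */
	return 1;
}
#endif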
3679
3680int tdb_hash_size(struct tdb_context *tdb)
3681{
3682	return tdb->header.hash_size;
3683}
3684
3685size_t tdb_map_size(struct tdb_context *tdb)
3686{
3687	return tdb->map_size;
3688}
3689
3690int tdb_get_flags(struct tdb_context *tdb)
3691{
3692	return tdb->flags;
3693}
3694
3695
3696/*
3697  enable sequence number handling on an open tdb
3698*/
3699void tdb_enable_seqnum(struct tdb_context *tdb)
3700{
3701	tdb->flags |= TDB_SEQNUM;
3702}
3703
3704/* file: open.c */
3705
3706/* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
3707static struct tdb_context *tdbs = NULL;
3708
3709
3710/* This is from a hash algorithm suggested by Rogier Wolff */
3711static unsigned int default_tdb_hash(TDB_DATA *key)
3712{
3713	u32 value;	/* Used to compute the hash value.  */
3714	u32   i;	/* Used to cycle through the key bytes. */
3715
3716	/* Fold each byte of the key into the hash value. */
3717	for (value = 0, i=0; i < key->dsize; i++)
3718		value = value * 256 + key->dptr[i] + (value >> 24) * 241;
3719
3720	return value;
3721}
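/*
  Note on the hash above: each key byte is folded in as
  value = value*256 + byte + (value >> 24)*241, i.e. a base-256 rolling hash
  whose overflowing top byte is fed back in, so a one-byte key hashes to that
  byte's value.
*/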
3722
3723
3724/* initialise a new database with a specified hash size */
3725static int tdb_new_database(struct tdb_context *tdb, int hash_size)
3726{
3727	struct tdb_header *newdb;
3728	int size, ret = -1;
3729
3730	/* We make it up in memory, then write it out if not internal */
3731	size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
3732	if (!(newdb = (struct tdb_header *)calloc(size, 1)))
3733		return TDB_ERRCODE(TDB_ERR_OOM, -1);
3734
3735	/* Fill in the header */
3736	newdb->version = TDB_VERSION;
3737	newdb->hash_size = hash_size;
3738	if (tdb->flags & TDB_INTERNAL) {
3739		tdb->map_size = size;
3740		tdb->map_ptr = (char *)newdb;
3741		memcpy(&tdb->header, newdb, sizeof(tdb->header));
3742		/* Convert the `ondisk' version if asked. */
3743		CONVERT(*newdb);
3744		return 0;
3745	}
3746	if (lseek(tdb->fd, 0, SEEK_SET) == -1)
3747		goto fail;
3748
3749	if (ftruncate(tdb->fd, 0) == -1)
3750		goto fail;
3751
3752	/* This creates an endian-converted header, as if read from disk */
3753	CONVERT(*newdb);
3754	memcpy(&tdb->header, newdb, sizeof(tdb->header));
3755	/* Don't endian-convert the magic food! */
3756	memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
3757	if (write(tdb->fd, newdb, size) != size) {
3758		ret = -1;
3759	} else {
3760		ret = 0;
3761	}
3762
3763  fail:
3764	SAFE_FREE(newdb);
3765	return ret;
3766}
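/*
  Note on the layout created above: the new file is the header followed by
  (hash_size + 1) zeroed tdb_off_t slots -- presumably one chain head per
  hash bucket plus one extra slot for the freelist head, which is why the
  size computation uses hash_size + 1.
*/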
3767
3768
3769
3770static int tdb_already_open(dev_t device,
3771			    ino_t ino)
3772{
3773	struct tdb_context *i;
3774
3775	for (i = tdbs; i; i = i->next) {
3776		if (i->device == device && i->inode == ino) {
3777			return 1;
3778		}
3779	}
3780
3781	return 0;
3782}
3783
3784/* open the database, creating it if necessary
3785
3786   The open_flags and mode are passed straight to the open call on the
3787   database file. A flags value of O_WRONLY is invalid. The hash size
3788   is advisory; use zero for a default value.
3789
3790   Return is NULL on error, in which case errno is also set.  Don't
3791   try to call tdb_error or tdb_errname, just do strerror(errno).
3792
3793   @param name may be NULL for internal databases. */
3794struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
3795		      int open_flags, mode_t mode)
3796{
3797	return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
3798}
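/*
  Illustrative sketch (same hypothetical TDB_EXAMPLE_SNIPPETS guard as above):
  a typical open/store/fetch/close round trip.  The path, key and value are
  made up for the example; flag 0 to tdb_store() requests the default store
  behaviour, as in tdb_append() above.
*/
#ifdef TDB_EXAMPLE_SNIPPETS
static int example_round_trip(void)
{
	struct tdb_context *db;
	TDB_DATA key, val, out;
	int ret = -1;

	db = tdb_open("/tmp/example.tdb", 0 /* default hash size */,
		      TDB_CLEAR_IF_FIRST, O_RDWR|O_CREAT, 0600);
	if (db == NULL) {
		return -1;	/* errno describes the failure */
	}

	key.dptr = (unsigned char *)"greeting";
	key.dsize = strlen("greeting");
	val.dptr = (unsigned char *)"hello";
	val.dsize = strlen("hello") + 1;	/* keep the trailing NUL */

	if (tdb_store(db, key, val, 0) == 0) {
		out = tdb_fetch(db, key);
		if (out.dptr != NULL) {
			ret = 0;
			free(out.dptr);	/* tdb_fetch returns malloc'ed memory */
		}
	}

	tdb_close(db);
	return ret;
}
#endif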
3799
3800/* a default logging function */
3801static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
3802static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
3803{
3804}
3805
3806
3807struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
3808				int open_flags, mode_t mode,
3809				const struct tdb_logging_context *log_ctx,
3810				tdb_hash_func hash_fn)
3811{
3812	struct tdb_context *tdb;
3813	struct stat st;
3814	int rev = 0, locked = 0;
3815	unsigned char *vp;
3816	u32 vertest;
3817
3818	if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
3819		/* Can't log this */
3820		errno = ENOMEM;
3821		goto fail;
3822	}
3823	tdb_io_init(tdb);
3824	tdb->fd = -1;
3825	tdb->name = NULL;
3826	tdb->map_ptr = NULL;
3827	tdb->flags = tdb_flags;
3828	tdb->open_flags = open_flags;
3829	if (log_ctx) {
3830		tdb->log = *log_ctx;
3831	} else {
3832		tdb->log.log_fn = null_log_fn;
3833		tdb->log.log_private = NULL;
3834	}
3835	tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
3836
3837	/* cache the page size */
3838	tdb->page_size = sysconf(_SC_PAGESIZE);
3839	if (tdb->page_size <= 0) {
3840		tdb->page_size = 0x2000;
3841	}
3842
3843	if ((open_flags & O_ACCMODE) == O_WRONLY) {
3844		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
3845			 name));
3846		errno = EINVAL;
3847		goto fail;
3848	}
3849
3850	if (hash_size == 0)
3851		hash_size = DEFAULT_HASH_SIZE;
3852	if ((open_flags & O_ACCMODE) == O_RDONLY) {
3853		tdb->read_only = 1;
3854		/* read-only databases don't do locking or clear-if-first */
3855		tdb->flags |= TDB_NOLOCK;
3856		tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3857	}
3858
3859	/* internal databases don't mmap or lock, and start off cleared */
3860	if (tdb->flags & TDB_INTERNAL) {
3861		tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
3862		tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3863		if (tdb_new_database(tdb, hash_size) != 0) {
3864			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
3865			goto fail;
3866		}
3867		goto internal;
3868	}
3869
3870	if ((tdb->fd = open(name, open_flags, mode)) == -1) {
3871		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
3872			 name, strerror(errno)));
3873		goto fail;	/* errno set by open(2) */
3874	}
3875
3876	/* ensure there is only one process initialising at once */
3877	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
3878		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
3879			 name, strerror(errno)));
3880		goto fail;	/* errno set by tdb_brlock */
3881	}
3882
3883	/* we need to zero the database if we are the only one with it open */
3884	if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
3885	    (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
3886		open_flags |= O_CREAT;
3887		if (ftruncate(tdb->fd, 0) == -1) {
3888			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
3889				 "failed to truncate %s: %s\n",
3890				 name, strerror(errno)));
3891			goto fail; /* errno set by ftruncate */
3892		}
3893	}
3894
3895	if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
3896	    || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
3897	    || (tdb->header.version != TDB_VERSION
3898		&& !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
3899		/* it's not a valid database - possibly initialise it */
3900		if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
3901			errno = EIO; /* i.e. bad format or a corrupt file */
3902			goto fail;
3903		}
3904		rev = (tdb->flags & TDB_CONVERT);
3905	}
3906	vp = (unsigned char *)&tdb->header.version;
3907	vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
3908		  (((u32)vp[2]) << 8) | (u32)vp[3];
3909	tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
3910	if (!rev)
3911		tdb->flags &= ~TDB_CONVERT;
3912	else {
3913		tdb->flags |= TDB_CONVERT;
3914		tdb_convert(&tdb->header, sizeof(tdb->header));
3915	}
3916	if (fstat(tdb->fd, &st) == -1)
3917		goto fail;
3918
3919	if (tdb->header.rwlocks != 0) {
3920		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
3921		goto fail;
3922	}
3923
3924	/* Is it already in the open list?  If so, fail. */
3925	if (tdb_already_open(st.st_dev, st.st_ino)) {
3926		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3927			 "%s (%d,%d) is already open in this process\n",
3928			 name, (int)st.st_dev, (int)st.st_ino));
3929		errno = EBUSY;
3930		goto fail;
3931	}
3932
3933	if (!(tdb->name = (char *)strdup(name))) {
3934		errno = ENOMEM;
3935		goto fail;
3936	}
3937
3938	tdb->map_size = st.st_size;
3939	tdb->device = st.st_dev;
3940	tdb->inode = st.st_ino;
3941	tdb->max_dead_records = 0;
3942	tdb_mmap(tdb);
3943	if (locked) {
3944		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
3945			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3946				 "failed to release ACTIVE_LOCK on %s: %s\n",
3947				 name, strerror(errno)));
3948			goto fail;
3949		}
3950
3951	}
3952
3953	/* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
3954	   we didn't get the initial exclusive lock as we need to let all other
3955	   users know we're using it. */
3956
3957	if (tdb_flags & TDB_CLEAR_IF_FIRST) {
3958		/* leave this lock in place to indicate it's in use */
3959		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
3960			goto fail;
3961	}
3962
3963	/* if needed, run recovery */
3964	if (tdb_transaction_recover(tdb) == -1) {
3965		goto fail;
3966	}
3967
3968 internal:
3969	/* Internal (memory-only) databases skip all the code above to
3970	 * do with disk files, and resume here by releasing their
3971	 * global lock and hooking into the active list. */
3972	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
3973		goto fail;
3974	tdb->next = tdbs;
3975	tdbs = tdb;
3976	return tdb;
3977
3978 fail:
3979	{ int save_errno = errno;
3980
3981	if (!tdb)
3982		return NULL;
3983
3984	if (tdb->map_ptr) {
3985		if (tdb->flags & TDB_INTERNAL)
3986			SAFE_FREE(tdb->map_ptr);
3987		else
3988			tdb_munmap(tdb);
3989	}
3990	SAFE_FREE(tdb->name);
3991	if (tdb->fd != -1)
3992		if (close(tdb->fd) != 0)
3993			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
3994	SAFE_FREE(tdb);
3995	errno = save_errno;
3996	return NULL;
3997	}
3998}
3999
4000/*
4001 * Set the maximum number of dead records per hash chain
4002 */
4003
4004void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
4005{
4006	tdb->max_dead_records = max_dead;
4007}
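/*
  Note: with max_dead_records at its default of 0, deleted records are
  released immediately; a non-zero value appears to let deletes leave
  TDB_DEAD_MAGIC records in place so that the store path above can purge a
  whole chain's dead records in one pass under the freelist lock, trading
  space for fewer lock round trips.
*/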
4008
4009/**
4010 * Close a database.
4011 *
4012 * @returns -1 for error; 0 for success.
4013 **/
4014int tdb_close(struct tdb_context *tdb)
4015{
4016	struct tdb_context **i;
4017	int ret = 0;
4018
4019	if (tdb->transaction) {
4020		tdb_transaction_cancel(tdb);
4021	}
4022
4023	if (tdb->map_ptr) {
4024		if (tdb->flags & TDB_INTERNAL)
4025			SAFE_FREE(tdb->map_ptr);
4026		else
4027			tdb_munmap(tdb);
4028	}
4029	SAFE_FREE(tdb->name);
4030	if (tdb->fd != -1)
4031		ret = close(tdb->fd);
4032	SAFE_FREE(tdb->lockrecs);
4033
4034	/* Remove from contexts list */
4035	for (i = &tdbs; *i; i = &(*i)->next) {
4036		if (*i == tdb) {
4037			*i = tdb->next;
4038			break;
4039		}
4040	}
4041
4042	memset(tdb, 0, sizeof(*tdb));
4043	SAFE_FREE(tdb);
4044
4045	return ret;
4046}
4047
4048/* register a logging function */
4049void tdb_set_logging_function(struct tdb_context *tdb,
4050                              const struct tdb_logging_context *log_ctx)
4051{
4052        tdb->log = *log_ctx;
4053}
4054
4055void *tdb_get_logging_private(struct tdb_context *tdb)
4056{
4057	return tdb->log.log_private;
4058}
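/*
  Illustrative sketch (same hypothetical TDB_EXAMPLE_SNIPPETS guard as above):
  a caller-supplied logger with the signature stored in
  struct tdb_logging_context, installed via tdb_set_logging_function().
*/
#ifdef TDB_EXAMPLE_SNIPPETS
static void example_stderr_log(struct tdb_context *db, enum tdb_debug_level level,
			       const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
static void example_stderr_log(struct tdb_context *db, enum tdb_debug_level level,
			       const char *fmt, ...)
{
	va_list ap;

	(void)level;	/* every level goes to stderr in this sketch */
	fprintf(stderr, "tdb(%s): ", tdb_name(db) ? tdb_name(db) : "<internal>");
	va_start(ap, fmt);
	vfprintf(stderr, fmt, ap);
	va_end(ap);
}

static void example_install_logger(struct tdb_context *db)
{
	struct tdb_logging_context ctx;

	ctx.log_fn = example_stderr_log;
	ctx.log_private = NULL;
	tdb_set_logging_function(db, &ctx);
}
#endif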
4059
4060/* reopen a tdb - this can be used after a fork to ensure that we have an independent
4061   seek pointer from our parent and to re-establish locks */
4062int tdb_reopen(struct tdb_context *tdb)
4063{
4064	struct stat st;
4065
4066	if (tdb->flags & TDB_INTERNAL) {
4067		return 0; /* Nothing to do. */
4068	}
4069
4070	if (tdb->num_locks != 0 || tdb->global_lock.count) {
4071		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
4072		goto fail;
4073	}
4074
4075	if (tdb->transaction != 0) {
4076		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
4077		goto fail;
4078	}
4079
4080	if (tdb_munmap(tdb) != 0) {
4081		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
4082		goto fail;
4083	}
4084	if (close(tdb->fd) != 0)
4085		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
4086	tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
4087	if (tdb->fd == -1) {
4088		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
4089		goto fail;
4090	}
4091	if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
4092	    (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
4093		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
4094		goto fail;
4095	}
4096	if (fstat(tdb->fd, &st) != 0) {
4097		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
4098		goto fail;
4099	}
4100	if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
4101		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
4102		goto fail;
4103	}
4104	tdb_mmap(tdb);
4105
4106	return 0;
4107
4108fail:
4109	tdb_close(tdb);
4110	return -1;
4111}
4112
4113/* reopen all tdb's */
4114int tdb_reopen_all(int parent_longlived)
4115{
4116	struct tdb_context *tdb;
4117
4118	for (tdb=tdbs; tdb; tdb = tdb->next) {
4119		/*
4120		 * If the parent is long-lived (i.e. a
4121		 * parent daemon architecture), we know
4122		 * it will keep its active lock on a
4123		 * tdb opened with CLEAR_IF_FIRST. Thus
4124		 * for child processes we don't have to
4125		 * add an active lock. This is essential
4126		 * to improve performance on systems that
4127		 * keep POSIX locks as a non-scalable data
4128		 * structure in the kernel.
4129		 */
4130		if (parent_longlived) {
4131			/* Ensure no clear-if-first. */
4132			tdb->flags &= ~TDB_CLEAR_IF_FIRST;
4133		}
4134
4135		if (tdb_reopen(tdb) != 0)
4136			return -1;
4137	}
4138
4139	return 0;
4140}
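/*
  Illustrative sketch (same hypothetical TDB_EXAMPLE_SNIPPETS guard as above):
  a forking caller re-establishing its tdb state in the child.  Pass 1 when
  the parent is a long-lived daemon that keeps its CLEAR_IF_FIRST active
  locks, as the comment above describes.
*/
#ifdef TDB_EXAMPLE_SNIPPETS
static pid_t example_fork_worker(void)
{
	pid_t pid = fork();

	if (pid == 0) {
		/* child: reopen every tdb before touching any of them, to get
		 * an independent seek pointer and fresh locks */
		if (tdb_reopen_all(1 /* parent stays alive */) != 0) {
			_exit(1);
		}
		/* ... child work would go here ... */
		_exit(0);
	}
	return pid;	/* parent: child's pid, or -1 if fork failed */
}
#endif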
4141