1/*
2URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb/common
3Rev: 23590
4Last Changed Date: 2007-06-22 13:36:10 -0400 (Fri, 22 Jun 2007)
5*/
6 /*
7   trivial database library - standalone version
8
9   Copyright (C) Andrew Tridgell              1999-2005
10   Copyright (C) Jeremy Allison               2000-2006
11   Copyright (C) Paul `Rusty' Russell         2000
12
13     ** NOTE! The following LGPL license applies to the tdb
14     ** library. This does NOT imply that all of Samba is released
15     ** under the LGPL
16
17   This library is free software; you can redistribute it and/or
18   modify it under the terms of the GNU Lesser General Public
19   License as published by the Free Software Foundation; either
20   version 2 of the License, or (at your option) any later version.
21
22   This library is distributed in the hope that it will be useful,
23   but WITHOUT ANY WARRANTY; without even the implied warranty of
24   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
25   Lesser General Public License for more details.
26
27   You should have received a copy of the GNU Lesser General Public
28   License along with this library; if not, write to the Free Software
29   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
30*/
31
32#ifdef CONFIG_STAND_ALONE
33#define HAVE_MMAP
34#define HAVE_STRDUP
35#define HAVE_SYS_MMAN_H
36#define HAVE_UTIME_H
37#define HAVE_UTIME
38#endif
39#ifndef __FreeBSD__
40#define _XOPEN_SOURCE 600
41#endif
42
43#include "config.h"
44#include <unistd.h>
45#include <stdio.h>
46#include <stdlib.h>
47#include <stdarg.h>
48#include <stddef.h>
49#include <errno.h>
50#include <string.h>
51#ifdef HAVE_SYS_SELECT_H
52#include <sys/select.h>
53#endif
54#include <sys/time.h>
55#include <sys/types.h>
56#include <time.h>
57#ifdef HAVE_UTIME_H
58#include <utime.h>
59#endif
60#include <sys/stat.h>
61#include <sys/file.h>
62#include <fcntl.h>
63
64#ifdef HAVE_SYS_MMAN_H
65#include <sys/mman.h>
66#endif
67
68#ifndef MAP_FILE
69#define MAP_FILE 0
70#endif
71
72#ifndef MAP_FAILED
73#define MAP_FAILED ((void *)-1)
74#endif
75
#ifndef HAVE_STRDUP
#define strdup rep_strdup
/* Replacement strdup() for platforms without one.
 * Returns a freshly malloc'd NUL-terminated copy of s, or NULL if
 * s is NULL or allocation fails.  Caller frees the result.
 *
 * Fix: the original declared "int length;" and then tested
 * "if (!length)" before ever assigning it — a read of an
 * uninitialized variable (undefined behavior).  Compute the length
 * unconditionally, and copy with memcpy (length is already known,
 * so strncpy's zero-padding/termination quirks buy nothing). */
static char *rep_strdup(const char *s)
{
	char *ret;
	size_t length;

	if (!s)
		return NULL;

	length = strlen(s);
	ret = malloc(length + 1);
	if (ret) {
		/* copy the terminator along with the body */
		memcpy(ret, s, length + 1);
	}
	return ret;
}
#endif
96
97#ifndef PRINTF_ATTRIBUTE
98#if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1 )
99/** Use gcc attribute to check printf fns.  a1 is the 1-based index of
100 * the parameter containing the format, and a2 the index of the first
101 * argument. Note that some gcc 2.x versions don't handle this
102 * properly **/
103#define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
104#else
105#define PRINTF_ATTRIBUTE(a1, a2)
106#endif
107#endif
108
109typedef int bool;
110
111#include "tdb.h"
112
113static TDB_DATA tdb_null;
114
115#ifndef u32
116#define u32 unsigned
117#endif
118
119typedef u32 tdb_len_t;
120typedef u32 tdb_off_t;
121
122#ifndef offsetof
123#define offsetof(t,f) ((unsigned int)&((t *)0)->f)
124#endif
125
126#define TDB_MAGIC_FOOD "TDB file\n"
127#define TDB_VERSION (0x26011967 + 6)
128#define TDB_MAGIC (0x26011999U)
129#define TDB_FREE_MAGIC (~TDB_MAGIC)
130#define TDB_DEAD_MAGIC (0xFEE1DEAD)
131#define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
132#define TDB_ALIGNMENT 4
133#define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
134#define DEFAULT_HASH_SIZE 131
135#define FREELIST_TOP (sizeof(struct tdb_header))
136#define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
137#define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
138#define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
139#define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
140#define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
141#define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
142#define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
143#define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
144#define TDB_SEQNUM_OFS    offsetof(struct tdb_header, sequence_number)
145#define TDB_PAD_BYTE 0x42
146#define TDB_PAD_U32  0x42424242
147
148/* NB assumes there is a local variable called "tdb" that is the
149 * current context, also takes doubly-parenthesized print-style
150 * argument. */
151#define TDB_LOG(x) tdb->log.log_fn x
152
153/* lock offsets */
154#define GLOBAL_LOCK      0
155#define ACTIVE_LOCK      4
156#define TRANSACTION_LOCK 8
157
158/* free memory if the pointer is valid and zero the pointer */
159#ifndef SAFE_FREE
160#define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
161#endif
162
163#define BUCKET(hash) ((hash) % tdb->header.hash_size)
164
165#define DOCONV() (tdb->flags & TDB_CONVERT)
166#define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
167
168
/* the body of the database is made of one list_struct for the free space
   plus a separate data list for each hash value */
struct list_struct {
	tdb_off_t next; /* offset of the next record in the list */
	tdb_len_t rec_len; /* total byte length of record */
	tdb_len_t key_len; /* byte length of key */
	tdb_len_t data_len; /* byte length of data */
	u32 full_hash; /* the full 32 bit hash of the key */
	u32 magic;   /* try to catch errors: TDB_MAGIC for live records,
	                TDB_FREE_MAGIC on the freelist, TDB_DEAD_MAGIC for
	                deleted-but-not-reclaimed records (see TDB_DEAD()) */
	/* the following union is implied:
		union {
			char record[rec_len];
			struct {
				char key[key_len];
				char data[data_len];
			}
			u32 totalsize; (tailer)
		}
	*/
};
189
190
/* this is stored at the front of every database */
struct tdb_header {
	char magic_food[32]; /* for /etc/magic */
	u32 version; /* version of the code */
	u32 hash_size; /* number of hash entries */
	tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
	tdb_off_t recovery_start; /* offset of transaction recovery region */
	tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
	tdb_off_t reserved[29]; /* room to grow the header without a format bump */
};

/* one entry per hash-chain (or freelist) lock held by this process */
struct tdb_lock_type {
	int list; /* chain number; -1 is the freelist/alloc lock */
	u32 count; /* recursion count - fcntl locks don't nest, we do */
	u32 ltype; /* F_RDLCK or F_WRLCK */
};

/* per-traversal state, linked off tdb->travlocks */
struct tdb_traverse_lock {
	struct tdb_traverse_lock *next;
	u32 off; /* offset of the record this traversal has locked, 0 if none */
	u32 hash; /* hash chain the traversal is currently walking */
	int lock_rw; /* presumably F_RDLCK/F_WRLCK for the traversal - confirm in traverse code */
};
214
215
/* pluggable low-level I/O operations; a tdb_context points at one of
   these so the backend can be swapped (e.g. during a transaction). */
struct tdb_methods {
	int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
	int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
	void (*next_hash_chain)(struct tdb_context *, u32 *);
	int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
	int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
	int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
};
224
/* per-open handle state; one of these per tdb_open() */
struct tdb_context {
	char *name; /* the name of the database */
	void *map_ptr; /* where it is currently mapped */
	int fd; /* open file descriptor for the database */
	tdb_len_t map_size; /* how much space has been mapped */
	int read_only; /* opened read-only */
	int traverse_read; /* read-only traversal */
	struct tdb_lock_type global_lock; /* whole-db lock taken by tdb_lockall() */
	int num_lockrecs; /* number of entries in lockrecs[] */
	struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
	enum TDB_ERROR ecode; /* error code for last tdb error */
	struct tdb_header header; /* a cached copy of the header */
	u32 flags; /* the flags passed to tdb_open */
	struct tdb_traverse_lock travlocks; /* current traversal locks */
	struct tdb_context *next; /* all tdbs to avoid multiple opens */
	dev_t device;	/* uniquely identifies this tdb */
	ino_t inode;	/* uniquely identifies this tdb */
	struct tdb_logging_context log;
	unsigned int (*hash_fn)(TDB_DATA *key);
	int open_flags; /* flags used in the open - needed by reopen */
	unsigned int num_locks; /* number of chain locks held */
	const struct tdb_methods *methods; /* active I/O backend (see tdb_io_init) */
	struct tdb_transaction *transaction; /* non-NULL while a transaction is open */
	int page_size;
	int max_dead_records;
	bool have_transaction_lock; /* TRANSACTION_LOCK byte currently held */
	tdb_len_t real_map_size; /* how much space has been mapped */
};
253
254
255/*
256  internal prototypes
257*/
258static int tdb_munmap(struct tdb_context *tdb);
259static void tdb_mmap(struct tdb_context *tdb);
260static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
261static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
262static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
263static int tdb_transaction_lock(struct tdb_context *tdb, int ltype);
264static int tdb_transaction_unlock(struct tdb_context *tdb);
265static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
266static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
267static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
268static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
269static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
270static void *tdb_convert(void *buf, u32 size);
271static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
272static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
273static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
274static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
275static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
276static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
277static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
278static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
279static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
280static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
281static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
282		   tdb_off_t offset, tdb_len_t len,
283		   int (*parser)(TDB_DATA key, TDB_DATA data,
284				 void *private_data),
285		   void *private_data);
286static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
287			   struct list_struct *rec);
288static void tdb_io_init(struct tdb_context *tdb);
289static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
290static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off,
291		      struct list_struct *rec);
292
293
294/* file: error.c */
295
296enum TDB_ERROR tdb_error(struct tdb_context *tdb)
297{
298	return tdb->ecode;
299}
300
301static struct tdb_errname {
302	enum TDB_ERROR ecode; const char *estring;
303} emap[] = { {TDB_SUCCESS, "Success"},
304	     {TDB_ERR_CORRUPT, "Corrupt database"},
305	     {TDB_ERR_IO, "IO Error"},
306	     {TDB_ERR_LOCK, "Locking error"},
307	     {TDB_ERR_OOM, "Out of memory"},
308	     {TDB_ERR_EXISTS, "Record exists"},
309	     {TDB_ERR_NOLOCK, "Lock exists on other keys"},
310	     {TDB_ERR_EINVAL, "Invalid parameter"},
311	     {TDB_ERR_NOEXIST, "Record does not exist"},
312	     {TDB_ERR_RDONLY, "write not permitted"} };
313
314/* Error string for the last tdb error */
315const char *tdb_errorstr(struct tdb_context *tdb)
316{
317	u32 i;
318	for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
319		if (tdb->ecode == emap[i].ecode)
320			return emap[i].estring;
321	return "Invalid error code";
322}
323
324/* file: lock.c */
325
326#define TDB_MARK_LOCK 0x80000000
327
328/* a byte range locking function - return 0 on success
329   this functions locks/unlocks 1 byte at the specified offset.
330
331   On error, errno is also set so that errors are passed back properly
332   through tdb_open().
333
334   note that a len of zero means lock to end of file
335*/
336int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
337	       int rw_type, int lck_type, int probe, size_t len)
338{
339	struct flock fl;
340	int ret;
341
342	if (tdb->flags & TDB_NOLOCK) {
343		return 0;
344	}
345
346	if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
347		tdb->ecode = TDB_ERR_RDONLY;
348		return -1;
349	}
350
351	fl.l_type = rw_type;
352	fl.l_whence = SEEK_SET;
353	fl.l_start = offset;
354	fl.l_len = len;
355	fl.l_pid = 0;
356
357	do {
358		ret = fcntl(tdb->fd,lck_type,&fl);
359	} while (ret == -1 && errno == EINTR);
360
361	if (ret == -1) {
362		/* Generic lock error. errno set by fcntl.
363		 * EAGAIN is an expected return from non-blocking
364		 * locks. */
365		if (!probe && lck_type != F_SETLK) {
366			/* Ensure error code is set for log fun to examine. */
367			tdb->ecode = TDB_ERR_LOCK;
368			TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
369				 tdb->fd, offset, rw_type, lck_type, (int)len));
370		}
371		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
372	}
373	return 0;
374}
375
376
377/*
378  upgrade a read lock to a write lock. This needs to be handled in a
379  special way as some OSes (such as solaris) have too conservative
380  deadlock detection and claim a deadlock when progress can be
381  made. For those OSes we may loop for a while.
382*/
383int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
384{
385	int count = 1000;
386	while (count--) {
387		struct timeval tv;
388		if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
389			return 0;
390		}
391		if (errno != EDEADLK) {
392			break;
393		}
394		/* sleep for as short a time as we can - more portable than usleep() */
395		tv.tv_sec = 0;
396		tv.tv_usec = 1;
397		select(0, NULL, NULL, NULL, &tv);
398	}
399	TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
400	return -1;
401}
402
403
/* lock a list in the database. list -1 is the alloc list */
static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
{
	struct tdb_lock_type *new_lck;
	int i;
	/* TDB_MARK_LOCK: update the in-memory bookkeeping only, without
	   taking the kernel fcntl lock (used by the *_mark APIs) */
	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);

	ltype &= ~TDB_MARK_LOCK;

	/* a global lock allows us to avoid per chain locks */
	if (tdb->global_lock.count &&
	    (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
		return 0;
	}

	/* a global lock of an incompatible type exists - refuse */
	if (tdb->global_lock.count) {
		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
	}

	if (list < -1 || list >= (int)tdb->header.hash_size) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
			   list, ltype));
		return -1;
	}
	if (tdb->flags & TDB_NOLOCK)
		return 0;

	/* already holding this chain? just bump the recursion count */
	for (i=0; i<tdb->num_lockrecs; i++) {
		if (tdb->lockrecs[i].list == list) {
			if (tdb->lockrecs[i].count == 0) {
				/*
				 * Can't happen, see tdb_unlock(). It should
				 * be an assert.
				 */
				TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
					 "lck->count == 0 for list %d", list));
			}
			/*
			 * Just increment the in-memory struct, posix locks
			 * don't stack.
			 */
			tdb->lockrecs[i].count++;
			return 0;
		}
	}

	/* grow the bookkeeping array before taking the kernel lock, so a
	   failed allocation can't leave a lock we forgot we hold */
	new_lck = (struct tdb_lock_type *)realloc(
		tdb->lockrecs,
		sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
	if (new_lck == NULL) {
		errno = ENOMEM;
		return -1;
	}
	tdb->lockrecs = new_lck;

	/* Since fcntl locks don't nest, we do a lock for the first one,
	   and simply bump the count for future ones */
	if (!mark_lock &&
	    tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
				     0, 1)) {
		/* the grown array is kept; num_lockrecs is unchanged so the
		   extra slot is simply spare capacity */
		return -1;
	}

	tdb->num_locks++;

	tdb->lockrecs[tdb->num_lockrecs].list = list;
	tdb->lockrecs[tdb->num_lockrecs].count = 1;
	tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
	tdb->num_lockrecs += 1;

	return 0;
}
476
477/* lock a list in the database. list -1 is the alloc list */
478int tdb_lock(struct tdb_context *tdb, int list, int ltype)
479{
480	int ret;
481	ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
482	if (ret) {
483		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
484			 "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
485	}
486	return ret;
487}
488
/* lock a list in the database. list -1 is the alloc list. non-blocking lock */
int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
{
	/* F_SETLK fails immediately instead of waiting; caller handles it,
	   so no log message here */
	int status = _tdb_lock(tdb, list, ltype, F_SETLK);
	return status;
}
494
495
/* unlock the database: returns void because it's too late for errors. */
	/* changed to return int it may be interesting to know there
	   has been an error  --simo */
int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
{
	int ret = -1;
	int i;
	struct tdb_lock_type *lck = NULL;
	/* TDB_MARK_LOCK: adjust bookkeeping only, leave the kernel lock alone */
	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);

	ltype &= ~TDB_MARK_LOCK;

	/* a global lock allows us to avoid per chain locks */
	if (tdb->global_lock.count &&
	    (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
		return 0;
	}

	/* incompatible global lock held - this unlock makes no sense */
	if (tdb->global_lock.count) {
		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
	}

	if (tdb->flags & TDB_NOLOCK)
		return 0;

	/* Sanity checks */
	if (list < -1 || list >= (int)tdb->header.hash_size) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
		return ret;
	}

	/* find the in-memory bookkeeping entry for this chain */
	for (i=0; i<tdb->num_lockrecs; i++) {
		if (tdb->lockrecs[i].list == list) {
			lck = &tdb->lockrecs[i];
			break;
		}
	}

	if ((lck == NULL) || (lck->count == 0)) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
		return -1;
	}

	/* held recursively: just drop one reference */
	if (lck->count > 1) {
		lck->count--;
		return 0;
	}

	/*
	 * This lock has count==1 left, so we need to unlock it in the
	 * kernel. We don't bother with decrementing the in-memory array
	 * element, we're about to overwrite it with the last array element
	 * anyway.
	 */

	if (mark_lock) {
		ret = 0;
	} else {
		ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
					       F_SETLKW, 0, 1);
	}
	tdb->num_locks--;

	/*
	 * Shrink the array by overwriting the element just unlocked with the
	 * last array element.
	 */

	if (tdb->num_lockrecs > 1) {
		*lck = tdb->lockrecs[tdb->num_lockrecs-1];
	}
	tdb->num_lockrecs -= 1;

	/*
	 * We don't bother with realloc when the array shrinks, but if we have
	 * a completely idle tdb we should get rid of the locked array.
	 */

	if (tdb->num_lockrecs == 0) {
		SAFE_FREE(tdb->lockrecs);
	}

	if (ret)
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
	return ret;
}
582
583/*
584  get the transaction lock
585 */
586int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
587{
588	if (tdb->have_transaction_lock || tdb->global_lock.count) {
589		return 0;
590	}
591	if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype,
592				     F_SETLKW, 0, 1) == -1) {
593		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
594		tdb->ecode = TDB_ERR_LOCK;
595		return -1;
596	}
597	tdb->have_transaction_lock = 1;
598	return 0;
599}
600
601/*
602  release the transaction lock
603 */
604int tdb_transaction_unlock(struct tdb_context *tdb)
605{
606	int ret;
607	if (!tdb->have_transaction_lock) {
608		return 0;
609	}
610	ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
611	if (ret == 0) {
612		tdb->have_transaction_lock = 0;
613	}
614	return ret;
615}
616
617
618
619
620/* lock/unlock entire database */
621static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
622{
623	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
624
625	ltype &= ~TDB_MARK_LOCK;
626
627	/* There are no locks on read-only dbs */
628	if (tdb->read_only || tdb->traverse_read)
629		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
630
631	if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
632		tdb->global_lock.count++;
633		return 0;
634	}
635
636	if (tdb->global_lock.count) {
637		/* a global lock of a different type exists */
638		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
639	}
640
641	if (tdb->num_locks != 0) {
642		/* can't combine global and chain locks */
643		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
644	}
645
646	if (!mark_lock &&
647	    tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
648				     0, 4*tdb->header.hash_size)) {
649		if (op == F_SETLKW) {
650			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
651		}
652		return -1;
653	}
654
655	tdb->global_lock.count = 1;
656	tdb->global_lock.ltype = ltype;
657
658	return 0;
659}
660
661
662
663/* unlock entire db */
664static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
665{
666	bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
667
668	ltype &= ~TDB_MARK_LOCK;
669
670	/* There are no locks on read-only dbs */
671	if (tdb->read_only || tdb->traverse_read) {
672		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
673	}
674
675	if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
676		return TDB_ERRCODE(TDB_ERR_LOCK, -1);
677	}
678
679	if (tdb->global_lock.count > 1) {
680		tdb->global_lock.count--;
681		return 0;
682	}
683
684	if (!mark_lock &&
685	    tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
686				     0, 4*tdb->header.hash_size)) {
687		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
688		return -1;
689	}
690
691	tdb->global_lock.count = 0;
692	tdb->global_lock.ltype = 0;
693
694	return 0;
695}
696
/* lock entire database with write lock */
int tdb_lockall(struct tdb_context *tdb)
{
	return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
}

/* lock entire database with write lock - mark only */
int tdb_lockall_mark(struct tdb_context *tdb)
{
	return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
}

/* unlock entire database with write lock - unmark only */
int tdb_lockall_unmark(struct tdb_context *tdb)
{
	return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
}

/* lock entire database with write lock - nonblocking variant */
int tdb_lockall_nonblock(struct tdb_context *tdb)
{
	return _tdb_lockall(tdb, F_WRLCK, F_SETLK);
}

/* unlock entire database with write lock */
int tdb_unlockall(struct tdb_context *tdb)
{
	return _tdb_unlockall(tdb, F_WRLCK);
}

/* lock entire database with read lock */
int tdb_lockall_read(struct tdb_context *tdb)
{
	return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
}

/* lock entire database with read lock - nonblocking variant */
int tdb_lockall_read_nonblock(struct tdb_context *tdb)
{
	return _tdb_lockall(tdb, F_RDLCK, F_SETLK);
}

/* unlock entire database with read lock */
int tdb_unlockall_read(struct tdb_context *tdb)
{
	return _tdb_unlockall(tdb, F_RDLCK);
}
744
/* lock/unlock one hash chain. This is meant to be used to reduce
   contention - it cannot guarantee how many records will be locked */
int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
{
	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
}

/* lock/unlock one hash chain, non-blocking. This is meant to be used
   to reduce contention - it cannot guarantee how many records will be
   locked */
int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
{
	return tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
}

/* mark a chain as locked without actually locking it. Warning! use with great caution! */
int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
{
	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
}

/* unmark a chain as locked without actually locking it. Warning! use with great caution! */
int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
{
	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
}

/* release the write lock on key's hash chain */
int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
{
	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
}

/* read-lock one hash chain */
int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
{
	return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
}

/* release the read lock on key's hash chain */
int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
{
	return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
}
786
787
788
789/* record lock stops delete underneath */
790int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
791{
792	return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
793}
794
795/*
796  Write locks override our own fcntl readlocks, so check it here.
797  Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
798  an error to fail to get the lock here.
799*/
800int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
801{
802	struct tdb_traverse_lock *i;
803	for (i = &tdb->travlocks; i; i = i->next)
804		if (i->off == off)
805			return -1;
806	return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
807}
808
809/*
810  Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
811  an error to fail to get the lock here.
812*/
813int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
814{
815	return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
816}
817
818/* fcntl locks don't stack: avoid unlocking someone else's */
819int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
820{
821	struct tdb_traverse_lock *i;
822	u32 count = 0;
823
824	if (off == 0)
825		return 0;
826	for (i = &tdb->travlocks; i; i = i->next)
827		if (i->off == off)
828			count++;
829	return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
830}
831
832/* file: io.c */
833
834/* check for an out of bounds access - if it is out of bounds then
835   see if the database has been expanded by someone else and expand
836   if necessary
837   note that "len" is the minimum length needed for the db
838*/
839static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
840{
841	struct stat st;
842	if (len <= tdb->map_size)
843		return 0;
844	if (tdb->flags & TDB_INTERNAL) {
845		if (!probe) {
846			/* Ensure ecode is set for log fn. */
847			tdb->ecode = TDB_ERR_IO;
848			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
849				 (int)len, (int)tdb->map_size));
850		}
851		return TDB_ERRCODE(TDB_ERR_IO, -1);
852	}
853
854	if (fstat(tdb->fd, &st) == -1) {
855		return TDB_ERRCODE(TDB_ERR_IO, -1);
856	}
857
858	if (st.st_size < (size_t)len) {
859		if (!probe) {
860			/* Ensure ecode is set for log fn. */
861			tdb->ecode = TDB_ERR_IO;
862			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
863				 (int)len, (int)st.st_size));
864		}
865		return TDB_ERRCODE(TDB_ERR_IO, -1);
866	}
867
868	/* Unmap, update size, remap */
869	if (tdb_munmap(tdb) == -1)
870		return TDB_ERRCODE(TDB_ERR_IO, -1);
871	tdb->map_size = st.st_size;
872	tdb_mmap(tdb);
873	return 0;
874}
875
876/* write a lump of data at a specified offset */
877static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
878		     const void *buf, tdb_len_t len)
879{
880	if (len == 0) {
881		return 0;
882	}
883
884	if (tdb->read_only || tdb->traverse_read) {
885		tdb->ecode = TDB_ERR_RDONLY;
886		return -1;
887	}
888
889	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
890		return -1;
891
892	if (tdb->map_ptr) {
893		memcpy(off + (char *)tdb->map_ptr, buf, len);
894	} else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
895		/* Ensure ecode is set for log fn. */
896		tdb->ecode = TDB_ERR_IO;
897		TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
898			   off, len, strerror(errno)));
899		return TDB_ERRCODE(TDB_ERR_IO, -1);
900	}
901	return 0;
902}
903
904/* Endian conversion: we only ever deal with 4 byte quantities */
905void *tdb_convert(void *buf, u32 size)
906{
907	u32 i, *p = (u32 *)buf;
908	for (i = 0; i < size / 4; i++)
909		p[i] = TDB_BYTEREV(p[i]);
910	return buf;
911}
912
913
914/* read a lump of data at a specified offset, maybe convert */
915static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
916		    tdb_len_t len, int cv)
917{
918	if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
919		return -1;
920	}
921
922	if (tdb->map_ptr) {
923		memcpy(buf, off + (char *)tdb->map_ptr, len);
924	} else {
925		ssize_t ret = pread(tdb->fd, buf, len, off);
926		if (ret != (ssize_t)len) {
927			/* Ensure ecode is set for log fn. */
928			tdb->ecode = TDB_ERR_IO;
929			TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
930				 "len=%d ret=%d (%s) map_size=%d\n",
931				 (int)off, (int)len, (int)ret, strerror(errno),
932				 (int)tdb->map_size));
933			return TDB_ERRCODE(TDB_ERR_IO, -1);
934		}
935	}
936	if (cv) {
937		tdb_convert(buf, len);
938	}
939	return 0;
940}
941
942
943
944/*
945  do an unlocked scan of the hash table heads to find the next non-zero head. The value
946  will then be confirmed with the lock held
947*/
948static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
949{
950	u32 h = *chain;
951	if (tdb->map_ptr) {
952		for (;h < tdb->header.hash_size;h++) {
953			if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
954				break;
955			}
956		}
957	} else {
958		u32 off=0;
959		for (;h < tdb->header.hash_size;h++) {
960			if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
961				break;
962			}
963		}
964	}
965	(*chain) = h;
966}
967
968
969int tdb_munmap(struct tdb_context *tdb)
970{
971	if (tdb->flags & TDB_INTERNAL)
972		return 0;
973
974#ifdef HAVE_MMAP
975	if (tdb->map_ptr) {
976		int ret = munmap(tdb->map_ptr, tdb->real_map_size);
977		if (ret != 0)
978			return ret;
979		tdb->real_map_size = 0;
980	}
981#endif
982	tdb->map_ptr = NULL;
983	return 0;
984}
985
986void tdb_mmap(struct tdb_context *tdb)
987{
988	if (tdb->flags & TDB_INTERNAL)
989		return;
990
991#ifdef HAVE_MMAP
992	if (!(tdb->flags & TDB_NOMMAP)) {
993		tdb->map_ptr = mmap(NULL, tdb->map_size,
994				    PROT_READ|(tdb->read_only? 0:PROT_WRITE),
995				    MAP_SHARED|MAP_FILE, tdb->fd, 0);
996
997		/*
998		 * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
999		 */
1000
1001		if (tdb->map_ptr == MAP_FAILED) {
1002			tdb->real_map_size = 0;
1003			tdb->map_ptr = NULL;
1004			TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
1005				 tdb->map_size, strerror(errno)));
1006		}
1007		tdb->real_map_size = tdb->map_size;
1008	} else {
1009		tdb->map_ptr = NULL;
1010	}
1011#else
1012	tdb->map_ptr = NULL;
1013#endif
1014}
1015
1016/* expand a file.  we prefer to use ftruncate, as that is what posix
1017  says to use for mmap expansion */
1018static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
1019{
1020	char buf[1024];
1021
1022	if (tdb->read_only || tdb->traverse_read) {
1023		tdb->ecode = TDB_ERR_RDONLY;
1024		return -1;
1025	}
1026
1027	if (ftruncate(tdb->fd, size+addition) == -1) {
1028		char b = 0;
1029		if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
1030			TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
1031				 size+addition, strerror(errno)));
1032			return -1;
1033		}
1034	}
1035
1036	/* now fill the file with something. This ensures that the
1037	   file isn't sparse, which would be very bad if we ran out of
1038	   disk. This must be done with write, not via mmap */
1039	memset(buf, TDB_PAD_BYTE, sizeof(buf));
1040	while (addition) {
1041		int n = addition>sizeof(buf)?sizeof(buf):addition;
1042		int ret = pwrite(tdb->fd, buf, n, size);
1043		if (ret != n) {
1044			TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
1045				   n, strerror(errno)));
1046			return -1;
1047		}
1048		addition -= n;
1049		size += n;
1050	}
1051	return 0;
1052}
1053
1054
/* expand the database at least size bytes by expanding the underlying
   file and doing the mmap again if necessary.
   Returns 0 on success, -1 on failure (with the global lock released). */
int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
{
	struct list_struct rec;
	tdb_off_t offset;

	/* serialise expansions with the global (list -1) write lock */
	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
		return -1;
	}

	/* must know about any previous expansions by another process */
	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);

	/* always make room for at least 10 more records, and round
           the database up to a multiple of the page size */
	size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;

	if (!(tdb->flags & TDB_INTERNAL))
		tdb_munmap(tdb);

	/*
	 * We must ensure the file is unmapped before doing this
	 * to ensure consistency with systems like OpenBSD where
	 * writes and mmaps are not consistent.
	 */

	/* expand the file itself */
	if (!(tdb->flags & TDB_INTERNAL)) {
		if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
			goto fail;
	}

	tdb->map_size += size;

	if (tdb->flags & TDB_INTERNAL) {
		/* in-memory database: grow the malloc'd area instead of a file */
		char *new_map_ptr = (char *)realloc(tdb->map_ptr,
						    tdb->map_size);
		if (!new_map_ptr) {
			/* roll back the size bump so the context stays consistent */
			tdb->map_size -= size;
			goto fail;
		}
		tdb->map_ptr = new_map_ptr;
	} else {
		/*
		 * We must ensure the file is remapped before adding the space
		 * to ensure consistency with systems like OpenBSD where
		 * writes and mmaps are not consistent.
		 */

		/* We're ok if the mmap fails as we'll fallback to read/write */
		tdb_mmap(tdb);
	}

	/* form a new freelist record covering the space we just added */
	memset(&rec,'\0',sizeof(rec));
	rec.rec_len = size - sizeof(rec);

	/* link it into the free list */
	offset = tdb->map_size - size;
	if (tdb_free(tdb, offset, &rec) == -1)
		goto fail;

	tdb_unlock(tdb, -1, F_WRLCK);
	return 0;
 fail:
	tdb_unlock(tdb, -1, F_WRLCK);
	return -1;
}
1125
1126/* read/write a tdb_off_t */
1127int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1128{
1129	return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
1130}
1131
1132int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
1133{
1134	tdb_off_t off = *d;
1135	return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
1136}
1137
1138
1139/* read a lump of data, allocating the space for it */
1140unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
1141{
1142	unsigned char *buf;
1143
1144	/* some systems don't like zero length malloc */
1145	if (len == 0) {
1146		len = 1;
1147	}
1148
1149	if (!(buf = (unsigned char *)malloc(len))) {
1150		/* Ensure ecode is set for log fn. */
1151		tdb->ecode = TDB_ERR_OOM;
1152		TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
1153			   len, strerror(errno)));
1154		return TDB_ERRCODE(TDB_ERR_OOM, buf);
1155	}
1156	if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
1157		SAFE_FREE(buf);
1158		return NULL;
1159	}
1160	return buf;
1161}
1162
1163/* Give a piece of tdb data to a parser */
1164
1165int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
1166		   tdb_off_t offset, tdb_len_t len,
1167		   int (*parser)(TDB_DATA key, TDB_DATA data,
1168				 void *private_data),
1169		   void *private_data)
1170{
1171	TDB_DATA data;
1172	int result;
1173
1174	data.dsize = len;
1175
1176	if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
1177		/*
1178		 * Optimize by avoiding the malloc/memcpy/free, point the
1179		 * parser directly at the mmap area.
1180		 */
1181		if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
1182			return -1;
1183		}
1184		data.dptr = offset + (unsigned char *)tdb->map_ptr;
1185		return parser(key, data, private_data);
1186	}
1187
1188	if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
1189		return -1;
1190	}
1191
1192	result = parser(key, data, private_data);
1193	free(data.dptr);
1194	return result;
1195}
1196
1197/* read/write a record */
1198int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1199{
1200	if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
1201		return -1;
1202	if (TDB_BAD_MAGIC(rec)) {
1203		/* Ensure ecode is set for log fn. */
1204		tdb->ecode = TDB_ERR_CORRUPT;
1205		TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
1206		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1207	}
1208	return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
1209}
1210
1211int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1212{
1213	struct list_struct r = *rec;
1214	return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
1215}
1216
/* the default IO method table: direct pread/pwrite style access to the
   database file (a transaction swaps this for transaction_methods) */
static const struct tdb_methods io_methods = {
	tdb_read,
	tdb_write,
	tdb_next_hash_chain,
	tdb_oob,
	tdb_expand_file,
	tdb_brlock
};
1225
1226/*
1227  initialise the default methods table
1228*/
1229void tdb_io_init(struct tdb_context *tdb)
1230{
1231	tdb->methods = &io_methods;
1232}
1233
1234/* file: transaction.c */
1235
1236/*
1237  transaction design:
1238
1239  - only allow a single transaction at a time per database. This makes
1240    using the transaction API simpler, as otherwise the caller would
1241    have to cope with temporary failures in transactions that conflict
1242    with other current transactions
1243
1244  - keep the transaction recovery information in the same file as the
1245    database, using a special 'transaction recovery' record pointed at
1246    by the header. This removes the need for extra journal files as
1247    used by some other databases
1248
  - dynamically allocate the transaction recovery record, re-using it
1250    for subsequent transactions. If a larger record is needed then
1251    tdb_free() the old record to place it on the normal tdb freelist
1252    before allocating the new record
1253
  - during transactions, keep a linked list of all writes that have
1255    been performed by intercepting all tdb_write() calls. The hooked
1256    transaction versions of tdb_read() and tdb_write() check this
1257    linked list and try to use the elements of the list in preference
1258    to the real database.
1259
1260  - don't allow any locks to be held when a transaction starts,
1261    otherwise we can end up with deadlock (plus lack of lock nesting
1262    in posix locks would mean the lock is lost)
1263
1264  - if the caller gains a lock during the transaction but doesn't
1265    release it then fail the commit
1266
1267  - allow for nested calls to tdb_transaction_start(), re-using the
1268    existing transaction record. If the inner transaction is cancelled
1269    then a subsequent commit will fail
1270
1271  - keep a mirrored copy of the tdb hash chain heads to allow for the
1272    fast hash heads scan on traverse, updating the mirrored copy in
1273    the transaction version of tdb_write
1274
1275  - allow callers to mix transaction and non-transaction use of tdb,
1276    although once a transaction is started then an exclusive lock is
1277    gained until the transaction is committed or cancelled
1278
  - the commit strategy involves first saving away all modified data
1280    into a linearised buffer in the transaction recovery area, then
1281    marking the transaction recovery area with a magic value to
1282    indicate a valid recovery record. In total 4 fsync/msync calls are
1283    needed per commit to prevent race conditions. It might be possible
1284    to reduce this to 3 or even 2 with some more work.
1285
1286  - check for a valid recovery record on open of the tdb, while the
1287    global lock is held. Automatically recover from the transaction
1288    recovery area if needed, then continue with the open as
1289    usual. This allows for smooth crash recovery with no administrator
1290    intervention.
1291
1292  - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
1293    still available, but no transaction recovery area is used and no
1294    fsync/msync calls are made.
1295
1296*/
1297
/* one element of the in-memory transaction write list: a pending write
   of 'length' bytes at database offset 'offset', with the new contents
   held in a malloc'd 'data' buffer */
struct tdb_transaction_el {
	struct tdb_transaction_el *next, *prev;
	tdb_off_t offset;
	tdb_len_t length;
	unsigned char *data;
};
1304
1305/*
1306  hold the context of any current transaction
1307*/
1308struct tdb_transaction {
1309	/* we keep a mirrored copy of the tdb hash heads here so
1310	   tdb_next_hash_chain() can operate efficiently */
1311	u32 *hash_heads;
1312
1313	/* the original io methods - used to do IOs to the real db */
1314	const struct tdb_methods *io_methods;
1315
1316	/* the list of transaction elements. We use a doubly linked
1317	   list with a last pointer to allow us to keep the list
1318	   ordered, with first element at the front of the list. It
1319	   needs to be doubly linked as the read/write traversals need
1320	   to be backwards, while the commit needs to be forwards */
1321	struct tdb_transaction_el *elements, *elements_last;
1322
1323	/* non-zero when an internal transaction error has
1324	   occurred. All write operations will then fail until the
1325	   transaction is ended */
1326	int transaction_error;
1327
1328	/* when inside a transaction we need to keep track of any
1329	   nested tdb_transaction_start() calls, as these are allowed,
1330	   but don't create a new transaction */
1331	int nesting;
1332
1333	/* old file size before transaction */
1334	tdb_len_t old_map_size;
1335};
1336
1337
1338/*
1339  read while in a transaction. We need to check first if the data is in our list
1340  of transaction elements, then if not do a real read
1341*/
1342static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
1343			    tdb_len_t len, int cv)
1344{
1345	struct tdb_transaction_el *el;
1346
1347	/* we need to walk the list backwards to get the most recent data */
1348	for (el=tdb->transaction->elements_last;el;el=el->prev) {
1349		tdb_len_t partial;
1350
1351		if (off+len <= el->offset) {
1352			continue;
1353		}
1354		if (off >= el->offset + el->length) {
1355			continue;
1356		}
1357
1358		/* an overlapping read - needs to be split into up to
1359		   2 reads and a memcpy */
1360		if (off < el->offset) {
1361			partial = el->offset - off;
1362			if (transaction_read(tdb, off, buf, partial, cv) != 0) {
1363				goto fail;
1364			}
1365			len -= partial;
1366			off += partial;
1367			buf = (void *)(partial + (char *)buf);
1368		}
1369		if (off + len <= el->offset + el->length) {
1370			partial = len;
1371		} else {
1372			partial = el->offset + el->length - off;
1373		}
1374		memcpy(buf, el->data + (off - el->offset), partial);
1375		if (cv) {
1376			tdb_convert(buf, len);
1377		}
1378		len -= partial;
1379		off += partial;
1380		buf = (void *)(partial + (char *)buf);
1381
1382		if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
1383			goto fail;
1384		}
1385
1386		return 0;
1387	}
1388
1389	/* its not in the transaction elements - do a real read */
1390	return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
1391
1392fail:
1393	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
1394	tdb->ecode = TDB_ERR_IO;
1395	tdb->transaction->transaction_error = 1;
1396	return -1;
1397}
1398
1399
1400/*
1401  write while in a transaction
1402*/
1403static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
1404			     const void *buf, tdb_len_t len)
1405{
1406	struct tdb_transaction_el *el, *best_el=NULL;
1407
1408	if (len == 0) {
1409		return 0;
1410	}
1411
1412	/* if the write is to a hash head, then update the transaction
1413	   hash heads */
1414	if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
1415	    off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
1416		u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
1417		memcpy(&tdb->transaction->hash_heads[chain], buf, len);
1418	}
1419
1420	/* first see if we can replace an existing entry */
1421	for (el=tdb->transaction->elements_last;el;el=el->prev) {
1422		tdb_len_t partial;
1423
1424		if (best_el == NULL && off == el->offset+el->length) {
1425			best_el = el;
1426		}
1427
1428		if (off+len <= el->offset) {
1429			continue;
1430		}
1431		if (off >= el->offset + el->length) {
1432			continue;
1433		}
1434
1435		/* an overlapping write - needs to be split into up to
1436		   2 writes and a memcpy */
1437		if (off < el->offset) {
1438			partial = el->offset - off;
1439			if (transaction_write(tdb, off, buf, partial) != 0) {
1440				goto fail;
1441			}
1442			len -= partial;
1443			off += partial;
1444			buf = (const void *)(partial + (const char *)buf);
1445		}
1446		if (off + len <= el->offset + el->length) {
1447			partial = len;
1448		} else {
1449			partial = el->offset + el->length - off;
1450		}
1451		memcpy(el->data + (off - el->offset), buf, partial);
1452		len -= partial;
1453		off += partial;
1454		buf = (const void *)(partial + (const char *)buf);
1455
1456		if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
1457			goto fail;
1458		}
1459
1460		return 0;
1461	}
1462
1463	/* see if we can append the new entry to an existing entry */
1464	if (best_el && best_el->offset + best_el->length == off &&
1465	    (off+len < tdb->transaction->old_map_size ||
1466	     off > tdb->transaction->old_map_size)) {
1467		unsigned char *data = best_el->data;
1468		el = best_el;
1469		el->data = (unsigned char *)realloc(el->data,
1470						    el->length + len);
1471		if (el->data == NULL) {
1472			tdb->ecode = TDB_ERR_OOM;
1473			tdb->transaction->transaction_error = 1;
1474			el->data = data;
1475			return -1;
1476		}
1477		if (buf) {
1478			memcpy(el->data + el->length, buf, len);
1479		} else {
1480			memset(el->data + el->length, TDB_PAD_BYTE, len);
1481		}
1482		el->length += len;
1483		return 0;
1484	}
1485
1486	/* add a new entry at the end of the list */
1487	el = (struct tdb_transaction_el *)malloc(sizeof(*el));
1488	if (el == NULL) {
1489		tdb->ecode = TDB_ERR_OOM;
1490		tdb->transaction->transaction_error = 1;
1491		return -1;
1492	}
1493	el->next = NULL;
1494	el->prev = tdb->transaction->elements_last;
1495	el->offset = off;
1496	el->length = len;
1497	el->data = (unsigned char *)malloc(len);
1498	if (el->data == NULL) {
1499		free(el);
1500		tdb->ecode = TDB_ERR_OOM;
1501		tdb->transaction->transaction_error = 1;
1502		return -1;
1503	}
1504	if (buf) {
1505		memcpy(el->data, buf, len);
1506	} else {
1507		memset(el->data, TDB_PAD_BYTE, len);
1508	}
1509	if (el->prev) {
1510		el->prev->next = el;
1511	} else {
1512		tdb->transaction->elements = el;
1513	}
1514	tdb->transaction->elements_last = el;
1515	return 0;
1516
1517fail:
1518	TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
1519	tdb->ecode = TDB_ERR_IO;
1520	tdb->transaction->transaction_error = 1;
1521	return -1;
1522}
1523
1524/*
1525  accelerated hash chain head search, using the cached hash heads
1526*/
1527static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
1528{
1529	u32 h = *chain;
1530	for (;h < tdb->header.hash_size;h++) {
1531		/* the +1 takes account of the freelist */
1532		if (0 != tdb->transaction->hash_heads[h+1]) {
1533			break;
1534		}
1535	}
1536	(*chain) = h;
1537}
1538
1539/*
1540  out of bounds check during a transaction
1541*/
1542static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
1543{
1544	if (len <= tdb->map_size) {
1545		return 0;
1546	}
1547	return TDB_ERRCODE(TDB_ERR_IO, -1);
1548}
1549
1550/*
1551  transaction version of tdb_expand().
1552*/
1553static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
1554				   tdb_off_t addition)
1555{
1556	/* add a write to the transaction elements, so subsequent
1557	   reads see the zero data */
1558	if (transaction_write(tdb, size, NULL, addition) != 0) {
1559		return -1;
1560	}
1561
1562	return 0;
1563}
1564
1565/*
1566  brlock during a transaction - ignore them
1567*/
1568static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
1569			      int rw_type, int lck_type, int probe, size_t len)
1570{
1571	return 0;
1572}
1573
/* IO method table installed while a transaction is active: reads and
   writes are redirected through the in-memory element list */
static const struct tdb_methods transaction_methods = {
	transaction_read,
	transaction_write,
	transaction_next_hash_chain,
	transaction_oob,
	transaction_expand_file,
	transaction_brlock
};
1582
1583
1584/*
1585  start a tdb transaction. No token is returned, as only a single
1586  transaction is allowed to be pending per tdb_context
1587*/
1588int tdb_transaction_start(struct tdb_context *tdb)
1589{
1590	/* some sanity checks */
1591	if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
1592		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
1593		tdb->ecode = TDB_ERR_EINVAL;
1594		return -1;
1595	}
1596
1597	/* cope with nested tdb_transaction_start() calls */
1598	if (tdb->transaction != NULL) {
1599		tdb->transaction->nesting++;
1600		TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
1601			 tdb->transaction->nesting));
1602		return 0;
1603	}
1604
1605	if (tdb->num_locks != 0 || tdb->global_lock.count) {
1606		/* the caller must not have any locks when starting a
1607		   transaction as otherwise we'll be screwed by lack
1608		   of nested locks in posix */
1609		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
1610		tdb->ecode = TDB_ERR_LOCK;
1611		return -1;
1612	}
1613
1614	if (tdb->travlocks.next != NULL) {
1615		/* you cannot use transactions inside a traverse (although you can use
1616		   traverse inside a transaction) as otherwise you can end up with
1617		   deadlock */
1618		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
1619		tdb->ecode = TDB_ERR_LOCK;
1620		return -1;
1621	}
1622
1623	tdb->transaction = (struct tdb_transaction *)
1624		calloc(sizeof(struct tdb_transaction), 1);
1625	if (tdb->transaction == NULL) {
1626		tdb->ecode = TDB_ERR_OOM;
1627		return -1;
1628	}
1629
1630	/* get the transaction write lock. This is a blocking lock. As
1631	   discussed with Volker, there are a number of ways we could
1632	   make this async, which we will probably do in the future */
1633	if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
1634		SAFE_FREE(tdb->transaction);
1635		return -1;
1636	}
1637
1638	/* get a read lock from the freelist to the end of file. This
1639	   is upgraded to a write lock during the commit */
1640	if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
1641		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
1642		tdb->ecode = TDB_ERR_LOCK;
1643		goto fail;
1644	}
1645
1646	/* setup a copy of the hash table heads so the hash scan in
1647	   traverse can be fast */
1648	tdb->transaction->hash_heads = (u32 *)
1649		calloc(tdb->header.hash_size+1, sizeof(u32));
1650	if (tdb->transaction->hash_heads == NULL) {
1651		tdb->ecode = TDB_ERR_OOM;
1652		goto fail;
1653	}
1654	if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1655				   TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
1656		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
1657		tdb->ecode = TDB_ERR_IO;
1658		goto fail;
1659	}
1660
1661	/* make sure we know about any file expansions already done by
1662	   anyone else */
1663	tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1664	tdb->transaction->old_map_size = tdb->map_size;
1665
1666	/* finally hook the io methods, replacing them with
1667	   transaction specific methods */
1668	tdb->transaction->io_methods = tdb->methods;
1669	tdb->methods = &transaction_methods;
1670
1671	/* by calling this transaction write here, we ensure that we don't grow the
1672	   transaction linked list due to hash table updates */
1673	if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1674			      TDB_HASHTABLE_SIZE(tdb)) != 0) {
1675		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
1676		tdb->ecode = TDB_ERR_IO;
1677		tdb->methods = tdb->transaction->io_methods;
1678		goto fail;
1679	}
1680
1681	return 0;
1682
1683fail:
1684	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1685	tdb_transaction_unlock(tdb);
1686	SAFE_FREE(tdb->transaction->hash_heads);
1687	SAFE_FREE(tdb->transaction);
1688	return -1;
1689}
1690
1691
1692/*
1693  cancel the current transaction
1694*/
1695int tdb_transaction_cancel(struct tdb_context *tdb)
1696{
1697	if (tdb->transaction == NULL) {
1698		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
1699		return -1;
1700	}
1701
1702	if (tdb->transaction->nesting != 0) {
1703		tdb->transaction->transaction_error = 1;
1704		tdb->transaction->nesting--;
1705		return 0;
1706	}
1707
1708	tdb->map_size = tdb->transaction->old_map_size;
1709
1710	/* free all the transaction elements */
1711	while (tdb->transaction->elements) {
1712		struct tdb_transaction_el *el = tdb->transaction->elements;
1713		tdb->transaction->elements = el->next;
1714		free(el->data);
1715		free(el);
1716	}
1717
1718	/* remove any global lock created during the transaction */
1719	if (tdb->global_lock.count != 0) {
1720		tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
1721		tdb->global_lock.count = 0;
1722	}
1723
1724	/* remove any locks created during the transaction */
1725	if (tdb->num_locks != 0) {
1726		int i;
1727		for (i=0;i<tdb->num_lockrecs;i++) {
1728			tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
1729				   F_UNLCK,F_SETLKW, 0, 1);
1730		}
1731		tdb->num_locks = 0;
1732		tdb->num_lockrecs = 0;
1733		SAFE_FREE(tdb->lockrecs);
1734	}
1735
1736	/* restore the normal io methods */
1737	tdb->methods = tdb->transaction->io_methods;
1738
1739	tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1740	tdb_transaction_unlock(tdb);
1741	SAFE_FREE(tdb->transaction->hash_heads);
1742	SAFE_FREE(tdb->transaction);
1743
1744	return 0;
1745}
1746
1747/*
1748  sync to disk
1749*/
1750static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
1751{
1752	if (fsync(tdb->fd) != 0) {
1753		tdb->ecode = TDB_ERR_IO;
1754		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
1755		return -1;
1756	}
1757#if defined(HAVE_MSYNC) && defined(MS_SYNC)
1758	if (tdb->map_ptr) {
1759		tdb_off_t moffset = offset & ~(tdb->page_size-1);
1760		if (msync(moffset + (char *)tdb->map_ptr,
1761			  length + (offset - moffset), MS_SYNC) != 0) {
1762			tdb->ecode = TDB_ERR_IO;
1763			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
1764				 strerror(errno)));
1765			return -1;
1766		}
1767	}
1768#endif
1769	return 0;
1770}
1771
1772
1773/*
1774  work out how much space the linearised recovery data will consume
1775*/
1776static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
1777{
1778	struct tdb_transaction_el *el;
1779	tdb_len_t recovery_size = 0;
1780
1781	recovery_size = sizeof(u32);
1782	for (el=tdb->transaction->elements;el;el=el->next) {
1783		if (el->offset >= tdb->transaction->old_map_size) {
1784			continue;
1785		}
1786		recovery_size += 2*sizeof(tdb_off_t) + el->length;
1787	}
1788
1789	return recovery_size;
1790}
1791
1792/*
1793  allocate the recovery area, or use an existing recovery area if it is
1794  large enough
1795*/
1796static int tdb_recovery_allocate(struct tdb_context *tdb,
1797				 tdb_len_t *recovery_size,
1798				 tdb_off_t *recovery_offset,
1799				 tdb_len_t *recovery_max_size)
1800{
1801	struct list_struct rec;
1802	const struct tdb_methods *methods = tdb->transaction->io_methods;
1803	tdb_off_t recovery_head;
1804
1805	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1806		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
1807		return -1;
1808	}
1809
1810	rec.rec_len = 0;
1811
1812	if (recovery_head != 0 &&
1813	    methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
1814		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
1815		return -1;
1816	}
1817
1818	*recovery_size = tdb_recovery_size(tdb);
1819
1820	if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
1821		/* it fits in the existing area */
1822		*recovery_max_size = rec.rec_len;
1823		*recovery_offset = recovery_head;
1824		return 0;
1825	}
1826
1827	/* we need to free up the old recovery area, then allocate a
1828	   new one at the end of the file. Note that we cannot use
1829	   tdb_allocate() to allocate the new one as that might return
1830	   us an area that is being currently used (as of the start of
1831	   the transaction) */
1832	if (recovery_head != 0) {
1833		if (tdb_free(tdb, recovery_head, &rec) == -1) {
1834			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
1835			return -1;
1836		}
1837	}
1838
1839	/* the tdb_free() call might have increased the recovery size */
1840	*recovery_size = tdb_recovery_size(tdb);
1841
1842	/* round up to a multiple of page size */
1843	*recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
1844	*recovery_offset = tdb->map_size;
1845	recovery_head = *recovery_offset;
1846
1847	if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1848				     (tdb->map_size - tdb->transaction->old_map_size) +
1849				     sizeof(rec) + *recovery_max_size) == -1) {
1850		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
1851		return -1;
1852	}
1853
1854	/* remap the file (if using mmap) */
1855	methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1856
1857	/* we have to reset the old map size so that we don't try to expand the file
1858	   again in the transaction commit, which would destroy the recovery area */
1859	tdb->transaction->old_map_size = tdb->map_size;
1860
1861	/* write the recovery header offset and sync - we can sync without a race here
1862	   as the magic ptr in the recovery record has not been set */
1863	CONVERT(recovery_head);
1864	if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
1865			       &recovery_head, sizeof(tdb_off_t)) == -1) {
1866		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
1867		return -1;
1868	}
1869
1870	return 0;
1871}
1872
1873
1874/*
1875  setup the recovery data that will be used on a crash during commit
1876*/
1877static int transaction_setup_recovery(struct tdb_context *tdb,
1878				      tdb_off_t *magic_offset)
1879{
1880	struct tdb_transaction_el *el;
1881	tdb_len_t recovery_size;
1882	unsigned char *data, *p;
1883	const struct tdb_methods *methods = tdb->transaction->io_methods;
1884	struct list_struct *rec;
1885	tdb_off_t recovery_offset, recovery_max_size;
1886	tdb_off_t old_map_size = tdb->transaction->old_map_size;
1887	u32 magic, tailer;
1888
1889	/*
1890	  check that the recovery area has enough space
1891	*/
1892	if (tdb_recovery_allocate(tdb, &recovery_size,
1893				  &recovery_offset, &recovery_max_size) == -1) {
1894		return -1;
1895	}
1896
1897	data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
1898	if (data == NULL) {
1899		tdb->ecode = TDB_ERR_OOM;
1900		return -1;
1901	}
1902
1903	rec = (struct list_struct *)data;
1904	memset(rec, 0, sizeof(*rec));
1905
1906	rec->magic    = 0;
1907	rec->data_len = recovery_size;
1908	rec->rec_len  = recovery_max_size;
1909	rec->key_len  = old_map_size;
1910	CONVERT(rec);
1911
1912	/* build the recovery data into a single blob to allow us to do a single
1913	   large write, which should be more efficient */
1914	p = data + sizeof(*rec);
1915	for (el=tdb->transaction->elements;el;el=el->next) {
1916		if (el->offset >= old_map_size) {
1917			continue;
1918		}
1919		if (el->offset + el->length > tdb->transaction->old_map_size) {
1920			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
1921			free(data);
1922			tdb->ecode = TDB_ERR_CORRUPT;
1923			return -1;
1924		}
1925		memcpy(p, &el->offset, 4);
1926		memcpy(p+4, &el->length, 4);
1927		if (DOCONV()) {
1928			tdb_convert(p, 8);
1929		}
1930		/* the recovery area contains the old data, not the
1931		   new data, so we have to call the original tdb_read
1932		   method to get it */
1933		if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
1934			free(data);
1935			tdb->ecode = TDB_ERR_IO;
1936			return -1;
1937		}
1938		p += 8 + el->length;
1939	}
1940
1941	/* and the tailer */
1942	tailer = sizeof(*rec) + recovery_max_size;
1943	memcpy(p, &tailer, 4);
1944	CONVERT(p);
1945
1946	/* write the recovery data to the recovery area */
1947	if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
1948		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
1949		free(data);
1950		tdb->ecode = TDB_ERR_IO;
1951		return -1;
1952	}
1953
1954	/* as we don't have ordered writes, we have to sync the recovery
1955	   data before we update the magic to indicate that the recovery
1956	   data is present */
1957	if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
1958		free(data);
1959		return -1;
1960	}
1961
1962	free(data);
1963
1964	magic = TDB_RECOVERY_MAGIC;
1965	CONVERT(magic);
1966
1967	*magic_offset = recovery_offset + offsetof(struct list_struct, magic);
1968
1969	if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
1970		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
1971		tdb->ecode = TDB_ERR_IO;
1972		return -1;
1973	}
1974
1975	/* ensure the recovery magic marker is on disk */
1976	if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
1977		return -1;
1978	}
1979
1980	return 0;
1981}
1982
1983/*
1984  commit the current transaction
1985*/
1986int tdb_transaction_commit(struct tdb_context *tdb)
1987{
1988	const struct tdb_methods *methods;
1989	tdb_off_t magic_offset = 0;
1990	u32 zero = 0;
1991
1992	if (tdb->transaction == NULL) {
1993		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1994		return -1;
1995	}
1996
1997	if (tdb->transaction->transaction_error) {
1998		tdb->ecode = TDB_ERR_IO;
1999		tdb_transaction_cancel(tdb);
2000		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
2001		return -1;
2002	}
2003
2004	if (tdb->transaction->nesting != 0) {
2005		tdb->transaction->nesting--;
2006		return 0;
2007	}
2008
2009	/* check for a null transaction */
2010	if (tdb->transaction->elements == NULL) {
2011		tdb_transaction_cancel(tdb);
2012		return 0;
2013	}
2014
2015	methods = tdb->transaction->io_methods;
2016
2017	/* if there are any locks pending then the caller has not
2018	   nested their locks properly, so fail the transaction */
2019	if (tdb->num_locks || tdb->global_lock.count) {
2020		tdb->ecode = TDB_ERR_LOCK;
2021		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
2022		tdb_transaction_cancel(tdb);
2023		return -1;
2024	}
2025
2026	/* upgrade the main transaction lock region to a write lock */
2027	if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
2028		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
2029		tdb->ecode = TDB_ERR_LOCK;
2030		tdb_transaction_cancel(tdb);
2031		return -1;
2032	}
2033
2034	/* get the global lock - this prevents new users attaching to the database
2035	   during the commit */
2036	if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
2037		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
2038		tdb->ecode = TDB_ERR_LOCK;
2039		tdb_transaction_cancel(tdb);
2040		return -1;
2041	}
2042
2043	if (!(tdb->flags & TDB_NOSYNC)) {
2044		/* write the recovery data to the end of the file */
2045		if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
2046			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
2047			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2048			tdb_transaction_cancel(tdb);
2049			return -1;
2050		}
2051	}
2052
2053	/* expand the file to the new size if needed */
2054	if (tdb->map_size != tdb->transaction->old_map_size) {
2055		if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
2056					     tdb->map_size -
2057					     tdb->transaction->old_map_size) == -1) {
2058			tdb->ecode = TDB_ERR_IO;
2059			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
2060			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2061			tdb_transaction_cancel(tdb);
2062			return -1;
2063		}
2064		tdb->map_size = tdb->transaction->old_map_size;
2065		methods->tdb_oob(tdb, tdb->map_size + 1, 1);
2066	}
2067
2068	/* perform all the writes */
2069	while (tdb->transaction->elements) {
2070		struct tdb_transaction_el *el = tdb->transaction->elements;
2071
2072		if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
2073			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
2074
2075			/* we've overwritten part of the data and
2076			   possibly expanded the file, so we need to
2077			   run the crash recovery code */
2078			tdb->methods = methods;
2079			tdb_transaction_recover(tdb);
2080
2081			tdb_transaction_cancel(tdb);
2082			tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2083
2084			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
2085			return -1;
2086		}
2087		tdb->transaction->elements = el->next;
2088		free(el->data);
2089		free(el);
2090	}
2091
2092	if (!(tdb->flags & TDB_NOSYNC)) {
2093		/* ensure the new data is on disk */
2094		if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2095			return -1;
2096		}
2097
2098		/* remove the recovery marker */
2099		if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
2100			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
2101			return -1;
2102		}
2103
2104		/* ensure the recovery marker has been removed on disk */
2105		if (transaction_sync(tdb, magic_offset, 4) == -1) {
2106			return -1;
2107		}
2108	}
2109
2110	tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2111
2112	/*
2113	  TODO: maybe write to some dummy hdr field, or write to magic
2114	  offset without mmap, before the last sync, instead of the
2115	  utime() call
2116	*/
2117
2118	/* on some systems (like Linux 2.6.x) changes via mmap/msync
2119	   don't change the mtime of the file, this means the file may
2120	   not be backed up (as tdb rounding to block sizes means that
2121	   file size changes are quite rare too). The following forces
2122	   mtime changes when a transaction completes */
2123#ifdef HAVE_UTIME
2124	utime(tdb->name, NULL);
2125#endif
2126
2127	/* use a transaction cancel to free memory and remove the
2128	   transaction locks */
2129	tdb_transaction_cancel(tdb);
2130	return 0;
2131}
2132
2133
2134/*
2135  recover from an aborted transaction. Must be called with exclusive
2136  database write access already established (including the global
2137  lock to prevent new processes attaching)
2138*/
2139int tdb_transaction_recover(struct tdb_context *tdb)
2140{
2141	tdb_off_t recovery_head, recovery_eof;
2142	unsigned char *data, *p;
2143	u32 zero = 0;
2144	struct list_struct rec;
2145
2146	/* find the recovery area */
2147	if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
2148		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
2149		tdb->ecode = TDB_ERR_IO;
2150		return -1;
2151	}
2152
2153	if (recovery_head == 0) {
2154		/* we have never allocated a recovery record */
2155		return 0;
2156	}
2157
2158	/* read the recovery record */
2159	if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
2160				   sizeof(rec), DOCONV()) == -1) {
2161		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
2162		tdb->ecode = TDB_ERR_IO;
2163		return -1;
2164	}
2165
2166	if (rec.magic != TDB_RECOVERY_MAGIC) {
2167		/* there is no valid recovery data */
2168		return 0;
2169	}
2170
2171	if (tdb->read_only) {
2172		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
2173		tdb->ecode = TDB_ERR_CORRUPT;
2174		return -1;
2175	}
2176
2177	recovery_eof = rec.key_len;
2178
2179	data = (unsigned char *)malloc(rec.data_len);
2180	if (data == NULL) {
2181		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
2182		tdb->ecode = TDB_ERR_OOM;
2183		return -1;
2184	}
2185
2186	/* read the full recovery data */
2187	if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
2188				   rec.data_len, 0) == -1) {
2189		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
2190		tdb->ecode = TDB_ERR_IO;
2191		return -1;
2192	}
2193
2194	/* recover the file data */
2195	p = data;
2196	while (p+8 < data + rec.data_len) {
2197		u32 ofs, len;
2198		if (DOCONV()) {
2199			tdb_convert(p, 8);
2200		}
2201		memcpy(&ofs, p, 4);
2202		memcpy(&len, p+4, 4);
2203
2204		if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
2205			free(data);
2206			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
2207			tdb->ecode = TDB_ERR_IO;
2208			return -1;
2209		}
2210		p += 8 + len;
2211	}
2212
2213	free(data);
2214
2215	if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2216		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
2217		tdb->ecode = TDB_ERR_IO;
2218		return -1;
2219	}
2220
2221	/* if the recovery area is after the recovered eof then remove it */
2222	if (recovery_eof <= recovery_head) {
2223		if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
2224			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
2225			tdb->ecode = TDB_ERR_IO;
2226			return -1;
2227		}
2228	}
2229
2230	/* remove the recovery magic */
2231	if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
2232			  &zero) == -1) {
2233		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
2234		tdb->ecode = TDB_ERR_IO;
2235		return -1;
2236	}
2237
2238	/* reduce the file size to the old size */
2239	tdb_munmap(tdb);
2240	if (ftruncate(tdb->fd, recovery_eof) != 0) {
2241		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
2242		tdb->ecode = TDB_ERR_IO;
2243		return -1;
2244	}
2245	tdb->map_size = recovery_eof;
2246	tdb_mmap(tdb);
2247
2248	if (transaction_sync(tdb, 0, recovery_eof) == -1) {
2249		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
2250		tdb->ecode = TDB_ERR_IO;
2251		return -1;
2252	}
2253
2254	TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
2255		 recovery_eof));
2256
2257	/* all done */
2258	return 0;
2259}
2260
2261/* file: freelist.c */
2262
2263/* read a freelist record and check for simple errors */
2264static int tdb_rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
2265{
2266	if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
2267		return -1;
2268
2269	if (rec->magic == TDB_MAGIC) {
2270		/* this happens when a app is showdown while deleting a record - we should
2271		   not completely fail when this happens */
2272		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
2273			 rec->magic, off));
2274		rec->magic = TDB_FREE_MAGIC;
2275		if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
2276			return -1;
2277	}
2278
2279	if (rec->magic != TDB_FREE_MAGIC) {
2280		/* Ensure ecode is set for log fn. */
2281		tdb->ecode = TDB_ERR_CORRUPT;
2282		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_rec_free_read bad magic 0x%x at offset=%d\n",
2283			   rec->magic, off));
2284		return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2285	}
2286	if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
2287		return -1;
2288	return 0;
2289}
2290
2291
2292
2293/* Remove an element from the freelist.  Must have alloc lock. */
2294static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
2295{
2296	tdb_off_t last_ptr, i;
2297
2298	/* read in the freelist top */
2299	last_ptr = FREELIST_TOP;
2300	while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
2301		if (i == off) {
2302			/* We've found it! */
2303			return tdb_ofs_write(tdb, last_ptr, &next);
2304		}
2305		/* Follow chain (next offset is at start of record) */
2306		last_ptr = i;
2307	}
2308	TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
2309	return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2310}
2311
2312
2313/* update a record tailer (must hold allocation lock) */
2314static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
2315			 const struct list_struct *rec)
2316{
2317	tdb_off_t totalsize;
2318
2319	/* Offset of tailer from record header */
2320	totalsize = sizeof(*rec) + rec->rec_len;
2321	return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
2322			 &totalsize);
2323}
2324
/* Add an element into the freelist. Merge adjacent records if
   necessary.
   'offset' is the record header position; 'rec' is the in-memory copy
   of its header, which is updated in place as neighbours are merged.
   Returns 0 on success, -1 on error. Failures while merging a
   neighbour are non-fatal: the code falls through (via goto) and
   frees the record unmerged. */
int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
{
	tdb_off_t right, left;

	/* Allocation and tailer lock */
	if (tdb_lock(tdb, -1, F_WRLCK) != 0)
		return -1;

	/* set an initial tailer, so if we fail we don't leave a bogus record */
	if (update_tailer(tdb, offset, rec) != 0) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
		goto fail;
	}

	/* Look right first (I'm an Australian, dammit) */
	right = offset + sizeof(*rec) + rec->rec_len;
	if (right + sizeof(*rec) <= tdb->map_size) {
		struct list_struct r;

		if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
			goto left;
		}

		/* If it's free, expand to include it. */
		if (r.magic == TDB_FREE_MAGIC) {
			if (remove_from_freelist(tdb, right, r.next) == -1) {
				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
				goto left;
			}
			/* absorb the right neighbour's header + data */
			rec->rec_len += sizeof(r) + r.rec_len;
		}
	}

left:
	/* Look left */
	left = offset - sizeof(tdb_off_t);
	if (left > TDB_DATA_START(tdb->header.hash_size)) {
		struct list_struct l;
		tdb_off_t leftsize;

		/* Read in tailer and jump back to header */
		if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
			goto update;
		}

		/* it could be uninitialised data */
		if (leftsize == 0 || leftsize == TDB_PAD_U32) {
			goto update;
		}

		/* the tailer holds the neighbour's total size, so this
		   lands on its header */
		left = offset - leftsize;

		/* Now read in record */
		if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
			goto update;
		}

		/* If it's free, expand to include it. */
		if (l.magic == TDB_FREE_MAGIC) {
			if (remove_from_freelist(tdb, left, l.next) == -1) {
				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
				goto update;
			} else {
				/* the merged record now starts at the
				   left neighbour's header */
				offset = left;
				rec->rec_len += leftsize;
			}
		}
	}

update:
	/* rewrite the tailer to match the (possibly merged) size */
	if (update_tailer(tdb, offset, rec) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
		goto fail;
	}

	/* Now, prepend to free list */
	rec->magic = TDB_FREE_MAGIC;

	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
	    tdb_rec_write(tdb, offset, rec) == -1 ||
	    tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
		goto fail;
	}

	/* And we're done. */
	tdb_unlock(tdb, -1, F_WRLCK);
	return 0;

 fail:
	tdb_unlock(tdb, -1, F_WRLCK);
	return -1;
}
2423
2424
2425/*
2426   the core of tdb_allocate - called when we have decided which
2427   free list entry to use
2428 */
2429static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
2430				struct list_struct *rec, tdb_off_t last_ptr)
2431{
2432	struct list_struct newrec;
2433	tdb_off_t newrec_ptr;
2434
2435	memset(&newrec, '\0', sizeof(newrec));
2436
2437	/* found it - now possibly split it up  */
2438	if (rec->rec_len > length + MIN_REC_SIZE) {
2439		/* Length of left piece */
2440		length = TDB_ALIGN(length, TDB_ALIGNMENT);
2441
2442		/* Right piece to go on free list */
2443		newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
2444		newrec_ptr = rec_ptr + sizeof(*rec) + length;
2445
2446		/* And left record is shortened */
2447		rec->rec_len = length;
2448	} else {
2449		newrec_ptr = 0;
2450	}
2451
2452	/* Remove allocated record from the free list */
2453	if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
2454		return 0;
2455	}
2456
2457	/* Update header: do this before we drop alloc
2458	   lock, otherwise tdb_free() might try to
2459	   merge with us, thinking we're free.
2460	   (Thanks Jeremy Allison). */
2461	rec->magic = TDB_MAGIC;
2462	if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
2463		return 0;
2464	}
2465
2466	/* Did we create new block? */
2467	if (newrec_ptr) {
2468		/* Update allocated record tailer (we
2469		   shortened it). */
2470		if (update_tailer(tdb, rec_ptr, rec) == -1) {
2471			return 0;
2472		}
2473
2474		/* Free new record */
2475		if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
2476			return 0;
2477		}
2478	}
2479
2480	/* all done - return the new record offset */
2481	return rec_ptr;
2482}
2483
/* allocate some space from the free list. The offset returned points
   to a unconnected list_struct within the database with room for at
   least length bytes of total data

   0 is returned if the space could not be allocated
 */
tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
{
	tdb_off_t rec_ptr, last_ptr, newrec_ptr;
	/* best candidate seen so far while scanning the free chain */
	struct {
		tdb_off_t rec_ptr, last_ptr;
		tdb_len_t rec_len;
	} bestfit;

	if (tdb_lock(tdb, -1, F_WRLCK) == -1)
		return 0;

	/* Extra bytes required for tailer */
	length += sizeof(tdb_off_t);

 again:
	last_ptr = FREELIST_TOP;

	/* read in the freelist top */
	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
		goto fail;

	bestfit.rec_ptr = 0;
	bestfit.last_ptr = 0;
	bestfit.rec_len = 0;

	/*
	   this is a best fit allocation strategy. Originally we used
	   a first fit strategy, but it suffered from massive fragmentation
	   issues when faced with a slowly increasing record size.
	 */
	while (rec_ptr) {
		if (tdb_rec_free_read(tdb, rec_ptr, rec) == -1) {
			goto fail;
		}

		if (rec->rec_len >= length) {
			if (bestfit.rec_ptr == 0 ||
			    rec->rec_len < bestfit.rec_len) {
				bestfit.rec_len = rec->rec_len;
				bestfit.rec_ptr = rec_ptr;
				bestfit.last_ptr = last_ptr;
				/* consider a fit to be good enough if
				   we aren't wasting more than half
				   the space */
				if (bestfit.rec_len < 2*length) {
					break;
				}
			}
		}

		/* move to the next record */
		last_ptr = rec_ptr;
		rec_ptr = rec->next;
	}

	if (bestfit.rec_ptr != 0) {
		/* re-read the winner: *rec may hold a later chain
		   entry after the scan above */
		if (tdb_rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
			goto fail;
		}

		newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
		tdb_unlock(tdb, -1, F_WRLCK);
		return newrec_ptr;
	}

	/* we didn't find enough space. See if we can expand the
	   database and if we can then try again */
	if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
		goto again;
 fail:
	tdb_unlock(tdb, -1, F_WRLCK);
	return 0;
}
2563
2564/* file: freelistcheck.c */
2565
2566/* Check the freelist is good and contains no loops.
2567   Very memory intensive - only do this as a consistency
2568   checker. Heh heh - uses an in memory tdb as the storage
2569   for the "seen" record list. For some reason this strikes
2570   me as extremely clever as I don't have to write another tree
2571   data structure implementation :-).
2572 */
2573
2574static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
2575{
2576	TDB_DATA key, data;
2577
2578	memset(&data, '\0', sizeof(data));
2579	key.dptr = (unsigned char *)&rec_ptr;
2580	key.dsize = sizeof(rec_ptr);
2581	return tdb_store(mem_tdb, key, data, TDB_INSERT);
2582}
2583
2584int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
2585{
2586	struct tdb_context *mem_tdb = NULL;
2587	struct list_struct rec;
2588	tdb_off_t rec_ptr, last_ptr;
2589	int ret = -1;
2590
2591	*pnum_entries = 0;
2592
2593	mem_tdb = tdb_open("flval", tdb->header.hash_size,
2594				TDB_INTERNAL, O_RDWR, 0600);
2595	if (!mem_tdb) {
2596		return -1;
2597	}
2598
2599	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
2600		tdb_close(mem_tdb);
2601		return 0;
2602	}
2603
2604	last_ptr = FREELIST_TOP;
2605
2606	/* Store the FREELIST_TOP record. */
2607	if (seen_insert(mem_tdb, last_ptr) == -1) {
2608		ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2609		goto fail;
2610	}
2611
2612	/* read in the freelist top */
2613	if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
2614		goto fail;
2615	}
2616
2617	while (rec_ptr) {
2618
2619		/* If we can't store this record (we've seen it
2620		   before) then the free list has a loop and must
2621		   be corrupt. */
2622
2623		if (seen_insert(mem_tdb, rec_ptr)) {
2624			ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2625			goto fail;
2626		}
2627
2628		if (tdb_rec_free_read(tdb, rec_ptr, &rec) == -1) {
2629			goto fail;
2630		}
2631
2632		/* move to the next record */
2633		last_ptr = rec_ptr;
2634		rec_ptr = rec.next;
2635		*pnum_entries += 1;
2636	}
2637
2638	ret = 0;
2639
2640  fail:
2641
2642	tdb_close(mem_tdb);
2643	tdb_unlock(tdb, -1, F_WRLCK);
2644	return ret;
2645}
2646
2647/* file: traverse.c */
2648
/* Uses traverse lock: 0 = finish, -1 = error, other = record offset
   Advances 'tlock' to the next live record, locking its hash chain
   and the record itself before returning its offset. On entry, a
   non-zero tlock->off means "resume after that record". Dead records
   found along the way are deleted (when writable) as a cleanup. */
static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
			 struct list_struct *rec)
{
	/* resuming mid-chain: first step past the previous record */
	int want_next = (tlock->off != 0);

	/* Lock each chain from the start one. */
	for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
		if (!tlock->off && tlock->hash != 0) {
			/* this is an optimisation for the common case where
			   the hash chain is empty, which is particularly
			   common for the use of tdb with ldb, where large
			   hashes are used. In that case we spend most of our
			   time in tdb_brlock(), locking empty hash chains.

			   To avoid this, we do an unlocked pre-check to see
			   if the hash chain is empty before starting to look
			   inside it. If it is empty then we can avoid that
			   hash chain. If it isn't empty then we can't believe
			   the value we get back, as we read it without a
			   lock, so instead we get the lock and re-fetch the
			   value below.

			   Notice that not doing this optimisation on the
			   first hash chain is critical. We must guarantee
			   that we have done at least one fcntl lock at the
			   start of a search to guarantee that memory is
			   coherent on SMP systems. If records are added by
			   others during the search then thats OK, and we
			   could possibly miss those with this trick, but we
			   could miss them anyway without this trick, so the
			   semantics don't change.

			   With a non-indexed ldb search this trick gains us a
			   factor of around 80 in speed on a linux 2.6.x
			   system (testing using ldbtest).
			*/
			tdb->methods->next_hash_chain(tdb, &tlock->hash);
			if (tlock->hash == tdb->header.hash_size) {
				continue;
			}
		}

		if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
			return -1;

		/* No previous record?  Start at top of chain. */
		if (!tlock->off) {
			if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
				     &tlock->off) == -1)
				goto fail;
		} else {
			/* Otherwise unlock the previous record. */
			if (tdb_unlock_record(tdb, tlock->off) != 0)
				goto fail;
		}

		if (want_next) {
			/* We have offset of old record: grab next */
			if (tdb_rec_read(tdb, tlock->off, rec) == -1)
				goto fail;
			tlock->off = rec->next;
		}

		/* Iterate through chain */
		while( tlock->off) {
			tdb_off_t current;
			if (tdb_rec_read(tdb, tlock->off, rec) == -1)
				goto fail;

			/* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
			if (tlock->off == rec->next) {
				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
				goto fail;
			}

			if (!TDB_DEAD(rec)) {
				/* Woohoo: we found one! */
				if (tdb_lock_record(tdb, tlock->off) != 0)
					goto fail;
				/* NOTE: chain lock is deliberately left
				   held; the caller releases it */
				return tlock->off;
			}

			/* Try to clean dead ones from old traverses */
			current = tlock->off;
			tlock->off = rec->next;
			if (!(tdb->read_only || tdb->traverse_read) &&
			    tdb_do_delete(tdb, current, rec) != 0)
				goto fail;
		}
		tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
		want_next = 0;
	}
	/* We finished iteration without finding anything */
	return TDB_ERRCODE(TDB_SUCCESS, 0);

 fail:
	tlock->off = 0;
	if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
	return -1;
}
2751
/* traverse the entire database - calling fn(tdb, key, data) on each element.
   return -1 on error or the record count traversed
   if fn is NULL then it is not called
   a non-zero return value from fn() indicates that the traversal should stop
  */
static int tdb_traverse_internal(struct tdb_context *tdb,
				 tdb_traverse_func fn, void *private_data,
				 struct tdb_traverse_lock *tl)
{
	TDB_DATA key, dbuf;
	struct list_struct rec;
	int ret, count = 0;

	/* This was in the initializaton, above, but the IRIX compiler
	 * did not like it.  crh
	 */
	tl->next = tdb->travlocks.next;

	/* fcntl locks don't stack: beware traverse inside traverse */
	tdb->travlocks.next = tl;

	/* tdb_next_lock places locks on the record returned, and its chain */
	while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
		count++;
		/* now read the full record (key and data are stored
		   contiguously after the header) */
		key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
					  rec.key_len + rec.data_len);
		if (!key.dptr) {
			ret = -1;
			if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
				goto out;
			if (tdb_unlock_record(tdb, tl->off) != 0)
				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
			goto out;
		}
		key.dsize = rec.key_len;
		/* dbuf points into the same allocation, after the key */
		dbuf.dptr = key.dptr + rec.key_len;
		dbuf.dsize = rec.data_len;

		/* Drop chain lock, call out */
		if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
			ret = -1;
			SAFE_FREE(key.dptr);
			goto out;
		}
		if (fn && fn(tdb, key, dbuf, private_data)) {
			/* They want us to terminate traversal */
			ret = count;
			if (tdb_unlock_record(tdb, tl->off) != 0) {
				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));;
				ret = -1;
			}
			SAFE_FREE(key.dptr);
			goto out;
		}
		SAFE_FREE(key.dptr);
	}
out:
	/* pop ourselves off the traverse-lock stack */
	tdb->travlocks.next = tl->next;
	if (ret < 0)
		return -1;
	else
		return count;
}
2816
2817
2818/*
2819  a write style traverse - temporarily marks the db read only
2820*/
2821int tdb_traverse_read(struct tdb_context *tdb,
2822		      tdb_traverse_func fn, void *private_data)
2823{
2824	struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
2825	int ret;
2826
2827	/* we need to get a read lock on the transaction lock here to
2828	   cope with the lock ordering semantics of solaris10 */
2829	if (tdb_transaction_lock(tdb, F_RDLCK)) {
2830		return -1;
2831	}
2832
2833	tdb->traverse_read++;
2834	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2835	tdb->traverse_read--;
2836
2837	tdb_transaction_unlock(tdb);
2838
2839	return ret;
2840}
2841
2842/*
2843  a write style traverse - needs to get the transaction lock to
2844  prevent deadlocks
2845*/
2846int tdb_traverse(struct tdb_context *tdb,
2847		 tdb_traverse_func fn, void *private_data)
2848{
2849	struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
2850	int ret;
2851
2852	if (tdb->read_only || tdb->traverse_read) {
2853		return tdb_traverse_read(tdb, fn, private_data);
2854	}
2855
2856	if (tdb_transaction_lock(tdb, F_WRLCK)) {
2857		return -1;
2858	}
2859
2860	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2861
2862	tdb_transaction_unlock(tdb);
2863
2864	return ret;
2865}
2866
2867
/* find the first entry in the database and return its key
   The returned key.dptr is malloc'd (caller frees); tdb_null is
   returned on error or empty database. The record lock is kept in
   tdb->travlocks so tdb_nextkey() can resume from it. */
TDB_DATA tdb_firstkey(struct tdb_context *tdb)
{
	TDB_DATA key;
	struct list_struct rec;

	/* release any old lock */
	if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
		return tdb_null;
	/* reset traversal state to the start of the table */
	tdb->travlocks.off = tdb->travlocks.hash = 0;
	tdb->travlocks.lock_rw = F_RDLCK;

	/* Grab first record: locks chain and returned record. */
	if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
		return tdb_null;
	/* now read the key */
	key.dsize = rec.key_len;
	key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);

	/* Unlock the hash chain of the record we just read. */
	if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
	return key;
}
2892
2893/* find the next entry in the database, returning its key */
2894TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
2895{
2896	u32 oldhash;
2897	TDB_DATA key = tdb_null;
2898	struct list_struct rec;
2899	unsigned char *k = NULL;
2900
2901	/* Is locked key the old key?  If so, traverse will be reliable. */
2902	if (tdb->travlocks.off) {
2903		if (tdb_lock(tdb,tdb->travlocks.hash,tdb->travlocks.lock_rw))
2904			return tdb_null;
2905		if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
2906		    || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
2907					    rec.key_len))
2908		    || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
2909			/* No, it wasn't: unlock it and start from scratch */
2910			if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
2911				SAFE_FREE(k);
2912				return tdb_null;
2913			}
2914			if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0) {
2915				SAFE_FREE(k);
2916				return tdb_null;
2917			}
2918			tdb->travlocks.off = 0;
2919		}
2920
2921		SAFE_FREE(k);
2922	}
2923
2924	if (!tdb->travlocks.off) {
2925		/* No previous element: do normal find, and lock record */
2926		tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
2927		if (!tdb->travlocks.off)
2928			return tdb_null;
2929		tdb->travlocks.hash = BUCKET(rec.full_hash);
2930		if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
2931			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
2932			return tdb_null;
2933		}
2934	}
2935	oldhash = tdb->travlocks.hash;
2936
2937	/* Grab next record: locks chain and returned record,
2938	   unlocks old record */
2939	if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
2940		key.dsize = rec.key_len;
2941		key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
2942					  key.dsize);
2943		/* Unlock the chain of this new record */
2944		if (tdb_unlock(tdb, tdb->travlocks.hash, tdb->travlocks.lock_rw) != 0)
2945			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2946	}
2947	/* Unlock the chain of old record */
2948	if (tdb_unlock(tdb, BUCKET(oldhash), tdb->travlocks.lock_rw) != 0)
2949		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2950	return key;
2951}
2952
2953/* file: dump.c */
2954
2955static tdb_off_t tdb_dump_record(struct tdb_context *tdb, int hash,
2956				 tdb_off_t offset)
2957{
2958	struct list_struct rec;
2959	tdb_off_t tailer_ofs, tailer;
2960
2961	if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
2962				   sizeof(rec), DOCONV()) == -1) {
2963		printf("ERROR: failed to read record at %u\n", offset);
2964		return 0;
2965	}
2966
2967	printf(" rec: hash=%d offset=0x%08x next=0x%08x rec_len=%d "
2968	       "key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
2969	       hash, offset, rec.next, rec.rec_len, rec.key_len, rec.data_len,
2970	       rec.full_hash, rec.magic);
2971
2972	tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
2973
2974	if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
2975		printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
2976		return rec.next;
2977	}
2978
2979	if (tailer != rec.rec_len + sizeof(rec)) {
2980		printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
2981				(unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
2982	}
2983	return rec.next;
2984}
2985
2986static int tdb_dump_chain(struct tdb_context *tdb, int i)
2987{
2988	tdb_off_t rec_ptr, top;
2989
2990	top = TDB_HASH_TOP(i);
2991
2992	if (tdb_lock(tdb, i, F_WRLCK) != 0)
2993		return -1;
2994
2995	if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
2996		return tdb_unlock(tdb, i, F_WRLCK);
2997
2998	if (rec_ptr)
2999		printf("hash=%d\n", i);
3000
3001	while (rec_ptr) {
3002		rec_ptr = tdb_dump_record(tdb, i, rec_ptr);
3003	}
3004
3005	return tdb_unlock(tdb, i, F_WRLCK);
3006}
3007
3008void tdb_dump_all(struct tdb_context *tdb)
3009{
3010	int i;
3011	for (i=0;i<tdb->header.hash_size;i++) {
3012		tdb_dump_chain(tdb, i);
3013	}
3014	printf("freelist:\n");
3015	tdb_dump_chain(tdb, -1);
3016}
3017
/* Walk the freelist under the allocation lock, printing each entry
   and a running total of free bytes. Returns 0 on success, non-zero
   on lock failure, -1 on read error or corruption. */
int tdb_printfreelist(struct tdb_context *tdb)
{
	int ret;
	long total_free = 0;
	tdb_off_t offset, rec_ptr;
	struct list_struct rec;

	if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
		return ret;

	offset = FREELIST_TOP;

	/* read in the freelist top */
	if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
		tdb_unlock(tdb, -1, F_WRLCK);
		return 0;
	}

	printf("freelist top=[0x%08x]\n", rec_ptr );
	while (rec_ptr) {
		if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
					   sizeof(rec), DOCONV()) == -1) {
			tdb_unlock(tdb, -1, F_WRLCK);
			return -1;
		}

		/* every record on the free chain must carry the free magic */
		if (rec.magic != TDB_FREE_MAGIC) {
			printf("bad magic 0x%08x in free list\n", rec.magic);
			tdb_unlock(tdb, -1, F_WRLCK);
			return -1;
		}

		printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
		       rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
		total_free += rec.rec_len;

		/* move to the next record */
		rec_ptr = rec.next;
	}
	printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
               (int)total_free);

	return tdb_unlock(tdb, -1, F_WRLCK);
}
3062
3063/* file: tdb.c */
3064
3065/*
3066  non-blocking increment of the tdb sequence number if the tdb has been opened using
3067  the TDB_SEQNUM flag
3068*/
3069void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
3070{
3071	tdb_off_t seqnum=0;
3072
3073	if (!(tdb->flags & TDB_SEQNUM)) {
3074		return;
3075	}
3076
3077	/* we ignore errors from this, as we have no sane way of
3078	   dealing with them.
3079	*/
3080	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3081	seqnum++;
3082	tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
3083}
3084
3085/*
3086  increment the tdb sequence number if the tdb has been opened using
3087  the TDB_SEQNUM flag
3088*/
3089static void tdb_increment_seqnum(struct tdb_context *tdb)
3090{
3091	if (!(tdb->flags & TDB_SEQNUM)) {
3092		return;
3093	}
3094
3095	if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
3096		return;
3097	}
3098
3099	tdb_increment_seqnum_nonblock(tdb);
3100
3101	tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
3102}
3103
3104static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
3105{
3106	return memcmp(data.dptr, key.dptr, data.dsize);
3107}
3108
/* Returns 0 on fail.  On success, return offset of record, and fills
   in rec.  On "not found" it also sets tdb->ecode = TDB_ERR_NOEXIST.
   The caller must hold the chain lock for 'hash'. */
static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
			struct list_struct *r)
{
	tdb_off_t rec_ptr;

	/* read in the hash top */
	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
		return 0;

	/* keep looking until we find the right record */
	while (rec_ptr) {
		if (tdb_rec_read(tdb, rec_ptr, r) == -1)
			return 0;

		/* cheap checks (liveness, full hash, key length) gate the
		   byte-wise key comparison done by tdb_parse_data() */
		if (!TDB_DEAD(r) && hash==r->full_hash
		    && key.dsize==r->key_len
		    && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
				      r->key_len, tdb_key_compare,
				      NULL) == 0) {
			return rec_ptr;
		}
		rec_ptr = r->next;
	}
	/* TDB_ERRCODE records the error in tdb->ecode and yields 0 */
	return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
}
3136
3137/* As tdb_find, but if you succeed, keep the lock */
3138tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
3139			   struct list_struct *rec)
3140{
3141	u32 rec_ptr;
3142
3143	if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
3144		return 0;
3145	if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
3146		tdb_unlock(tdb, BUCKET(hash), locktype);
3147	return rec_ptr;
3148}
3149
3150
/* update an entry in place - this only works if the new data size
   is <= the old data size and the key exists.
   on failure return -1.
   The caller must hold the chain lock for 'hash'. */
static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
{
	struct list_struct rec;
	tdb_off_t rec_ptr;

	/* find entry */
	if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
		return -1;

	/* must be long enough key, data and tailer */
	if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
		tdb->ecode = TDB_SUCCESS; /* Not really an error */
		return -1;
	}

	/* overwrite the data portion, which starts after the record
	   header and the key */
	if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
		      dbuf.dptr, dbuf.dsize) == -1)
		return -1;

	if (dbuf.dsize != rec.data_len) {
		/* update size */
		rec.data_len = dbuf.dsize;
		return tdb_rec_write(tdb, rec_ptr, &rec);
	}

	return 0;
}
3182
/* find an entry in the database given a key */
/* If an entry doesn't exist tdb_err will be set to
 * TDB_ERR_NOEXIST. If a key has no data attached
 * then the TDB_DATA will have zero length but
 * a non-zero pointer
 *
 * On success the returned dptr is heap-allocated; the caller owns it
 * and must free() it.  Returns tdb_null on lookup or read failure.
 */
TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
{
	tdb_off_t rec_ptr;
	struct list_struct rec;
	TDB_DATA ret;
	u32 hash;

	/* find which hash bucket it is in */
	hash = tdb->hash_fn(&key);
	if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
		return tdb_null;

	/* data bytes live after the record header and the key */
	ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
				  rec.data_len);
	ret.dsize = rec.data_len;
	/* release the read lock taken by tdb_find_lock_hash() */
	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
	return ret;
}
3207
3208/*
3209 * Find an entry in the database and hand the record's data to a parsing
3210 * function. The parsing function is executed under the chain read lock, so it
3211 * should be fast and should not block on other syscalls.
3212 *
3213 * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
3214 *
3215 * For mmapped tdb's that do not have a transaction open it points the parsing
3216 * function directly at the mmap area, it avoids the malloc/memcpy in this
3217 * case. If a transaction is open or no mmap is available, it has to do
3218 * malloc/read/parse/free.
3219 *
3220 * This is interesting for all readers of potentially large data structures in
3221 * the tdb records, ldb indexes being one example.
3222 */
3223
3224int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
3225		     int (*parser)(TDB_DATA key, TDB_DATA data,
3226				   void *private_data),
3227		     void *private_data)
3228{
3229	tdb_off_t rec_ptr;
3230	struct list_struct rec;
3231	int ret;
3232	u32 hash;
3233
3234	/* find which hash bucket it is in */
3235	hash = tdb->hash_fn(&key);
3236
3237	if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
3238		return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3239	}
3240
3241	ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
3242			     rec.data_len, parser, private_data);
3243
3244	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3245
3246	return ret;
3247}
3248
3249/* check if an entry in the database exists
3250
3251   note that 1 is returned if the key is found and 0 is returned if not found
3252   this doesn't match the conventions in the rest of this module, but is
3253   compatible with gdbm
3254*/
3255static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3256{
3257	struct list_struct rec;
3258
3259	if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
3260		return 0;
3261	tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3262	return 1;
3263}
3264
3265int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
3266{
3267	u32 hash = tdb->hash_fn(&key);
3268	return tdb_exists_hash(tdb, key, hash);
3269}
3270
/* actually delete an entry in the database given the offset.
   The caller must hold the chain lock.  If a traverser currently holds
   the record, it is only marked dead (the traverser frees it later);
   otherwise it is unlinked from its chain and returned to the freelist.
   Returns 0 on success, -1 on failure. */
int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct*rec)
{
	tdb_off_t last_ptr, i;
	struct list_struct lastrec;

	if (tdb->read_only || tdb->traverse_read) return -1;

	if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
		/* Someone traversing here: mark it as dead */
		rec->magic = TDB_DEAD_MAGIC;
		return tdb_rec_write(tdb, rec_ptr, rec);
	}
	if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
		return -1;

	/* find previous record in hash chain */
	if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
		return -1;
	for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
		if (tdb_rec_read(tdb, i, &lastrec) == -1)
			return -1;

	/* unlink it: next ptr is at start of record. */
	if (last_ptr == 0)
		last_ptr = TDB_HASH_TOP(rec->full_hash);	/* we were first in the chain */
	if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
		return -1;

	/* recover the space */
	if (tdb_free(tdb, rec_ptr, rec) == -1)
		return -1;
	return 0;
}
3305
3306static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
3307{
3308	int res = 0;
3309	tdb_off_t rec_ptr;
3310	struct list_struct rec;
3311
3312	/* read in the hash top */
3313	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3314		return 0;
3315
3316	while (rec_ptr) {
3317		if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
3318			return 0;
3319
3320		if (rec.magic == TDB_DEAD_MAGIC) {
3321			res += 1;
3322		}
3323		rec_ptr = rec.next;
3324	}
3325	return res;
3326}
3327
3328/*
3329 * Purge all DEAD records from a hash chain
3330 */
3331static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
3332{
3333	int res = -1;
3334	struct list_struct rec;
3335	tdb_off_t rec_ptr;
3336
3337	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3338		return -1;
3339	}
3340
3341	/* read in the hash top */
3342	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3343		goto fail;
3344
3345	while (rec_ptr) {
3346		tdb_off_t next;
3347
3348		if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
3349			goto fail;
3350		}
3351
3352		next = rec.next;
3353
3354		if (rec.magic == TDB_DEAD_MAGIC
3355		    && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
3356			goto fail;
3357		}
3358		rec_ptr = next;
3359	}
3360	res = 0;
3361 fail:
3362	tdb_unlock(tdb, -1, F_WRLCK);
3363	return res;
3364}
3365
/* delete an entry in the database given a key and its hash.
   Two strategies: with max_dead_records set, records are merely marked
   dead (cheap) and the chain is purged once it accumulates too many;
   otherwise the record is deleted and freed immediately.
   Returns 0 on success, -1 on failure. */
static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
{
	tdb_off_t rec_ptr;
	struct list_struct rec;
	int ret;

	if (tdb->max_dead_records != 0) {

		/*
		 * Allow for some dead records per hash chain, mainly for
		 * tdb's with a very high create/delete rate like locking.tdb.
		 */

		if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
			return -1;

		if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
			/*
			 * Don't let the per-chain freelist grow too large,
			 * delete all existing dead records
			 */
			tdb_purge_dead(tdb, hash);
		}

		if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
			tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
			return -1;
		}

		/*
		 * Just mark the record as dead.
		 */
		rec.magic = TDB_DEAD_MAGIC;
		ret = tdb_rec_write(tdb, rec_ptr, &rec);
	}
	else {
		/* tdb_find_lock_hash leaves the chain write-locked on success */
		if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
						   &rec)))
			return -1;

		ret = tdb_do_delete(tdb, rec_ptr, &rec);
	}

	if (ret == 0) {
		tdb_increment_seqnum(tdb);
	}

	/* rec.full_hash was filled in by tdb_find, so this unlocks the
	   same bucket that was locked above */
	if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
	return ret;
}
3418
3419int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
3420{
3421	u32 hash = tdb->hash_fn(&key);
3422	return tdb_delete_hash(tdb, key, hash);
3423}
3424
3425/*
3426 * See if we have a dead record around with enough space
3427 */
3428static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
3429			       struct list_struct *r, tdb_len_t length)
3430{
3431	tdb_off_t rec_ptr;
3432
3433	/* read in the hash top */
3434	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3435		return 0;
3436
3437	/* keep looking until we find the right record */
3438	while (rec_ptr) {
3439		if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3440			return 0;
3441
3442		if (TDB_DEAD(r) && r->rec_len >= length) {
3443			/*
3444			 * First fit for simple coding, TODO: change to best
3445			 * fit
3446			 */
3447			return rec_ptr;
3448		}
3449		rec_ptr = r->next;
3450	}
3451	return 0;
3452}
3453
/* store an element in the database, replacing any existing element
   with the same key

   flag is TDB_INSERT (fail if key exists), TDB_MODIFY (fail if key
   doesn't exist) or TDB_REPLACE/0 (unconditional).

   return 0 on success, -1 on failure
*/
int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
{
	struct list_struct rec;
	u32 hash;
	tdb_off_t rec_ptr;
	char *p = NULL;
	int ret = -1;

	if (tdb->read_only || tdb->traverse_read) {
		tdb->ecode = TDB_ERR_RDONLY;
		return -1;
	}

	/* find which hash bucket it is in */
	hash = tdb->hash_fn(&key);
	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
		return -1;

	/* check for it existing, on insert. */
	if (flag == TDB_INSERT) {
		if (tdb_exists_hash(tdb, key, hash)) {
			tdb->ecode = TDB_ERR_EXISTS;
			goto fail;
		}
	} else {
		/* first try in-place update, on modify or replace. */
		if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
			goto done;
		}
		if (tdb->ecode == TDB_ERR_NOEXIST &&
		    flag == TDB_MODIFY) {
			/* if the record doesn't exist and we are in TDB_MODIFY mode then
			 we should fail the store */
			goto fail;
		}
	}
	/* reset the error code potentially set by the tdb_update() */
	tdb->ecode = TDB_SUCCESS;

	/* delete any existing record - if it doesn't exist we don't
           care.  Doing this first reduces fragmentation, and avoids
           coalescing with `allocated' block before it's updated. */
	if (flag != TDB_INSERT)
		tdb_delete_hash(tdb, key, hash);

	/* Copy key+value *before* allocating free space in case malloc
	   fails and we are left with a dead spot in the tdb. */

	if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
		tdb->ecode = TDB_ERR_OOM;
		goto fail;
	}

	/* on-disk layout is header, key bytes, then data bytes */
	memcpy(p, key.dptr, key.dsize);
	if (dbuf.dsize)
		memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);

	if (tdb->max_dead_records != 0) {
		/*
		 * Allow for some dead records per hash chain, look if we can
		 * find one that can hold the new record. We need enough space
		 * for key, data and tailer. If we find one, we don't have to
		 * consult the central freelist.
		 */
		rec_ptr = tdb_find_dead(
			tdb, hash, &rec,
			key.dsize + dbuf.dsize + sizeof(tdb_off_t));

		if (rec_ptr != 0) {
			/* reuse the dead record in place; rec.next and
			   rec.rec_len are kept from the dead record */
			rec.key_len = key.dsize;
			rec.data_len = dbuf.dsize;
			rec.full_hash = hash;
			rec.magic = TDB_MAGIC;
			if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
			    || tdb->methods->tdb_write(
				    tdb, rec_ptr + sizeof(rec),
				    p, key.dsize + dbuf.dsize) == -1) {
				goto fail;
			}
			goto done;
		}
	}

	/*
	 * We have to allocate some space from the freelist, so this means we
	 * have to lock it. Use the chance to purge all the DEAD records from
	 * the hash chain under the freelist lock.
	 */

	if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
		goto fail;
	}

	if ((tdb->max_dead_records != 0)
	    && (tdb_purge_dead(tdb, hash) == -1)) {
		tdb_unlock(tdb, -1, F_WRLCK);
		goto fail;
	}

	/* we have to allocate some space */
	rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);

	tdb_unlock(tdb, -1, F_WRLCK);

	if (rec_ptr == 0) {
		goto fail;
	}

	/* Read hash top into next ptr */
	if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
		goto fail;

	rec.key_len = key.dsize;
	rec.data_len = dbuf.dsize;
	rec.full_hash = hash;
	rec.magic = TDB_MAGIC;

	/* write out and point the top of the hash chain at it */
	if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
	    || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
	    || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
		/* Need to tdb_unallocate() here */
		goto fail;
	}

 done:
	ret = 0;
 fail:
	/* seqnum bump happens on every successful path (done falls through) */
	if (ret == 0) {
		tdb_increment_seqnum(tdb);
	}

	SAFE_FREE(p);
	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
	return ret;
}
3595
3596
3597/* Append to an entry. Create if not exist. */
3598int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
3599{
3600	u32 hash;
3601	TDB_DATA dbuf;
3602	int ret = -1;
3603
3604	/* find which hash bucket it is in */
3605	hash = tdb->hash_fn(&key);
3606	if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3607		return -1;
3608
3609	dbuf = tdb_fetch(tdb, key);
3610
3611	if (dbuf.dptr == NULL) {
3612		dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
3613	} else {
3614		unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
3615						     dbuf.dsize + new_dbuf.dsize);
3616		if (new_dptr == NULL) {
3617			free(dbuf.dptr);
3618		}
3619		dbuf.dptr = new_dptr;
3620	}
3621
3622	if (dbuf.dptr == NULL) {
3623		tdb->ecode = TDB_ERR_OOM;
3624		goto failed;
3625	}
3626
3627	memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
3628	dbuf.dsize += new_dbuf.dsize;
3629
3630	ret = tdb_store(tdb, key, dbuf, 0);
3631
3632failed:
3633	tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3634	SAFE_FREE(dbuf.dptr);
3635	return ret;
3636}
3637
3638
3639/*
3640  return the name of the current tdb file
3641  useful for external logging functions
3642*/
3643const char *tdb_name(struct tdb_context *tdb)
3644{
3645	return tdb->name;
3646}
3647
3648/*
3649  return the underlying file descriptor being used by tdb, or -1
3650  useful for external routines that want to check the device/inode
3651  of the fd
3652*/
3653int tdb_fd(struct tdb_context *tdb)
3654{
3655	return tdb->fd;
3656}
3657
3658/*
3659  return the current logging function
3660  useful for external tdb routines that wish to log tdb errors
3661*/
3662tdb_log_func tdb_log_fn(struct tdb_context *tdb)
3663{
3664	return tdb->log.log_fn;
3665}
3666
3667
3668/*
3669  get the tdb sequence number. Only makes sense if the writers opened
3670  with TDB_SEQNUM set. Note that this sequence number will wrap quite
3671  quickly, so it should only be used for a 'has something changed'
3672  test, not for code that relies on the count of the number of changes
3673  made. If you want a counter then use a tdb record.
3674
3675  The aim of this sequence number is to allow for a very lightweight
3676  test of a possible tdb change.
3677*/
3678int tdb_get_seqnum(struct tdb_context *tdb)
3679{
3680	tdb_off_t seqnum=0;
3681
3682	tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3683	return seqnum;
3684}
3685
3686int tdb_hash_size(struct tdb_context *tdb)
3687{
3688	return tdb->header.hash_size;
3689}
3690
3691size_t tdb_map_size(struct tdb_context *tdb)
3692{
3693	return tdb->map_size;
3694}
3695
3696int tdb_get_flags(struct tdb_context *tdb)
3697{
3698	return tdb->flags;
3699}
3700
3701
3702/*
3703  enable sequence number handling on an open tdb
3704*/
3705void tdb_enable_seqnum(struct tdb_context *tdb)
3706{
3707	tdb->flags |= TDB_SEQNUM;
3708}
3709
3710/* file: open.c */
3711
/* all contexts, to ensure no double-opens (fcntl locks don't nest!);
   linked through tdb->next, maintained by tdb_open_ex()/tdb_close() */
static struct tdb_context *tdbs = NULL;
3714
3715
3716/* This is from a hash algorithm suggested by Rogier Wolff */
3717static unsigned int default_tdb_hash(TDB_DATA *key)
3718{
3719	u32 value;	/* Used to compute the hash value.  */
3720	u32   i;	/* Used to cycle through random values. */
3721
3722	/* Set the initial value from the key size. */
3723	for (value = 0, i=0; i < key->dsize; i++)
3724		value = value * 256 + key->dptr[i] + (value >> 24) * 241;
3725
3726	return value;
3727}
3728
3729
/* initialise a new database with a specified hash size.
   For TDB_INTERNAL databases the header is kept in memory only;
   otherwise the file is truncated and the fresh header written out.
   Returns 0 on success, -1 on failure (ecode set on OOM). */
static int tdb_new_database(struct tdb_context *tdb, int hash_size)
{
	struct tdb_header *newdb;
	int size, ret = -1;

	/* We make it up in memory, then write it out if not internal */
	size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
	if (!(newdb = (struct tdb_header *)calloc(size, 1)))
		return TDB_ERRCODE(TDB_ERR_OOM, -1);

	/* Fill in the header */
	newdb->version = TDB_VERSION;
	newdb->hash_size = hash_size;
	if (tdb->flags & TDB_INTERNAL) {
		/* in-memory database: the calloc'd block becomes the map */
		tdb->map_size = size;
		tdb->map_ptr = (char *)newdb;
		memcpy(&tdb->header, newdb, sizeof(tdb->header));
		/* Convert the `ondisk' version if asked. */
		CONVERT(*newdb);
		return 0;
	}
	if (lseek(tdb->fd, 0, SEEK_SET) == -1)
		goto fail;

	if (ftruncate(tdb->fd, 0) == -1)
		goto fail;

	/* This creates an endian-converted header, as if read from disk */
	CONVERT(*newdb);
	memcpy(&tdb->header, newdb, sizeof(tdb->header));
	/* Don't endian-convert the magic food! */
	memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
	/* short writes are treated as failure */
	if (write(tdb->fd, newdb, size) != size) {
		ret = -1;
	} else {
		ret = 0;
	}

  fail:
	SAFE_FREE(newdb);
	return ret;
}
3773
3774
3775
3776static int tdb_already_open(dev_t device,
3777			    ino_t ino)
3778{
3779	struct tdb_context *i;
3780
3781	for (i = tdbs; i; i = i->next) {
3782		if (i->device == device && i->inode == ino) {
3783			return 1;
3784		}
3785	}
3786
3787	return 0;
3788}
3789
/* open the database, creating it if necessary

   The open_flags and mode are passed straight to the open call on the
   database file. A flags value of O_WRONLY is invalid. The hash size
   is advisory, use zero for a default value.

   Return is NULL on error, in which case errno is also set.  Don't
   try to call tdb_error or tdb_errname, just do strerror(errno).

   @param name may be NULL for internal databases. */
struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
		      int open_flags, mode_t mode)
{
	/* convenience wrapper: default (null) logger and default hash fn */
	return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
}
3805
/* a default logging function: discards all messages.  Installed by
   tdb_open_ex() when the caller supplies no logging context. */
static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
{
}
3811
3812
/* Full-control open: like tdb_open() but with caller-supplied logging
   context and hash function.  Returns NULL with errno set on failure. */
struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
				int open_flags, mode_t mode,
				const struct tdb_logging_context *log_ctx,
				tdb_hash_func hash_fn)
{
	struct tdb_context *tdb;
	struct stat st;
	int rev = 0, locked = 0;
	unsigned char *vp;
	u32 vertest;

	if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
		/* Can't log this */
		errno = ENOMEM;
		goto fail;
	}
	tdb_io_init(tdb);
	tdb->fd = -1;
	tdb->name = NULL;
	tdb->map_ptr = NULL;
	tdb->flags = tdb_flags;
	tdb->open_flags = open_flags;
	if (log_ctx) {
		tdb->log = *log_ctx;
	} else {
		/* no logger supplied: discard messages */
		tdb->log.log_fn = null_log_fn;
		tdb->log.log_private = NULL;
	}
	tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;

	/* cache the page size */
	tdb->page_size = sysconf(_SC_PAGESIZE);
	if (tdb->page_size <= 0) {
		/* fall back to 8k if sysconf can't tell us */
		tdb->page_size = 0x2000;
	}

	/* O_WRONLY is invalid: tdb always needs to read the file */
	if ((open_flags & O_ACCMODE) == O_WRONLY) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
			 name));
		errno = EINVAL;
		goto fail;
	}

	if (hash_size == 0)
		hash_size = DEFAULT_HASH_SIZE;
	if ((open_flags & O_ACCMODE) == O_RDONLY) {
		tdb->read_only = 1;
		/* read only databases don't do locking or clear if first */
		tdb->flags |= TDB_NOLOCK;
		tdb->flags &= ~TDB_CLEAR_IF_FIRST;
	}

	/* internal databases don't mmap or lock, and start off cleared */
	if (tdb->flags & TDB_INTERNAL) {
		tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
		tdb->flags &= ~TDB_CLEAR_IF_FIRST;
		if (tdb_new_database(tdb, hash_size) != 0) {
			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
			goto fail;
		}
		goto internal;
	}

	if ((tdb->fd = open(name, open_flags, mode)) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
			 name, strerror(errno)));
		goto fail;	/* errno set by open(2) */
	}

	/* ensure there is only one process initialising at once */
	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
			 name, strerror(errno)));
		goto fail;	/* errno set by tdb_brlock */
	}

	/* we need to zero database if we are the only one with it open:
	   the non-blocking F_SETLK on ACTIVE_LOCK only succeeds when no
	   other process holds the file open */
	if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
	    (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
		open_flags |= O_CREAT;
		if (ftruncate(tdb->fd, 0) == -1) {
			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
				 "failed to truncate %s: %s\n",
				 name, strerror(errno)));
			goto fail; /* errno set by ftruncate */
		}
	}

	/* validate header: magic string plus native or byte-reversed version */
	if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
	    || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
	    || (tdb->header.version != TDB_VERSION
		&& !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
		/* its not a valid database - possibly initialise it */
		if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
			errno = EIO; /* ie bad format or something */
			goto fail;
		}
		rev = (tdb->flags & TDB_CONVERT);
	}
	/* detect big-endian files by inspecting the version bytes */
	vp = (unsigned char *)&tdb->header.version;
	vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
		  (((u32)vp[2]) << 8) | (u32)vp[3];
	tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
	if (!rev)
		tdb->flags &= ~TDB_CONVERT;
	else {
		/* opposite-endian file: convert headers/records on the fly */
		tdb->flags |= TDB_CONVERT;
		tdb_convert(&tdb->header, sizeof(tdb->header));
	}
	if (fstat(tdb->fd, &st) == -1)
		goto fail;

	if (tdb->header.rwlocks != 0) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
		goto fail;
	}

	/* Is it already in the open list?  If so, fail. */
	if (tdb_already_open(st.st_dev, st.st_ino)) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
			 "%s (%d,%d) is already open in this process\n",
			 name, (int)st.st_dev, (int)st.st_ino));
		errno = EBUSY;
		goto fail;
	}

	if (!(tdb->name = (char *)strdup(name))) {
		errno = ENOMEM;
		goto fail;
	}

	tdb->map_size = st.st_size;
	tdb->device = st.st_dev;
	tdb->inode = st.st_ino;
	tdb->max_dead_records = 0;
	tdb_mmap(tdb);
	if (locked) {
		/* downgrade: release our exclusive ACTIVE_LOCK... */
		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
				 "failed to take ACTIVE_LOCK on %s: %s\n",
				 name, strerror(errno)));
			goto fail;
		}

	}

	/* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
	   we didn't get the initial exclusive lock as we need to let all other
	   users know we're using it. */

	if (tdb_flags & TDB_CLEAR_IF_FIRST) {
		/* leave this lock in place to indicate it's in use */
		if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
			goto fail;
	}

	/* if needed, run recovery */
	if (tdb_transaction_recover(tdb) == -1) {
		goto fail;
	}

 internal:
	/* Internal (memory-only) databases skip all the code above to
	 * do with disk files, and resume here by releasing their
	 * global lock and hooking into the active list. */
	if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
		goto fail;
	tdb->next = tdbs;
	tdbs = tdb;
	return tdb;

 fail:
	/* unified error exit: tear down whatever was set up, preserving
	   the errno that caused the failure across the cleanup calls */
	{ int save_errno = errno;

	if (!tdb)
		return NULL;

	if (tdb->map_ptr) {
		if (tdb->flags & TDB_INTERNAL)
			SAFE_FREE(tdb->map_ptr);
		else
			tdb_munmap(tdb);
	}
	SAFE_FREE(tdb->name);
	if (tdb->fd != -1)
		if (close(tdb->fd) != 0)
			TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
	SAFE_FREE(tdb);
	errno = save_errno;
	return NULL;
	}
}
4005
4006/*
4007 * Set the maximum number of dead records per hash chain
4008 */
4009
4010void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
4011{
4012	tdb->max_dead_records = max_dead;
4013}
4014
4015/**
4016 * Close a database.
4017 *
4018 * @returns -1 for error; 0 for success.
4019 **/
4020int tdb_close(struct tdb_context *tdb)
4021{
4022	struct tdb_context **i;
4023	int ret = 0;
4024
4025	if (tdb->transaction) {
4026		tdb_transaction_cancel(tdb);
4027	}
4028
4029	if (tdb->map_ptr) {
4030		if (tdb->flags & TDB_INTERNAL)
4031			SAFE_FREE(tdb->map_ptr);
4032		else
4033			tdb_munmap(tdb);
4034	}
4035	SAFE_FREE(tdb->name);
4036	if (tdb->fd != -1)
4037		ret = close(tdb->fd);
4038	SAFE_FREE(tdb->lockrecs);
4039
4040	/* Remove from contexts list */
4041	for (i = &tdbs; *i; i = &(*i)->next) {
4042		if (*i == tdb) {
4043			*i = tdb->next;
4044			break;
4045		}
4046	}
4047
4048	memset(tdb, 0, sizeof(*tdb));
4049	SAFE_FREE(tdb);
4050
4051	return ret;
4052}
4053
/* register a logging function (replaces the current logging context) */
void tdb_set_logging_function(struct tdb_context *tdb,
                              const struct tdb_logging_context *log_ctx)
{
        tdb->log = *log_ctx;
}
4060
4061void *tdb_get_logging_private(struct tdb_context *tdb)
4062{
4063	return tdb->log.log_private;
4064}
4065
/* reopen a tdb - this can be used after a fork to ensure that we have an independent
   seek pointer from our parent and to re-establish locks.
   Not permitted while locks are held or a transaction is open.
   On any failure the context is closed and -1 returned. */
int tdb_reopen(struct tdb_context *tdb)
{
	struct stat st;

	if (tdb->flags & TDB_INTERNAL) {
		return 0; /* Nothing to do. */
	}

	if (tdb->num_locks != 0 || tdb->global_lock.count) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
		goto fail;
	}

	if (tdb->transaction != 0) {
		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
		goto fail;
	}

	if (tdb_munmap(tdb) != 0) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
		goto fail;
	}
	if (close(tdb->fd) != 0)
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
	/* never create/truncate on reopen: the file must already exist */
	tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
	if (tdb->fd == -1) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
		goto fail;
	}
	/* re-advertise that we use the database (see tdb_open_ex) */
	if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
	    (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
		goto fail;
	}
	if (fstat(tdb->fd, &st) != 0) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
		goto fail;
	}
	/* make sure we reopened the same file, not a replacement */
	if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
		goto fail;
	}
	tdb_mmap(tdb);

	return 0;

fail:
	tdb_close(tdb);
	return -1;
}
4118
/* reopen all tdb's on the global open list (for use after fork()).
   Returns -1 on the first failure, 0 on success. */
int tdb_reopen_all(int parent_longlived)
{
	struct tdb_context *tdb;

	for (tdb=tdbs; tdb; tdb = tdb->next) {
		/*
		 * If the parent is longlived (ie. a
		 * parent daemon architecture), we know
		 * it will keep its active lock on a
		 * tdb opened with CLEAR_IF_FIRST. Thus
		 * for child processes we don't have to
		 * add an active lock. This is essential
		 * to improve performance on systems that
		 * keep POSIX locks as a non-scalable data
		 * structure in the kernel.
		 */
		if (parent_longlived) {
			/* Ensure no clear-if-first. */
			tdb->flags &= ~TDB_CLEAR_IF_FIRST;
		}

		if (tdb_reopen(tdb) != 0)
			return -1;
	}

	return 0;
}
4147
4148/**
4149 * Flush a database file from the page cache.
4150 **/
4151int tdb_flush(struct tdb_context *tdb)
4152{
4153	if (tdb->fd != -1)
4154		return fsync(tdb->fd);
4155	return 0;
4156}
4157