1/*
2 * Copyright 2001-2004 Brandon Long
3 * All Rights Reserved.
4 *
5 * ClearSilver Templating System
6 *
7 * This code is made available under the terms of the ClearSilver License.
8 * http://www.clearsilver.net/license.hdf
9 *
10 */
/*
 * The wdb is a wrapper around the sleepycat db library which adds
 * a relatively simple data/column definition.  In many respects, this
 * code is way more complicated than it ever needed to be, but it works,
 * so I'm loath to "fix" it.
 *
 * One of the key features of this is the ability to update the
 * "schema" of the wdb without changing all of the existing rows of
 * data.
 */
21
22#include "cs_config.h"
23
24#include <unistd.h>
25#include <stdlib.h>
26#include <stdarg.h>
27#include <errno.h>
28#include <string.h>
29#include <limits.h>
30#include <db.h>
31#include <ctype.h>
32
33#include "neo_misc.h"
34#include "neo_err.h"
35#include "dict.h"
36#include "ulist.h"
37#include "skiplist.h"
38#include "wdb.h"
39
40
41#define DEFN_VERSION_1 "WDB-VERSION-200006301"
42#define PACK_VERSION_1 1
43
/*
 * Remove trailing whitespace (as classified by isspace()) from s, in place.
 *
 * s - NUL terminated, writable string; NULL is tolerated and ignored.
 *
 * A string made up entirely of whitespace becomes the empty string.
 */
static void string_rstrip (char *s)
{
  size_t len;

  if (s == NULL) return;

  len = strlen(s);
  /* Look at s[len - 1] only while len > 0: the length is unsigned, so it
   * must never be decremented below zero, and the first character has to
   * be eligible for stripping too.  The unsigned char cast keeps isspace()
   * well defined for bytes >= 0x80 on platforms where char is signed. */
  while (len > 0 && isspace((unsigned char) s[len - 1]))
  {
    s[--len] = '\0';
  }
}
57
/*
 * Write the two upper-case hexadecimal digits of ch into s: the high
 * nibble goes to s[0], the low nibble to s[1].  No terminator is written.
 * Always returns 0.
 */
static int to_hex (unsigned char ch, unsigned char *s)
{
  static const char digits[] = "0123456789ABCDEF";

  s[0] = digits[(ch >> 4) & 0x0f];
  s[1] = digits[ch & 0x0f];
  return 0;
}
66
/*
 * Value of each 7-bit ASCII character when read as a hexadecimal digit,
 * or -1 when the character is not a hexadecimal digit.  One row per 16
 * code points.
 */
int Index_hex[128] = {
  -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,  /* 0x00 */
  -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,  /* 0x10 */
  -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,  /* 0x20 */
   0, 1, 2, 3,  4, 5, 6, 7,  8, 9,-1,-1, -1,-1,-1,-1,  /* 0x30: '0'-'9' */
  -1,10,11,12, 13,14,15,-1, -1,-1,-1,-1, -1,-1,-1,-1,  /* 0x40: 'A'-'F' */
  -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1,  /* 0x50 */
  -1,10,11,12, 13,14,15,-1, -1,-1,-1,-1, -1,-1,-1,-1,  /* 0x60: 'a'-'f' */
  -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1, -1,-1,-1,-1   /* 0x70 */
};

/*
 * Hex digit value of character c, or -1 if c is not a hex digit.
 * The argument is reduced to an unsigned char and range checked first, so
 * bytes >= 0x80 (including negative plain chars) yield -1 instead of
 * indexing outside the 128 entry table.  Note that c is evaluated twice.
 */
#define hexval(c) \
  (((unsigned char)(c)) < 128 ? Index_hex[(unsigned char)(c)] : -1)
78
79/* Encoding means any non-printable characters and : and % */
80static NEOERR *wdb_encode_str_alloc (const char *s, char **o)
81{
82  int x = 0;
83  int c = 0;
84  char *out;
85  unsigned char ch;
86
87  while (s[x])
88  {
89    ch = (unsigned char) s[x];
90    if ((ch < 32) || (ch > 127) || (ch == ':') || (ch == '%') || isspace(ch))
91      c++;
92    x++;
93  }
94
95  out = (char *) malloc (sizeof (char) * (x + c * 3) + 1);
96  if (out == NULL)
97    return nerr_raise (NERR_NOMEM,
98	"Unable to allocate memory for encoding %s", s);
99
100  x = 0;
101  c = 0;
102
103  *o = out;
104
105  while (s[x])
106  {
107    ch = (unsigned char) s[x];
108    if ((ch < 32) || (ch > 127) || (ch == ':') || (ch == '%') || isspace(ch))
109    {
110      out[c++] = '%';
111      to_hex (s[x], &out[c]);
112      c+=2;
113    }
114    else
115    {
116      out[c++] = s[x];
117    }
118    x++;
119  }
120  out[c] = '\0';
121
122  return STATUS_OK;
123}
124
125static NEOERR *wdb_decode_str_alloc (const char *s, char **o)
126{
127  int x = 0;
128  int c = 0;
129  char *out;
130  unsigned char ch;
131
132
133  x = strlen(s);
134  /* Overkill, the decoded string will be smaller */
135  out = (char *) malloc (sizeof (char) * (x + 1));
136  if (out == NULL)
137    return nerr_raise (NERR_NOMEM,
138	"Unable to allocate memory for decoding %s", s);
139
140  x = 0;
141  c = 0;
142
143  while (s[x])
144  {
145    if (s[x] == '%')
146    {
147      x++;
148      ch = hexval(s[x]) << 4;
149      x++;
150      ch |= hexval(s[x]);
151      out[c++] = ch;
152    }
153    else
154    {
155      out[c++] = s[x];
156    }
157    x++;
158  }
159  out[c] = '\0';
160  *o = out;
161
162  return STATUS_OK;
163}
164
/*
 * Value destructor used for the attribute dictionary: releases a value
 * that was allocated with malloc/strdup.  The rock argument is not used.
 */
static void free_cb (void *value, void *rock)
{
  (void) rock;
  free (value);
}
169
170static void free_col_cb (void *value, void *rock)
171{
172  WDBColumn *col;
173
174  col = (WDBColumn *)value;
175
176  free (col->name);
177  free (col);
178}
179
180static NEOERR *wdb_alloc (WDB **wdb, int flags)
181{
182  WDB *my_wdb;
183  NEOERR *err = STATUS_OK;
184
185  my_wdb = (WDB *) calloc (1, sizeof (WDB));
186
187  if (my_wdb == NULL)
188    return nerr_raise (NERR_NOMEM, "Unable to allocate memory for WDB");
189
190  do
191  {
192    err = dictCreate (&(my_wdb->attrs), 0, 2, 5, 0, 0, free_cb, NULL);
193    if (err != STATUS_OK) break;
194    err = dictCreate (&(my_wdb->cols), 0, 2, 5, 0, 0, free_col_cb, NULL);
195    if (err != STATUS_OK) break;
196    err = uListInit (&(my_wdb->cols_l), 0, 0);
197    if (err != STATUS_OK) break;
198    err = skipNewList(&(my_wdb->ondisk), 0, 4, 2, 0, NULL, NULL);
199    if (err != STATUS_OK) break;
200
201    *wdb = my_wdb;
202
203    return STATUS_OK;
204  } while (0);
205
206  wdb_destroy(&my_wdb);
207  return nerr_pass (err);
208}
209
210
211#define STATE_REQUIRED 1
212#define STATE_ATTRIBUTES 2
213#define STATE_COLUMN_DEF 3
214
215static NEOERR *wdb_load_defn_v1 (WDB *wdb, FILE *fp)
216{
217  char line[1024];
218  int state = 1;
219  char *k, *v;
220  NEOERR *err = STATUS_OK;
221  int colindex = 1;
222  WDBColumn *col;
223
224  while (fgets(line, sizeof(line), fp) != NULL)
225  {
226    string_rstrip(line);
227    switch (state)
228    {
229      case STATE_REQUIRED:
230	if (!strcmp(line, "attributes"))
231	  state = STATE_ATTRIBUTES;
232	else if (!strcmp(line, "columns"))
233	  state = STATE_COLUMN_DEF;
234	else
235	{
236	  k = line;
237	  v = strchr(line, ':');
238	  /* HACK */
239	  if (!strcmp(k, "name") && ((v == NULL) || (v[1] == '\0')))
240	  {
241	    v = "dNone";
242	  }
243	  else
244	  {
245	    if (v == NULL)
246	      return nerr_raise (NERR_PARSE, "Error parsing %s", line);
247	    if (v[1] == '\0')
248	      return nerr_raise (NERR_PARSE, "Error parsing %s", line);
249	  }
250	  v[0] = '\0';
251	  v++;
252	  if (!strcmp(k, "key"))
253	  {
254	    err = wdb_decode_str_alloc (v, &(wdb->key));
255	    if (err) return nerr_pass(err);
256	  }
257	  else if (!strcmp(k, "name"))
258	  {
259	    err = wdb_decode_str_alloc (v, &(wdb->name));
260	    if (err) return nerr_pass(err);
261	  }
262	  else if (!strcmp(k, "ondisk"))
263	  {
264	    wdb->last_ondisk = atoi (v);
265	  }
266	}
267	break;
268      case STATE_ATTRIBUTES:
269	if (!strcmp(line, "columns"))
270	  state = STATE_COLUMN_DEF;
271	else
272	{
273	  k = line;
274	  v = strchr(line, ':');
275	  if (v == NULL)
276	    return nerr_raise (NERR_PARSE, "Error parsing %s", line);
277	  v[0] = '\0';
278	  v++;
279	  err = wdb_decode_str_alloc (k, &k);
280	  if (err) return nerr_pass(err);
281	  err = wdb_decode_str_alloc (v, &v);
282	  if (err) return nerr_pass(err);
283	  err = dictSetValue(wdb->attrs, k, v);
284	  free(k);
285	  if (err)
286	    return nerr_pass_ctx(err, "Error parsing %s", line);
287	}
288	break;
289      case STATE_COLUMN_DEF:
290	k = line;
291	v = strchr(line, ':');
292	if (v == NULL)
293	  return nerr_raise (NERR_PARSE, "Error parsing %s", line);
294	if (v[1] == '\0')
295	  return nerr_raise (NERR_PARSE, "Error parsing %s", line);
296	v[0] = '\0';
297	v++;
298	err = wdb_decode_str_alloc (k, &k);
299	if (err) return nerr_pass(err);
300	col = (WDBColumn *) calloc (1, sizeof (WDBColumn));
301	col->name = k;
302	col->inmem_index = colindex++;
303	col->type = *v;
304	v+=2;
305	col->ondisk_index = atoi(v);
306	err = dictSetValue(wdb->cols, k, col);
307	if (err)
308	  return nerr_raise (NERR_PARSE, "Error parsing %s", line);
309	err = uListAppend(wdb->cols_l, col);
310	if (err) return nerr_pass(err);
311	/* stupid skiplist will assert */
312	if (col->ondisk_index == 0)
313	{
314	  return nerr_raise (NERR_ASSERT, "Invalid ondisk mapping for %s", k);
315	}
316	err = skipInsert (wdb->ondisk, col->ondisk_index,
317	    (void *)(col->inmem_index), 0);
318	if (err)
319	  return nerr_pass_ctx(err, "Unable to update ondisk mapping for %s", k);
320	break;
321      default:
322	return nerr_raise (NERR_ASSERT, "Invalid state %d", state);
323    }
324  }
325  return STATUS_OK;
326}
327
328static NEOERR *wdb_save_defn_v1 (WDB *wdb, FILE *fp)
329{
330  NEOERR *err = STATUS_OK;
331  WDBColumn *col;
332  char *s = NULL;
333  char *key = NULL;
334  int r, x, len;
335  char *k = NULL;
336  char *v = NULL;
337
338  /* Write version string */
339  r = fprintf (fp, "%s\n", DEFN_VERSION_1);
340  if (!r) goto save_err;
341
342  err = wdb_encode_str_alloc (wdb->name, &s);
343  if (err) goto save_err;
344  r = fprintf (fp, "name:%s\n", s);
345  if (!r) goto save_err;
346  free (s);
347
348  err = wdb_encode_str_alloc (wdb->key, &s);
349  if (err != STATUS_OK) goto save_err;
350  r = fprintf (fp, "key:%s\n", s);
351  if (!r) goto save_err;
352  free (s);
353  s = NULL;
354
355  r = fprintf (fp, "ondisk:%d\n", wdb->last_ondisk);
356  if (!r) goto save_err;
357
358  r = fprintf (fp, "attributes\n");
359  if (!r) goto save_err;
360  key = NULL;
361  s = (char *) dictNext (wdb->attrs, &key, NULL);
362  while (s)
363  {
364    err = wdb_encode_str_alloc (key, &k);
365    if (err != STATUS_OK) goto save_err;
366    err = wdb_encode_str_alloc (s, &v);
367    if (err != STATUS_OK) goto save_err;
368    r = fprintf (fp, "%s:%s\n", k, v);
369    if (!r) goto save_err;
370    free (k);
371    free (v);
372    k = NULL;
373    v = NULL;
374    s = (char *) dictNext (wdb->attrs, &key, NULL);
375  }
376  s = NULL;
377
378  r = fprintf (fp, "columns\n");
379  if (!r) goto save_err;
380
381  len = uListLength(wdb->cols_l);
382
383  for (x = 0; x < len; x++)
384  {
385    err = uListGet (wdb->cols_l, x, (void *)&col);
386    if (err) goto save_err;
387    err = wdb_encode_str_alloc (col->name, &s);
388    if (err != STATUS_OK) goto save_err;
389    r = fprintf (fp, "%s:%c:%d\n", s, col->type, col->ondisk_index);
390    if (!r) goto save_err;
391    free(s);
392    s = NULL;
393  }
394
395  return STATUS_OK;
396
397save_err:
398  if (s != NULL) free (s);
399  if (k != NULL) free (k);
400  if (v != NULL) free (v);
401  if (err == STATUS_OK) return nerr_pass(err);
402  return nerr_raise (r, "Unable to save defn");
403}
404
405static NEOERR *wdb_load_defn (WDB *wdb, const char *name)
406{
407  char path[_POSIX_PATH_MAX];
408  char line[1024];
409  FILE *fp;
410  NEOERR *err = STATUS_OK;
411
412  snprintf (path, sizeof(path), "%s.wdf", name);
413  fp = fopen (path, "r");
414  if (fp == NULL)
415  {
416    if (errno == ENOENT)
417      return nerr_raise (NERR_NOT_FOUND, "Unable to open defn %s", name);
418    return nerr_raise_errno (NERR_IO, "Unable to open defn %s", name);
419  }
420
421  /* Read Version string */
422  if (fgets (line, sizeof(line), fp) == NULL)
423  {
424    fclose(fp);
425    return nerr_raise_errno (NERR_IO, "Unable to read defn %s", name);
426  }
427  string_rstrip(line);
428
429  if (!strcmp(line, DEFN_VERSION_1))
430  {
431    err = wdb_load_defn_v1(wdb, fp);
432    fclose(fp);
433    if (err) return nerr_pass(err);
434  }
435  else
436  {
437    fclose(fp);
438    return nerr_raise (NERR_ASSERT, "Unknown defn version %s: %s", line, name);
439  }
440
441  wdb->table_version = rand();
442
443  return STATUS_OK;
444}
445
446static NEOERR *wdb_save_defn (WDB *wdb, const char *name)
447{
448  char path[_POSIX_PATH_MAX];
449  char path2[_POSIX_PATH_MAX];
450  FILE *fp;
451  NEOERR *err = STATUS_OK;
452  int r;
453
454  snprintf (path, sizeof(path), "%s.wdf.new", name);
455  snprintf (path2, sizeof(path2), "%s.wdf", name);
456  fp = fopen (path, "w");
457  if (fp == NULL)
458    return nerr_raise_errno (NERR_IO, "Unable to open defn %s", name);
459
460  err = wdb_save_defn_v1 (wdb, fp);
461  fclose (fp);
462  if (err != STATUS_OK)
463  {
464    unlink (path);
465    return nerr_pass (err);
466  }
467
468  r = unlink (path2);
469  if (r == -1 && errno != ENOENT)
470    return nerr_raise_errno (NERR_IO, "Unable to unlink %s", path2);
471  r = link (path, path2);
472  if (r == -1)
473    return nerr_raise_errno (NERR_IO, "Unable to link %s to %s", path, path2);
474  r = unlink (path);
475
476  wdb->defn_dirty = 0;
477  wdb->table_version = rand();
478
479  return STATUS_OK;
480}
481
482NEOERR *wdb_open (WDB **wdb, const char *name, int flags)
483{
484  WDB *my_wdb;
485  char path[_POSIX_PATH_MAX];
486  NEOERR *err = STATUS_OK;
487  int r;
488
489  *wdb = NULL;
490
491  err = wdb_alloc (&my_wdb, flags);
492  if (err) return nerr_pass(err);
493
494  my_wdb->path = strdup (name);
495  if (err)
496  {
497    wdb_destroy (&my_wdb);
498    return nerr_pass(err);
499  }
500
501  err = wdb_load_defn (my_wdb, name);
502  if (err)
503  {
504    wdb_destroy (&my_wdb);
505    return nerr_pass(err);
506  }
507
508  snprintf (path, sizeof(path), "%s.wdb", name);
509  r = db_open(path, DB_BTREE, 0, 0, NULL, NULL, &(my_wdb->db));
510  if (r)
511  {
512    wdb_destroy (&my_wdb);
513    return nerr_raise (NERR_DB, "Unable to open database %s: %d", name, r);
514  }
515
516  *wdb = my_wdb;
517
518  return STATUS_OK;
519}
520
521NEOERR *wdb_save (WDB *wdb)
522{
523  if (wdb->defn_dirty)
524  {
525    wdb_save_defn (wdb, wdb->path);
526  }
527  return STATUS_OK;
528}
529
530void wdb_destroy (WDB **wdb)
531{
532  WDB *my_wdb;
533
534  my_wdb = *wdb;
535
536  if (my_wdb == NULL) return;
537
538  if (my_wdb->defn_dirty)
539  {
540    wdb_save_defn (my_wdb, my_wdb->path);
541  }
542
543  if (my_wdb->attrs != NULL)
544  {
545    dictDestroy (my_wdb->attrs);
546  }
547
548  if (my_wdb->cols != NULL)
549  {
550    dictDestroy (my_wdb->cols);
551  }
552
553  if (my_wdb->cols_l != NULL)
554  {
555    uListDestroy(&(my_wdb->cols_l), 0);
556  }
557
558  if (my_wdb->ondisk != NULL)
559  {
560    skipFreeList(my_wdb->ondisk);
561  }
562
563  if (my_wdb->db != NULL)
564  {
565    my_wdb->db->close (my_wdb->db, 0);
566    my_wdb->db = NULL;
567  }
568
569  if (my_wdb->path != NULL)
570  {
571    free(my_wdb->path);
572    my_wdb->path = NULL;
573  }
574  if (my_wdb->name != NULL)
575  {
576    free(my_wdb->name);
577    my_wdb->name = NULL;
578  }
579  if (my_wdb->key != NULL)
580  {
581    free(my_wdb->key);
582    my_wdb->key = NULL;
583  }
584
585  free (my_wdb);
586  *wdb = NULL;
587
588  return;
589}
590
/*
 * Row (un)packing helpers.
 *
 * Every macro must be used inside a function that defines a label named
 * pack_err; control jumps there on allocation failure (PACK_*) or when the
 * buffer is too short or inconsistent (UNPACK_*).
 *
 * PACK_* arguments:   pdata - growable malloc'd buffer (lvalue)
 *                     plen  - bytes used so far (int lvalue, advanced)
 *                     pmax  - bytes allocated (int lvalue, grown on demand)
 * UNPACK_* arguments: pdata - buffer to read
 *                     plen  - number of valid bytes in pdata
 *                     pn    - read offset (int lvalue, advanced)
 *
 * Multi-byte integers are stored least significant byte first.  When a
 * PACK_* macro fails to grow the buffer, pdata and pmax are left untouched
 * so the caller can still free the buffer.  Value arguments may be
 * evaluated more than once.
 */

/* Append the 4 byte integer pn */
#define PACK_UB4(pdata, plen, pmax, pn) \
do { \
  if ((plen) + 4 > (pmax)) \
  { \
    void *pack_tmp_; \
    int pack_max_ = (pmax); \
    while ((plen) + 4 > pack_max_) \
      pack_max_ = (pack_max_ > 0) ? pack_max_ * 2 : 16; \
    pack_tmp_ = realloc ((void *)(pdata), pack_max_); \
    if (pack_tmp_ == NULL) goto pack_err; \
    (pdata) = pack_tmp_; \
    (pmax) = pack_max_; \
  } \
  (pdata)[(plen)++] = (0x0ff & ((pn) >> 0)); \
  (pdata)[(plen)++] = (0x0ff & ((pn) >> 8)); \
  (pdata)[(plen)++] = (0x0ff & ((pn) >> 16)); \
  (pdata)[(plen)++] = (0x0ff & ((pn) >> 24)); \
} while (0)

/* Read a 4 byte integer into pd */
#define UNPACK_UB4(pdata, plen, pn, pd) \
do { \
  if ((pn) + 4 > (plen)) \
    goto pack_err; \
  (pd) = (int) (((unsigned int)(0x0ff & (pdata)[(pn)+0]) << 0) | \
                ((unsigned int)(0x0ff & (pdata)[(pn)+1]) << 8) | \
                ((unsigned int)(0x0ff & (pdata)[(pn)+2]) << 16) | \
                ((unsigned int)(0x0ff & (pdata)[(pn)+3]) << 24)); \
  (pn) += 4; \
} while (0)

/* Append the low byte of pn */
#define PACK_BYTE(pdata, plen, pmax, pn) \
do { \
  if ((plen) + 1 > (pmax)) \
  { \
    void *pack_tmp_; \
    int pack_max_ = (pmax); \
    while ((plen) + 1 > pack_max_) \
      pack_max_ = (pack_max_ > 0) ? pack_max_ * 2 : 16; \
    pack_tmp_ = realloc ((void *)(pdata), pack_max_); \
    if (pack_tmp_ == NULL) goto pack_err; \
    (pdata) = pack_tmp_; \
    (pmax) = pack_max_; \
  } \
  (pdata)[(plen)++] = (0x0ff & ((pn) >> 0)); \
} while (0)

/* Read one byte (as a value in 0..255) into pd */
#define UNPACK_BYTE(pdata, plen, pn, pd) \
do { \
  if ((pn) + 1 > (plen)) \
    goto pack_err; \
  (pd) = (0x0ff & (pdata)[(pn)++]); \
} while (0)

/* Append the 4 byte length dl followed by dl bytes taken from ds */
#define PACK_STRING(pdata, plen, pmax, dl, ds) \
do { \
  if ((plen) + 4 + (dl) > (pmax)) \
  { \
    void *pack_tmp_; \
    int pack_max_ = (pmax); \
    while ((plen) + 4 + (dl) > pack_max_) \
      pack_max_ = (pack_max_ > 0) ? pack_max_ * 2 : 16; \
    pack_tmp_ = realloc ((void *)(pdata), pack_max_); \
    if (pack_tmp_ == NULL) goto pack_err; \
    (pdata) = pack_tmp_; \
    (pmax) = pack_max_; \
  } \
  (pdata)[(plen)++] = (0x0ff & ((dl) >> 0)); \
  (pdata)[(plen)++] = (0x0ff & ((dl) >> 8)); \
  (pdata)[(plen)++] = (0x0ff & ((dl) >> 16)); \
  (pdata)[(plen)++] = (0x0ff & ((dl) >> 24)); \
  memcpy (&(pdata)[(plen)], (ds), (dl)); \
  (plen) += (dl); \
} while (0)

/* Read a length prefixed string into a freshly malloc'd, NUL terminated
 * buffer stored in ps; a zero length string is returned as NULL.  The
 * stored length is checked against the bytes that are actually left in the
 * buffer before anything is copied. */
#define UNPACK_STRING(pdata, plen, pn, ps) \
do { \
  int unpack_len_; \
  if ((pn) + 4 > (plen)) \
    goto pack_err; \
  unpack_len_ = (int) (((unsigned int)(0x0ff & (pdata)[(pn)+0]) << 0) | \
                       ((unsigned int)(0x0ff & (pdata)[(pn)+1]) << 8) | \
                       ((unsigned int)(0x0ff & (pdata)[(pn)+2]) << 16) | \
                       ((unsigned int)(0x0ff & (pdata)[(pn)+3]) << 24)); \
  (pn) += 4; \
  if (unpack_len_ < 0 || unpack_len_ > (plen) - (pn)) \
    goto pack_err; \
  if (unpack_len_) \
  { \
    (ps) = (char *)malloc(sizeof(char)*(unpack_len_+1)); \
    if ((ps) == NULL) \
      goto pack_err; \
    memcpy ((ps), &(pdata)[(pn)], unpack_len_); \
    (ps)[unpack_len_] = '\0'; \
    (pn) += unpack_len_; \
  } else { \
    (ps) = NULL; \
  } \
} while (0)
669
670/* A VERSION_1 Row consists of the following data:
671 * UB4 VERSION
672 * UB4 DATA COUNT
673 * DATA where
674 *   UB4 ONDISK INDEX
675 *   UB1 TYPE
676 *   if INT, then UB4
677 *   if STR, then UB4 length and length UB1s
678 */
679
680static NEOERR *pack_row (WDB *wdb, WDBRow *row, void **rdata, int *rdlen)
681{
682  char *data;
683  int x, len, dlen, dmax;
684  char *s;
685  int n;
686  WDBColumn *col;
687  NEOERR *err;
688
689  *rdata = NULL;
690  *rdlen = 0;
691  /* allocate */
692  data = (char *)malloc(sizeof (char) * 1024);
693  if (data == NULL)
694    return nerr_raise (NERR_NOMEM, "Unable to allocate memory to pack row");
695
696  dmax = 1024;
697  dlen = 0;
698
699  PACK_UB4 (data, dlen, dmax, PACK_VERSION_1);
700/*  PACK_UB4 (data, dlen, dmax, time(NULL)); */
701
702  len = uListLength(wdb->cols_l);
703  if (len > row->data_count)
704    len = row->data_count;
705  PACK_UB4 (data, dlen, dmax, len);
706
707  for (x = 0; x < len; x++)
708  {
709    err = uListGet (wdb->cols_l, x, (void *)&col);
710    if (err) goto pack_err;
711    PACK_UB4 (data, dlen, dmax, col->ondisk_index);
712    PACK_BYTE (data, dlen, dmax, col->type);
713    switch (col->type)
714    {
715      case WDB_TYPE_INT:
716	n = (int)(row->data[x]);
717	PACK_UB4 (data, dlen, dmax, n);
718	break;
719      case WDB_TYPE_STR:
720	s = (char *)(row->data[x]);
721	if (s == NULL)
722	{
723	  s = "";
724	}
725	n = strlen(s);
726	PACK_STRING (data, dlen, dmax, n, s);
727	break;
728      default:
729	free (data);
730	return nerr_raise (NERR_ASSERT, "Unknown type %d", col->type);
731    }
732  }
733
734  *rdata = data;
735  *rdlen = dlen;
736  return STATUS_OK;
737
738pack_err:
739  if (data != NULL)
740    free (data);
741  if (err == STATUS_OK)
742    return nerr_raise(NERR_NOMEM, "Unable to allocate memory for pack_row");
743  return nerr_pass(err);
744}
745
746static NEOERR *unpack_row (WDB *wdb, void *rdata, int dlen, WDBRow *row)
747{
748  unsigned char *data = rdata;
749  int version, n;
750  int count, x, ondisk_index, type, d_int, inmem_index;
751  char *s;
752
753  n = 0;
754
755  UNPACK_UB4(data, dlen, n, version);
756
757  switch (version)
758  {
759    case PACK_VERSION_1:
760      UNPACK_UB4(data, dlen, n, count);
761      for (x = 0; x<count; x++)
762      {
763	UNPACK_UB4 (data, dlen, n, ondisk_index);
764	UNPACK_BYTE (data, dlen, n, type);
765	inmem_index = (int) skipSearch (wdb->ondisk, ondisk_index, NULL);
766
767	switch (type)
768	{
769	  case WDB_TYPE_INT:
770	    UNPACK_UB4 (data, dlen, n, d_int);
771	    if (inmem_index != 0)
772	      row->data[inmem_index-1] = (void *) d_int;
773	    break;
774	  case WDB_TYPE_STR:
775	    UNPACK_STRING (data, dlen, n, s);
776	    if (inmem_index != 0)
777	      row->data[inmem_index-1] = s;
778	    break;
779	  default:
780	    return nerr_raise (NERR_ASSERT, "Unknown type %d for col %d", type, ondisk_index);
781	}
782      }
783      break;
784    default:
785      return nerr_raise (NERR_ASSERT, "Unknown version %d", version);
786  }
787
788  return STATUS_OK;
789pack_err:
790  return nerr_raise(NERR_PARSE, "Unable to unpack row %s", row->key_value);
791}
792
793NEOERR *wdb_column_insert (WDB *wdb, int loc, const char *key, char type)
794{
795  NEOERR *err;
796  WDBColumn *col, *ocol;
797  int x, len;
798
799  col = (WDBColumn *) dictSearch (wdb->cols, key, NULL);
800
801  if (col != NULL)
802    return nerr_raise (NERR_DUPLICATE,
803	"Duplicate key %s:%d", key, col->inmem_index);
804
805  col = (WDBColumn *) calloc (1, sizeof (WDBColumn));
806  if (col == NULL)
807  {
808    return nerr_raise (NERR_NOMEM,
809	"Unable to allocate memory for creation of col %s:%d", key, loc);
810  }
811
812  col->name = strdup(key);
813  if (col->name == NULL)
814  {
815    free(col);
816    return nerr_raise (NERR_NOMEM,
817	"Unable to allocate memory for creation of col %s:%d", key, loc);
818  }
819  col->type = type;
820  col->ondisk_index = wdb->last_ondisk++;
821  /* -1 == append */
822  if (loc == -1)
823  {
824    err = dictSetValue(wdb->cols, key, col);
825    if (err)
826    {
827      free (col->name);
828      free (col);
829      return nerr_pass_ctx (err,
830	  "Unable to insert for creation of col %s:%d", key, loc);
831    }
832    err = uListAppend (wdb->cols_l, (void *)col);
833    if (err) return nerr_pass(err);
834    x = uListLength (wdb->cols_l);
835    col->inmem_index = x;
836    err = skipInsert (wdb->ondisk, col->ondisk_index,
837	            (void *)(col->inmem_index), 0);
838    if (err)
839      return nerr_pass_ctx (err, "Unable to update ondisk mapping for %s", key);
840  }
841  else
842  {
843    /* We are inserting this in middle, so the skipList ondisk is now
844     * invalid, as is the inmem_index for all cols */
845    err = dictSetValue(wdb->cols, key, col);
846    if (err)
847    {
848      free (col->name);
849      free (col);
850      return nerr_pass_ctx (err,
851	  "Unable to insert for creation of col %s:%d", key, loc);
852    }
853    err = uListInsert (wdb->cols_l, loc, (void *)col);
854    if (err) return nerr_pass(err);
855    len = uListLength (wdb->cols_l);
856    /* Fix up inmem_index and ondisk skipList */
857    for (x = 0; x < len; x++)
858    {
859      err = uListGet (wdb->cols_l, x, (void *)&ocol);
860      if (err) return nerr_pass(err);
861      ocol->inmem_index = x + 1;
862      err = skipInsert (wdb->ondisk, ocol->ondisk_index,
863	              (void *)(ocol->inmem_index), TRUE);
864      if (err)
865	return nerr_pass_ctx (err, "Unable to update ondisk mapping for %s", key);
866    }
867  }
868
869  wdb->defn_dirty = 1;
870  wdb->table_version = rand();
871
872  return STATUS_OK;
873}
874
875NEOERR *wdb_column_update (WDB *wdb, const char *oldkey, const char *newkey)
876{
877  WDBColumn *ocol, *col;
878  WDBColumn *vcol;
879  NEOERR *err = STATUS_OK;
880  int x, len, r;
881
882  ocol = (WDBColumn *) dictSearch (wdb->cols, oldkey, NULL);
883
884  if (ocol == NULL)
885    return nerr_raise (NERR_NOT_FOUND,
886	"Unable to find column for key %s", oldkey);
887
888  col = (WDBColumn *) calloc (1, sizeof (WDBColumn));
889  if (col == NULL)
890  {
891    return nerr_raise (NERR_NOMEM,
892	"Unable to allocate memory for column update %s", newkey);
893  }
894
895  *col = *ocol;
896  col->name = strdup(newkey);
897  if (col->name == NULL)
898  {
899    free(col);
900    return nerr_raise (NERR_NOMEM,
901	"Unable to allocate memory for column update %s", oldkey);
902  }
903  len = uListLength(wdb->cols_l);
904  for (x = 0; x < len; x++)
905  {
906    err = uListGet (wdb->cols_l, x, (void *)&vcol);
907    if (err) return nerr_pass(err);
908    if (!strcmp(vcol->name, oldkey))
909    {
910      err = uListSet (wdb->cols_l, x, (void *)col);
911      if (err) return nerr_pass(err);
912      break;
913    }
914  }
915  if (x>len)
916  {
917    return nerr_raise (NERR_ASSERT, "Unable to find cols_l for key %s", oldkey);
918  }
919
920  r = dictRemove (wdb->cols, oldkey); /* Only failure is key not found */
921  err = dictSetValue(wdb->cols, newkey, col);
922  if (err)
923  {
924    free (col->name);
925    free (col);
926    return nerr_pass_ctx (err,
927	"Unable to insert for update of col %s->%s", oldkey, newkey);
928  }
929
930  wdb->defn_dirty = 1;
931  wdb->table_version = rand();
932
933  return STATUS_OK;
934}
935
936NEOERR *wdb_column_delete (WDB *wdb, const char *name)
937{
938  WDBColumn *col;
939  NEOERR *err = STATUS_OK;
940  int len, x, r;
941
942  len = uListLength(wdb->cols_l);
943  for (x = 0; x < len; x++)
944  {
945    err = uListGet (wdb->cols_l, x, (void *)&col);
946    if (err) return nerr_pass(err);
947    if (!strcmp(col->name, name))
948    {
949      err = uListDelete (wdb->cols_l, x, NULL);
950      if (err) return nerr_pass(err);
951      break;
952    }
953  }
954
955  r = dictRemove (wdb->cols, name); /* Only failure is key not found */
956  if (!r)
957  {
958    return nerr_raise (NERR_NOT_FOUND,
959	"Unable to find column for key %s", name);
960  }
961  wdb->defn_dirty = 1;
962  wdb->table_version = rand();
963
964  return STATUS_OK;
965}
966
967NEOERR *wdb_column_exchange (WDB *wdb, const char *key1, const char *key2)
968{
969  return nerr_raise (NERR_ASSERT,
970      "wdb_column_exchange: Not Implemented");
971}
972
973/* Not that there's that much point in changing the key name ... */
974NEOERR *wdb_update (WDB *wdb, const char *name, const char *key)
975{
976  if (name != NULL && strcmp(wdb->name, name))
977  {
978    if (wdb->name != NULL)
979      free(wdb->name);
980    wdb->name = strdup(name);
981    if (wdb->name == NULL)
982      return nerr_raise (NERR_NOMEM,
983	  "Unable to allocate memory to update name to %s", name);
984    wdb->defn_dirty = 1;
985    wdb->table_version = rand();
986  }
987  if (key != NULL && strcmp(wdb->key, key))
988  {
989    if (wdb->key != NULL)
990      free(wdb->key);
991    wdb->key = strdup(key);
992    if (wdb->key == NULL)
993    {
994      wdb->defn_dirty = 0;
995      return nerr_raise (NERR_NOMEM,
996	  "Unable to allocate memory to update key to %s", key);
997    }
998    wdb->defn_dirty = 1;
999    wdb->table_version = rand();
1000  }
1001  return STATUS_OK;
1002}
1003
1004NEOERR *wdb_create (WDB **wdb, const char *path, const char *name,
1005                    const char *key, ULIST *col_def, int flags)
1006{
1007  WDB *my_wdb;
1008  char d_path[_POSIX_PATH_MAX];
1009  NEOERR *err = STATUS_OK;
1010  int x, len, r;
1011  char *s;
1012
1013  *wdb = NULL;
1014
1015  err = wdb_alloc (&my_wdb, flags);
1016  if (err) return nerr_pass(err);
1017
1018  my_wdb->name = strdup (name);
1019  my_wdb->key = strdup (key);
1020  my_wdb->path = strdup(path);
1021  if (my_wdb->name == NULL || my_wdb->key == NULL || my_wdb->path == NULL)
1022  {
1023    wdb_destroy (&my_wdb);
1024    return nerr_raise (NERR_NOMEM,
1025	"Unable to allocate memory for creation of %s", name);
1026  }
1027
1028  /* ondisk must start at one because of skipList */
1029  my_wdb->last_ondisk = 1;
1030  len = uListLength(col_def);
1031  for (x = 0; x < len; x++)
1032  {
1033    err = uListGet (col_def, x, (void *)&s);
1034    if (err)
1035    {
1036      wdb_destroy (&my_wdb);
1037      return nerr_pass(err);
1038    }
1039    err = wdb_column_insert (my_wdb, -1, s, WDB_TYPE_STR);
1040    my_wdb->defn_dirty = 0; /* So we don't save on error destroy */
1041    if (err)
1042    {
1043      wdb_destroy (&my_wdb);
1044      return nerr_pass(err);
1045    }
1046  }
1047
1048  err = wdb_save_defn (my_wdb, path);
1049  if (err)
1050  {
1051    wdb_destroy (&my_wdb);
1052    return nerr_pass(err);
1053  }
1054
1055  snprintf (d_path, sizeof(d_path), "%s.wdb", path);
1056  r = db_open(d_path, DB_BTREE, DB_CREATE | DB_TRUNCATE, 0, NULL, NULL, &(my_wdb->db));
1057  if (r)
1058  {
1059    wdb_destroy (&my_wdb);
1060    return nerr_raise (NERR_DB, "Unable to create db file %s: %d", d_path, r);
1061  }
1062
1063  *wdb = my_wdb;
1064
1065  return STATUS_OK;
1066}
1067
1068NEOERR *wdb_attr_next (WDB *wdb, char **key, char **value)
1069{
1070  *value = (char *) dictNext (wdb->attrs, key, NULL);
1071
1072  return STATUS_OK;
1073}
1074
1075NEOERR *wdb_attr_get (WDB *wdb, const char *key, char **value)
1076{
1077  void *v;
1078
1079  v = dictSearch (wdb->attrs, key, NULL);
1080
1081  if (v == NULL)
1082    return nerr_raise (NERR_NOT_FOUND, "Unable to find attr %s", key);
1083
1084  *value = (char *)v;
1085
1086  return STATUS_OK;
1087}
1088
1089NEOERR *wdb_attr_set (WDB *wdb, const char *key, const char *value)
1090{
1091  NEOERR *err = STATUS_OK;
1092  char *v;
1093
1094  v = strdup(value);
1095  if (v == NULL)
1096    return nerr_raise (NERR_NOMEM, "No memory for new attr");
1097
1098  err = dictSetValue(wdb->attrs, key, v);
1099  if (err)
1100    return nerr_pass_ctx (err, "Unable to set attr %s", key);
1101
1102  wdb->defn_dirty = 1;
1103
1104  return STATUS_OK;
1105}
1106
1107NEOERR *wdbr_get (WDB *wdb, WDBRow *row, const char *key, void **value)
1108{
1109  WDBColumn *col;
1110  void *v;
1111
1112  col = (WDBColumn *) dictSearch (wdb->cols, key, NULL);
1113
1114  if (col == NULL)
1115    return nerr_raise (NERR_NOT_FOUND, "Unable to find key %s", key);
1116
1117  if (col->inmem_index-1 > row->data_count)
1118    return nerr_raise (NERR_ASSERT, "Index for key %s is greater than row data, was table altered?", key);
1119
1120  v = row->data[col->inmem_index-1];
1121
1122  *value = v;
1123
1124  return STATUS_OK;
1125}
1126
1127NEOERR *wdbr_set (WDB *wdb, WDBRow *row, const char *key, void *value)
1128{
1129  WDBColumn *col;
1130
1131  col = (WDBColumn *) dictSearch (wdb->cols, key, NULL);
1132
1133  if (col == NULL)
1134    return nerr_raise (NERR_NOT_FOUND, "Unable to find key %s", key);
1135
1136  if (col->inmem_index-1 > row->data_count)
1137    return nerr_raise (NERR_ASSERT, "Index for key %s is greater than row data, was table altered?", key);
1138
1139  if (col->type == WDB_TYPE_STR && row->data[col->inmem_index-1] != NULL)
1140  {
1141    free (row->data[col->inmem_index-1]);
1142  }
1143  row->data[col->inmem_index-1] = value;
1144
1145  return STATUS_OK;
1146}
1147
1148static NEOERR *alloc_row (WDB *wdb, WDBRow **row)
1149{
1150  WDBRow *my_row;
1151  int len;
1152
1153  *row = NULL;
1154
1155  len = uListLength (wdb->cols_l);
1156
1157  my_row = (WDBRow *) calloc (1, sizeof (WDBRow) + len * (sizeof (void *)));
1158  if (my_row == NULL)
1159    return nerr_raise (NERR_NOMEM, "No memory for new row");
1160
1161  my_row->data_count = len;
1162  my_row->table_version = wdb->table_version;
1163
1164  *row = my_row;
1165
1166  return STATUS_OK;
1167}
1168
1169NEOERR *wdbr_destroy (WDB *wdb, WDBRow **row)
1170{
1171  WDBColumn *col;
1172  WDBRow *my_row;
1173  int len, x;
1174  NEOERR *err;
1175
1176  err = STATUS_OK;
1177  if (*row == NULL)
1178    return err;
1179
1180  my_row = *row;
1181
1182  /* Verify this row maps to this table, or else we could do something
1183   * bad */
1184
1185  if (wdb->table_version != my_row->table_version)
1186    return nerr_raise (NERR_ASSERT, "Row %s doesn't match current table", my_row->key_value);
1187
1188  if (my_row->key_value != NULL)
1189    free (my_row->key_value);
1190
1191  len = uListLength(wdb->cols_l);
1192
1193  for (x = 0; x < len; x++)
1194  {
1195    if (my_row->data[x] != NULL)
1196    {
1197      err = uListGet (wdb->cols_l, x, (void *)&col);
1198      if (err) break;
1199      switch (col->type)
1200      {
1201	case WDB_TYPE_INT:
1202	  break;
1203	case WDB_TYPE_STR:
1204	  free (my_row->data[x]);
1205	  break;
1206	default:
1207	  return nerr_raise (NERR_ASSERT, "Unknown type %d", col->type);
1208      }
1209    }
1210  }
1211
1212  free (my_row);
1213  *row = NULL;
1214
1215  return nerr_pass(err);
1216}
1217
1218NEOERR *wdbr_lookup (WDB *wdb, const char *key, WDBRow **row)
1219{
1220  DBT dkey, data;
1221  NEOERR *err = STATUS_OK;
1222  WDBRow *my_row;
1223  int r;
1224
1225  *row = NULL;
1226
1227  memset(&dkey, 0, sizeof(dkey));
1228  memset(&data, 0, sizeof(data));
1229
1230  dkey.flags = DB_DBT_USERMEM;
1231  data.flags = DB_DBT_MALLOC;
1232
1233  dkey.data = (void *)key;
1234  dkey.size = strlen(key);
1235
1236  r = wdb->db->get (wdb->db, NULL, &dkey, &data, 0);
1237  if (r == DB_NOTFOUND)
1238    return nerr_raise (NERR_NOT_FOUND, "Unable to find key %s", key);
1239  else if (r)
1240    return nerr_raise (NERR_DB, "Error retrieving key %s: %d", key, r);
1241
1242  /* allocate row */
1243  err = alloc_row (wdb, &my_row);
1244  if (err != STATUS_OK)
1245  {
1246    free (data.data);
1247    return nerr_pass(err);
1248  }
1249
1250  my_row->key_value = strdup(key);
1251  if (my_row->key_value == NULL)
1252  {
1253    free (data.data);
1254    free (my_row);
1255    return nerr_raise (NERR_NOMEM, "No memory for new row");
1256  }
1257
1258  /* unpack row */
1259  err = unpack_row (wdb, data.data, data.size, my_row);
1260  free (data.data);
1261  if (err)
1262  {
1263    free (my_row);
1264    return nerr_pass(err);
1265  }
1266
1267  *row = my_row;
1268
1269  return STATUS_OK;
1270}
1271
1272NEOERR *wdbr_create (WDB *wdb, const char *key, WDBRow **row)
1273{
1274  WDBRow *my_row;
1275  NEOERR *err = STATUS_OK;
1276
1277  *row = NULL;
1278
1279  /* allocate row */
1280  err = alloc_row (wdb, &my_row);
1281  if (err) return nerr_pass(err);
1282
1283  my_row->key_value = strdup(key);
1284  if (my_row->key_value == NULL)
1285  {
1286    wdbr_destroy (wdb, &my_row);
1287    return nerr_raise (NERR_NOMEM, "No memory for new row");
1288  }
1289
1290  *row = my_row;
1291
1292  return STATUS_OK;
1293}
1294
1295NEOERR *wdbr_save (WDB *wdb, WDBRow *row, int flags)
1296{
1297  DBT dkey, data;
1298  int dflags = 0;
1299  NEOERR *err = STATUS_OK;
1300  int r;
1301
1302  memset(&dkey, 0, sizeof(dkey));
1303  memset(&data, 0, sizeof(data));
1304
1305  dkey.data = row->key_value;
1306  dkey.size = strlen(row->key_value);
1307
1308  err = pack_row (wdb, row, &(data.data), &data.size);
1309  if (err != STATUS_OK) return nerr_pass(err);
1310
1311  if (flags & WDBR_INSERT)
1312  {
1313    dflags = DB_NOOVERWRITE;
1314  }
1315
1316  r = wdb->db->put (wdb->db, NULL, &dkey, &data, dflags);
1317  free (data.data);
1318  if (r == DB_KEYEXIST)
1319    return nerr_raise (NERR_DUPLICATE, "Key %s already exists", row->key_value);
1320  if (r)
1321    return nerr_raise (NERR_DB, "Error saving key %s: %d",
1322	row->key_value, r);
1323
1324  return STATUS_OK;
1325}
1326
1327NEOERR *wdbr_delete (WDB *wdb, const char *key)
1328{
1329  DBT dkey;
1330  int r;
1331
1332  memset(&dkey, 0, sizeof(dkey));
1333
1334  dkey.flags = DB_DBT_USERMEM;
1335
1336  dkey.data = (void *)key;
1337  dkey.size = strlen(key);
1338
1339  r = wdb->db->del (wdb->db, NULL, &dkey, 0);
1340  if (r == DB_NOTFOUND)
1341    return nerr_raise (NERR_NOT_FOUND, "Key %s not found", key);
1342  else if (r)
1343    return nerr_raise (NERR_DB, "Error deleting key %s: %d", key, r);
1344
1345
1346  return STATUS_OK;
1347}
1348
1349NEOERR *wdbr_dump (WDB *wdb, WDBRow *row)
1350{
1351  int x;
1352
1353  ne_warn ("version: %d", row->table_version);
1354  ne_warn ("key: %s", row->key_value);
1355  ne_warn ("count: %d", row->data_count);
1356  for (x=0; x < row->data_count; x++)
1357    ne_warn ("data[%d]: %s", x, row->data[x]);
1358
1359  return STATUS_OK;
1360}
1361
1362NEOERR *wdbc_create (WDB *wdb, WDBCursor **cursor)
1363{
1364  DBC *db_cursor;
1365  WDBCursor *new_cursor;
1366  int r;
1367
1368  *cursor = NULL;
1369
1370#if (DB_VERSION_MINOR==4)
1371  r = (wdb->db)->cursor (wdb->db, NULL, &db_cursor);
1372#else
1373  r = (wdb->db)->cursor (wdb->db, NULL, &db_cursor, 0);
1374#endif
1375  if (r)
1376    return nerr_raise (NERR_DB, "Unable to create cursor: %d", r);
1377
1378  new_cursor = (WDBCursor *) calloc (1, sizeof (WDBCursor));
1379  if (new_cursor == NULL)
1380  {
1381    db_cursor->c_close (db_cursor);
1382    return nerr_raise (NERR_NOMEM, "Unable to create cursor");
1383  }
1384
1385  new_cursor->table_version = wdb->table_version;
1386  new_cursor->db_cursor = db_cursor;
1387
1388  *cursor = new_cursor;
1389
1390  return STATUS_OK;
1391}
1392
1393NEOERR *wdbc_destroy (WDB *wdb, WDBCursor **cursor)
1394{
1395  if (*cursor != NULL)
1396  {
1397    (*cursor)->db_cursor->c_close ((*cursor)->db_cursor);
1398    free (*cursor);
1399    *cursor = NULL;
1400  }
1401  return STATUS_OK;
1402}
1403
1404NEOERR *wdbr_next (WDB *wdb, WDBCursor *cursor, WDBRow **row, int flags)
1405{
1406  DBT dkey, data;
1407  WDBRow *my_row;
1408  NEOERR *err = STATUS_OK;
1409  int r;
1410
1411  *row = NULL;
1412
1413  if (wdb->table_version != cursor->table_version)
1414  {
1415    return nerr_raise (NERR_ASSERT, "Cursor doesn't match database");
1416  }
1417
1418  memset(&dkey, 0, sizeof(dkey));
1419  memset(&data, 0, sizeof(data));
1420  dkey.flags = DB_DBT_MALLOC;
1421  data.flags = DB_DBT_MALLOC;
1422
1423  /* First call */
1424  if (flags & WDBC_FIRST)
1425  {
1426    r = cursor->db_cursor->c_get (cursor->db_cursor, &dkey, &data, DB_FIRST);
1427    if (r == DB_NOTFOUND)
1428      return nerr_raise (NERR_NOT_FOUND, "Cursor empty");
1429    else if (r)
1430      return nerr_raise (NERR_DB, "Unable to get first item from cursor: %d",
1431	  r);
1432  }
1433  else
1434  {
1435    r = cursor->db_cursor->c_get (cursor->db_cursor, &dkey, &data, DB_NEXT);
1436    if (r == DB_NOTFOUND)
1437      return STATUS_OK;
1438    else if (r)
1439      return nerr_raise (NERR_DB, "Unable to get next item from cursor: %d", r);
1440  }
1441
1442  /* allocate row */
1443  err = alloc_row (wdb, &my_row);
1444  if (err)
1445  {
1446    free (data.data);
1447    return nerr_pass(err);
1448  }
1449
1450  my_row->key_value = (char *) malloc (dkey.size + 1);
1451  if (my_row->key_value == NULL)
1452  {
1453    free (data.data);
1454    free (my_row);
1455    return nerr_raise (NERR_NOMEM, "No memory for new row");
1456  }
1457
1458  memcpy (my_row->key_value, dkey.data, dkey.size);
1459  my_row->key_value[dkey.size] = '\0';
1460
1461  /* unpack row */
1462  err = unpack_row (wdb, data.data, data.size, my_row);
1463  free (data.data);
1464  free (dkey.data);
1465  if (err)
1466  {
1467    free (my_row);
1468    return nerr_pass(err);
1469  }
1470
1471  *row = my_row;
1472
1473  return STATUS_OK;
1474}
1475
1476NEOERR *wdbr_find (WDB *wdb, WDBCursor *cursor, const char *key, WDBRow **row)
1477{
1478  DBT dkey, data;
1479  WDBRow *my_row;
1480  NEOERR *err = STATUS_OK;
1481  int r;
1482
1483  *row = NULL;
1484
1485  if (wdb->table_version != cursor->table_version)
1486  {
1487    return nerr_raise (NERR_ASSERT, "Cursor doesn't match database");
1488  }
1489
1490  memset(&dkey, 0, sizeof(dkey));
1491  memset(&data, 0, sizeof(data));
1492  dkey.flags = DB_DBT_USERMEM;
1493  data.flags = DB_DBT_MALLOC;
1494
1495  dkey.data = (void *)key;
1496  dkey.size = strlen(key);
1497
1498  r = cursor->db_cursor->c_get (cursor->db_cursor, &dkey, &data, DB_SET_RANGE);
1499  if (r == DB_NOTFOUND)
1500    return STATUS_OK;
1501  else if (r)
1502    return nerr_raise (r, "Unable to get find item for key %s", key);
1503
1504  /* allocate row */
1505  err = alloc_row (wdb, &my_row);
1506  if (err)
1507  {
1508    free (data.data);
1509    return nerr_pass(err);
1510  }
1511
1512  my_row->key_value = (char *) malloc (dkey.size + 1);
1513  if (my_row->key_value == NULL)
1514  {
1515    free (data.data);
1516    free (my_row);
1517    return nerr_raise (NERR_NOMEM, "No memory for new row");
1518  }
1519
1520  memcpy (my_row->key_value, dkey.data, dkey.size);
1521  my_row->key_value[dkey.size] = '\0';
1522
1523  /* unpack row */
1524  err = unpack_row (wdb, data.data, data.size, my_row);
1525  free (data.data);
1526  if (err)
1527  {
1528    free (my_row);
1529    return nerr_pass(err);
1530  }
1531
1532  *row = my_row;
1533
1534  return STATUS_OK;
1535}
1536
1537NEOERR *wdb_keys (WDB *wdb, char **primary_key, ULIST **data)
1538{
1539  NEOERR *err;
1540  int x, len;
1541  WDBColumn *col;
1542  ULIST *my_data;
1543  char *my_key = NULL;
1544  char *my_col = NULL;
1545
1546  *data = NULL;
1547  *primary_key = NULL;
1548  my_key = strdup(wdb->key);
1549  if (my_key == NULL)
1550    return nerr_raise (NERR_NOMEM, "Unable to allocate memory for keys");
1551
1552  len = uListLength(wdb->cols_l);
1553  err = uListInit (&my_data, len, 0);
1554  if (err != STATUS_OK)
1555  {
1556    free(my_key);
1557    return nerr_pass(err);
1558  }
1559
1560  for (x = 0; x < len; x++)
1561  {
1562    err = uListGet (wdb->cols_l, x, (void *)&col);
1563    if (err) goto key_err;
1564    my_col = strdup(col->name);
1565    if (my_col == NULL)
1566    {
1567      err = nerr_raise (NERR_NOMEM, "Unable to allocate memory for keys");
1568      goto key_err;
1569    }
1570    err = uListAppend (my_data, my_col);
1571    my_col = NULL;
1572    if (err) goto key_err;
1573  }
1574
1575  *data = my_data;
1576  *primary_key = my_key;
1577  return STATUS_OK;
1578
1579key_err:
1580  if (my_key != NULL) free (my_key);
1581  if (my_col != NULL) free (my_col);
1582  *primary_key = NULL;
1583  uListDestroy (&my_data, 0);
1584  return nerr_pass(err);
1585}
1586
1587/*
1588 * Known Issues:
1589 *  - Probably need to store the actual key value in the packed row..
1590 *    Maybe not, because any cursor you use on a sleepycat db will
1591 *    return the key...
1592 * - um, memory.  Especially when dealing with rows, need to keep track
1593 *   of when we allocate, when we dealloc, and who owns that memory to
1594 *   free it
1595 * - function to delete entry from wdb
1596 */
1597