1#! /usr/bin/env python
2"""Test script for the bsddb C module by Roger E. Masse
3   Adapted to unittest format and expanded scope by Raymond Hettinger
4"""
5import os, sys
6import unittest
7from test import test_support
8
9# Skip test if _bsddb wasn't built.
10test_support.import_module('_bsddb')
11
12bsddb = test_support.import_module('bsddb', deprecated=True)
13# Just so we know it's imported:
14test_support.import_module('dbhash', deprecated=True)
15
16
17class TestBSDDB(unittest.TestCase):
18    openflag = 'c'
19
20    def setUp(self):
21        self.f = self.openmethod[0](self.fname, self.openflag, cachesize=32768)
22        self.d = dict(q='Guido', w='van', e='Rossum', r='invented', t='Python', y='')
23        for k, v in self.d.iteritems():
24            self.f[k] = v
25
26    def tearDown(self):
27        self.f.sync()
28        self.f.close()
29        if self.fname is None:
30            return
31        try:
32            os.remove(self.fname)
33        except os.error:
34            pass
35
36    def test_getitem(self):
37        for k, v in self.d.iteritems():
38            self.assertEqual(self.f[k], v)
39
40    def test_len(self):
41        self.assertEqual(len(self.f), len(self.d))
42
43    def test_change(self):
44        self.f['r'] = 'discovered'
45        self.assertEqual(self.f['r'], 'discovered')
46        self.assertIn('r', self.f.keys())
47        self.assertIn('discovered', self.f.values())
48
49    def test_close_and_reopen(self):
50        if self.fname is None:
51            # if we're using an in-memory only db, we can't reopen it
52            # so finish here.
53            return
54        self.f.close()
55        self.f = self.openmethod[0](self.fname, 'w')
56        for k, v in self.d.iteritems():
57            self.assertEqual(self.f[k], v)
58
59    def assertSetEquals(self, seqn1, seqn2):
60        self.assertEqual(set(seqn1), set(seqn2))
61
62    def test_mapping_iteration_methods(self):
63        f = self.f
64        d = self.d
65        self.assertSetEquals(d, f)
66        self.assertSetEquals(d.keys(), f.keys())
67        self.assertSetEquals(d.values(), f.values())
68        self.assertSetEquals(d.items(), f.items())
69        self.assertSetEquals(d.iterkeys(), f.iterkeys())
70        self.assertSetEquals(d.itervalues(), f.itervalues())
71        self.assertSetEquals(d.iteritems(), f.iteritems())
72
73    def test_iter_while_modifying_values(self):
74        di = iter(self.d)
75        while 1:
76            try:
77                key = di.next()
78                self.d[key] = 'modified '+key
79            except StopIteration:
80                break
81
82        # it should behave the same as a dict.  modifying values
83        # of existing keys should not break iteration.  (adding
84        # or removing keys should)
85        loops_left = len(self.f)
86        fi = iter(self.f)
87        while 1:
88            try:
89                key = fi.next()
90                self.f[key] = 'modified '+key
91                loops_left -= 1
92            except StopIteration:
93                break
94        self.assertEqual(loops_left, 0)
95
96        self.test_mapping_iteration_methods()
97
98    def test_iter_abort_on_changed_size(self):
99        def DictIterAbort():
100            di = iter(self.d)
101            while 1:
102                try:
103                    di.next()
104                    self.d['newkey'] = 'SPAM'
105                except StopIteration:
106                    break
107        self.assertRaises(RuntimeError, DictIterAbort)
108
109        def DbIterAbort():
110            fi = iter(self.f)
111            while 1:
112                try:
113                    fi.next()
114                    self.f['newkey'] = 'SPAM'
115                except StopIteration:
116                    break
117        self.assertRaises(RuntimeError, DbIterAbort)
118
119    def test_iteritems_abort_on_changed_size(self):
120        def DictIteritemsAbort():
121            di = self.d.iteritems()
122            while 1:
123                try:
124                    di.next()
125                    self.d['newkey'] = 'SPAM'
126                except StopIteration:
127                    break
128        self.assertRaises(RuntimeError, DictIteritemsAbort)
129
130        def DbIteritemsAbort():
131            fi = self.f.iteritems()
132            while 1:
133                try:
134                    key, value = fi.next()
135                    del self.f[key]
136                except StopIteration:
137                    break
138        self.assertRaises(RuntimeError, DbIteritemsAbort)
139
140    def test_iteritems_while_modifying_values(self):
141        di = self.d.iteritems()
142        while 1:
143            try:
144                k, v = di.next()
145                self.d[k] = 'modified '+v
146            except StopIteration:
147                break
148
149        # it should behave the same as a dict.  modifying values
150        # of existing keys should not break iteration.  (adding
151        # or removing keys should)
152        loops_left = len(self.f)
153        fi = self.f.iteritems()
154        while 1:
155            try:
156                k, v = fi.next()
157                self.f[k] = 'modified '+v
158                loops_left -= 1
159            except StopIteration:
160                break
161        self.assertEqual(loops_left, 0)
162
163        self.test_mapping_iteration_methods()
164
165    def test_first_next_looping(self):
166        items = [self.f.first()]
167        for i in xrange(1, len(self.f)):
168            items.append(self.f.next())
169        self.assertSetEquals(items, self.d.items())
170
171    def test_previous_last_looping(self):
172        items = [self.f.last()]
173        for i in xrange(1, len(self.f)):
174            items.append(self.f.previous())
175        self.assertSetEquals(items, self.d.items())
176
177    def test_first_while_deleting(self):
178        # Test for bug 1725856
179        self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
180        for _ in self.d:
181            key = self.f.first()[0]
182            del self.f[key]
183        self.assertEqual([], self.f.items(), "expected empty db after test")
184
185    def test_last_while_deleting(self):
186        # Test for bug 1725856's evil twin
187        self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
188        for _ in self.d:
189            key = self.f.last()[0]
190            del self.f[key]
191        self.assertEqual([], self.f.items(), "expected empty db after test")
192
193    def test_set_location(self):
194        self.assertEqual(self.f.set_location('e'), ('e', self.d['e']))
195
196    def test_contains(self):
197        for k in self.d:
198            self.assertIn(k, self.f)
199        self.assertNotIn('not here', self.f)
200
201    def test_has_key(self):
202        for k in self.d:
203            self.assertTrue(self.f.has_key(k))
204        self.assertTrue(not self.f.has_key('not here'))
205
206    def test_clear(self):
207        self.f.clear()
208        self.assertEqual(len(self.f), 0)
209
210    def test__no_deadlock_first(self, debug=0):
211        # do this so that testers can see what function we're in in
212        # verbose mode when we deadlock.
213        sys.stdout.flush()
214
215        # in pybsddb's _DBWithCursor this causes an internal DBCursor
216        # object is created.  Other test_ methods in this class could
217        # inadvertently cause the deadlock but an explicit test is needed.
218        if debug: print "A"
219        k,v = self.f.first()
220        if debug: print "B", k
221        self.f[k] = "deadlock.  do not pass go.  do not collect $200."
222        if debug: print "C"
223        # if the bsddb implementation leaves the DBCursor open during
224        # the database write and locking+threading support is enabled
225        # the cursor's read lock will deadlock the write lock request..
226
227        # test the iterator interface
228        if True:
229            if debug: print "D"
230            i = self.f.iteritems()
231            k,v = i.next()
232            if debug: print "E"
233            self.f[k] = "please don't deadlock"
234            if debug: print "F"
235            while 1:
236                try:
237                    k,v = i.next()
238                except StopIteration:
239                    break
240            if debug: print "F2"
241
242            i = iter(self.f)
243            if debug: print "G"
244            while i:
245                try:
246                    if debug: print "H"
247                    k = i.next()
248                    if debug: print "I"
249                    self.f[k] = "deadlocks-r-us"
250                    if debug: print "J"
251                except StopIteration:
252                    i = None
253            if debug: print "K"
254
255        # test the legacy cursor interface mixed with writes
256        self.assertIn(self.f.first()[0], self.d)
257        k = self.f.next()[0]
258        self.assertIn(k, self.d)
259        self.f[k] = "be gone with ye deadlocks"
260        self.assertTrue(self.f[k], "be gone with ye deadlocks")
261
262    def test_for_cursor_memleak(self):
263        # do the bsddb._DBWithCursor iterator internals leak cursors?
264        nc1 = len(self.f._cursor_refs)
265        # create iterator
266        i = self.f.iteritems()
267        nc2 = len(self.f._cursor_refs)
268        # use the iterator (should run to the first yield, creating the cursor)
269        k, v = i.next()
270        nc3 = len(self.f._cursor_refs)
271        # destroy the iterator; this should cause the weakref callback
272        # to remove the cursor object from self.f._cursor_refs
273        del i
274        nc4 = len(self.f._cursor_refs)
275
276        self.assertEqual(nc1, nc2)
277        self.assertEqual(nc1, nc4)
278        self.assertTrue(nc3 == nc1+1)
279
280    def test_popitem(self):
281        k, v = self.f.popitem()
282        self.assertIn(k, self.d)
283        self.assertIn(v, self.d.values())
284        self.assertNotIn(k, self.f)
285        self.assertEqual(len(self.d)-1, len(self.f))
286
287    def test_pop(self):
288        k = 'w'
289        v = self.f.pop(k)
290        self.assertEqual(v, self.d[k])
291        self.assertNotIn(k, self.f)
292        self.assertNotIn(v, self.f.values())
293        self.assertEqual(len(self.d)-1, len(self.f))
294
295    def test_get(self):
296        self.assertEqual(self.f.get('NotHere'), None)
297        self.assertEqual(self.f.get('NotHere', 'Default'), 'Default')
298        self.assertEqual(self.f.get('q', 'Default'), self.d['q'])
299
300    def test_setdefault(self):
301        self.assertEqual(self.f.setdefault('new', 'dog'), 'dog')
302        self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r'])
303
304    def test_update(self):
305        new = dict(y='life', u='of', i='brian')
306        self.f.update(new)
307        self.d.update(new)
308        for k, v in self.d.iteritems():
309            self.assertEqual(self.f[k], v)
310
311    def test_keyordering(self):
312        if self.openmethod[0] is not bsddb.btopen:
313            return
314        keys = self.d.keys()
315        keys.sort()
316        self.assertEqual(self.f.first()[0], keys[0])
317        self.assertEqual(self.f.next()[0], keys[1])
318        self.assertEqual(self.f.last()[0], keys[-1])
319        self.assertEqual(self.f.previous()[0], keys[-2])
320        self.assertEqual(list(self.f), keys)
321
class TestBTree(TestBSDDB):
    """Run the shared tests on a btopen database stored in TESTFN."""
    openmethod = [bsddb.btopen]
    fname = test_support.TESTFN
325
class TestBTree_InMemory(TestBSDDB):
    """Run the shared tests on a btopen database that has no file name."""
    openmethod = [bsddb.btopen]
    fname = None
329
class TestBTree_InMemory_Truncate(TestBSDDB):
    """Run the shared tests on a file-less btopen database opened with 'n'."""
    openmethod = [bsddb.btopen]
    openflag = 'n'
    fname = None
334
class TestHashTable(TestBSDDB):
    """Run the shared tests on a hashopen database stored in TESTFN."""
    openmethod = [bsddb.hashopen]
    fname = test_support.TESTFN
338
class TestHashTable_InMemory(TestBSDDB):
    """Run the shared tests on a hashopen database that has no file name."""
    openmethod = [bsddb.hashopen]
    fname = None
342
# NOTE: bsddb.rnopen ('Record Numbers') is not tested here: 'put' for RECNO
# appears broken in bsddb 1.85, at least on Solaris Intel - rmasse 1/97
346
def test_main(verbose=None):
    """Run all the concrete bsddb test case classes of this module.

    verbose -- accepted but not used by this function.
    """
    test_cases = (
        TestBTree,
        TestHashTable,
        TestBTree_InMemory,
        TestHashTable_InMemory,
        TestBTree_InMemory_Truncate,
    )
    test_support.run_unittest(*test_cases)
355
# When executed as a script, run the whole suite.
if __name__ == "__main__":
    test_main(verbose=True)
358