1import unittest
2import os
3
4from test_all import db, test_support, get_new_environment_path, get_new_database_path
5
6
class DBSequenceTest(unittest.TestCase):
    """Exercise db.DBSequence against a BTree database in a fresh environment.

    setUp creates a new environment directory and opens a DBEnv plus a DB
    inside it.  tearDown closes whichever of the ``seq``, ``d`` and ``dbenv``
    handles are still attached to the test case and removes the directory.
    Tests store the sequence under ``self.seq`` so that tearDown closes it.
    """

    def setUp(self):
        # NOTE: despite the name this is 2**32 (one more than the largest
        # unsigned 32-bit value); multiples of it do not fit in 32 bits.
        self.int_32_max = 0x100000000
        self.homeDir = get_new_environment_path()
        self.filename = "test"

        # The mode is spelled 0o666 so the literal parses on Python 2.6+
        # and on Python 3 alike.
        self.dbenv = db.DBEnv()
        self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0o666)
        self.d = db.DB(self.dbenv)
        self.d.open(self.filename, db.DB_BTREE, db.DB_CREATE, 0o666)

    def tearDown(self):
        # Close handles in reverse order of creation.  The nested
        # try/finally blocks make sure a failing close() does not prevent
        # the remaining handles from being closed or the environment
        # directory from being removed.
        try:
            if hasattr(self, 'seq'):
                self.seq.close()
                del self.seq
        finally:
            try:
                if hasattr(self, 'd'):
                    self.d.close()
                    del self.d
            finally:
                try:
                    if hasattr(self, 'dbenv'):
                        self.dbenv.close()
                        del self.dbenv
                finally:
                    test_support.rmtree(self.homeDir)

    def test_get(self):
        # get(n) must return the current value and advance the sequence by n.
        self.seq = db.DBSequence(self.d, flags=0)
        start_value = 10 * self.int_32_max
        self.assertEqual(0xA00000000, start_value)
        self.assertEqual(None, self.seq.initial_value(start_value))
        self.assertEqual(None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE))
        self.assertEqual(start_value, self.seq.get(5))
        self.assertEqual(start_value + 5, self.seq.get())

    def test_remove(self):
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(None, self.seq.remove(txn=None, flags=0))
        # Drop the attribute so tearDown does not call close() on the
        # sequence that was just removed.
        del self.seq

    def test_get_key(self):
        self.seq = db.DBSequence(self.d, flags=0)
        key = 'foo'
        self.assertEqual(None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE))
        self.assertEqual(key, self.seq.get_key())

    def test_get_dbp(self):
        # get_dbp() must hand back the DB the sequence was created on.
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(self.d, self.seq.get_dbp())

    def test_cachesize(self):
        # The cache size is set before open() and read back afterwards.
        self.seq = db.DBSequence(self.d, flags=0)
        cache_size = 10
        self.assertEqual(None, self.seq.set_cachesize(cache_size))
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(cache_size, self.seq.get_cachesize())

    def test_flags(self):
        self.seq = db.DBSequence(self.d, flags=0)
        flag = db.DB_SEQ_WRAP
        self.assertEqual(None, self.seq.set_flags(flag))
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        # Only check that the requested bit is present; other bits in the
        # returned flags are ignored.
        self.assertEqual(flag, self.seq.get_flags() & flag)

    def test_range(self):
        # Both bounds lie above 2**32, so the range needs more than 32 bits.
        self.seq = db.DBSequence(self.d, flags=0)
        seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1)
        self.assertEqual(None, self.seq.set_range(seq_range))
        self.seq.initial_value(seq_range[0])
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(seq_range, self.seq.get_range())

    def test_stat(self):
        # stat() must report every one of the expected statistics keys.
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        stat = self.seq.stat()
        for param in ('nowait', 'min', 'max', 'value', 'current',
                      'flags', 'cache_size', 'last_value', 'wait'):
            self.assertTrue(param in stat, "parameter %s isn't in stat info" % param)

    if db.version() >= (4, 7):
        # This code checks a crash solved in Berkeley DB 4.7
        def test_stat_crash(self):
            d = db.DB()
            d.open(None, dbtype=db.DB_HASH, flags=db.DB_CREATE)  # In RAM
            # Close the private in-memory database even if an assertion
            # below fails; tearDown does not know about this handle.
            try:
                seq = db.DBSequence(d, flags=0)

                # Opening without DB_CREATE must fail for a missing key ...
                self.assertRaises(db.DBNotFoundError, seq.open,
                                  key='id', txn=None, flags=0)

                # ... and stat() on the unopened sequence must raise.
                self.assertRaises(db.DBInvalidArgError, seq.stat)
            finally:
                d.close()

    def test_64bits(self):
        # We don't use both extremes because they are problematic
        value_plus = (1 << 63) - 2
        self.assertEqual(9223372036854775806, value_plus)
        value_minus = (-1 << 63) + 1  # Two's complement
        self.assertEqual(-9223372036854775807, value_minus)

        # Positive end: start one below value_plus and step up to it.
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.initial_value(value_plus - 1))
        self.assertEqual(None, self.seq.open(key='id', txn=None,
                                             flags=db.DB_CREATE))
        self.assertEqual(value_plus - 1, self.seq.get(1))
        self.assertEqual(value_plus, self.seq.get(1))

        # Remove the first sequence so the same key can be reused below.
        self.seq.remove(txn=None, flags=0)

        # Negative end: start at value_minus and step up by one.
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.initial_value(value_minus))
        self.assertEqual(None, self.seq.open(key='id', txn=None,
                                             flags=db.DB_CREATE))
        self.assertEqual(value_minus, self.seq.get(1))
        self.assertEqual(value_minus + 1, self.seq.get(1))

    def test_multiple_close(self):
        self.seq = db.DBSequence(self.d)
        self.seq.close()  # You can close a Sequence multiple times
        self.seq.close()
        self.seq.close()
128
def test_suite():
    """Return a TestSuite wrapping all tests defined on DBSequenceTest."""
    # unittest.makeSuite() is deprecated and no longer exists in recent
    # Python releases; the default loader collects the same test methods.
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite()
    suite.addTest(loader.loadTestsFromTestCase(DBSequenceTest))
    return suite
133
134
if __name__ == "__main__":
    # Executed as a script: let unittest run the suite built by test_suite().
    unittest.main(defaultTest="test_suite")
137