"""Unit tests for ``db.DBSequence`` as exposed through ``test_all.db``."""

import unittest
import os

from test_all import db, test_support, get_new_environment_path, get_new_database_path


class DBSequenceTest(unittest.TestCase):
    """Exercise the DBSequence API against a BTREE database in a fresh env.

    Each test stores the sequence under ``self.seq`` so that ``tearDown``
    can close it; a test that invalidates the handle itself removes the
    attribute before returning.
    """

    def setUp(self):
        """Create a new environment directory and open a BTREE database."""
        # NOTE: despite the name this is 2**32, i.e. one past the largest
        # unsigned 32-bit value.  The tests multiply it to obtain values
        # that cannot fit in 32 bits.
        self.int_32_max = 0x100000000
        self.homeDir = get_new_environment_path()
        self.filename = "test"

        self.dbenv = db.DBEnv()
        self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0o666)
        self.d = db.DB(self.dbenv)
        self.d.open(self.filename, db.DB_BTREE, db.DB_CREATE, 0o666)

    def tearDown(self):
        """Close whatever handles are still attached and delete the env dir."""
        try:
            # Close in reverse order of creation: sequence, database, env.
            for attr in ('seq', 'd', 'dbenv'):
                if hasattr(self, attr):
                    getattr(self, attr).close()
                    delattr(self, attr)
        finally:
            # Remove the on-disk environment even if a close() raised, so a
            # failing test does not leave its directory behind.
            test_support.rmtree(self.homeDir)

    def test_get(self):
        """get(delta) returns the current value and advances by delta."""
        self.seq = db.DBSequence(self.d, flags=0)
        start_value = 10 * self.int_32_max
        self.assertEqual(0xA00000000, start_value)
        self.assertEqual(None, self.seq.initial_value(start_value))
        self.assertEqual(None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE))
        self.assertEqual(start_value, self.seq.get(5))
        self.assertEqual(start_value + 5, self.seq.get())

    def test_remove(self):
        """remove() on an open sequence returns None."""
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(None, self.seq.remove(txn=None, flags=0))
        # Drop the attribute so tearDown does not call close() on the
        # removed sequence.
        del self.seq

    def test_get_key(self):
        """get_key() returns the key the sequence was opened with."""
        self.seq = db.DBSequence(self.d, flags=0)
        key = 'foo'
        self.assertEqual(None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE))
        self.assertEqual(key, self.seq.get_key())

    def test_get_dbp(self):
        """get_dbp() returns the DB object the sequence was created on."""
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(self.d, self.seq.get_dbp())

    def test_cachesize(self):
        """A cache size set before open() is reported by get_cachesize()."""
        self.seq = db.DBSequence(self.d, flags=0)
        cache_size = 10
        self.assertEqual(None, self.seq.set_cachesize(cache_size))
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(cache_size, self.seq.get_cachesize())

    def test_flags(self):
        """A flag set before open() is present in get_flags()."""
        self.seq = db.DBSequence(self.d, flags=0)
        flag = db.DB_SEQ_WRAP
        self.assertEqual(None, self.seq.set_flags(flag))
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        # Mask with the flag: only the bit under test is checked.
        self.assertEqual(flag, self.seq.get_flags() & flag)

    def test_range(self):
        """A (min, max) range set before open() is returned by get_range()."""
        self.seq = db.DBSequence(self.d, flags=0)
        seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1)
        self.assertEqual(None, self.seq.set_range(seq_range))
        # Start the sequence at the lower bound of the configured range.
        self.seq.initial_value(seq_range[0])
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEqual(seq_range, self.seq.get_range())

    def test_stat(self):
        """stat() contains every expected statistics key."""
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        stat = self.seq.stat()
        for param in ('nowait', 'min', 'max', 'value', 'current',
                      'flags', 'cache_size', 'last_value', 'wait'):
            self.assertTrue(param in stat, "parameter %s isn't in stat info" % param)

    if db.version() >= (4, 7):
        # This code checks a crash solved in Berkeley DB 4.7
        def test_stat_crash(self):
            """stat() on a sequence whose open() failed raises, not crashes."""
            d = db.DB()
            d.open(None, dbtype=db.DB_HASH, flags=db.DB_CREATE)  # In RAM
            try:
                seq = db.DBSequence(d, flags=0)

                # Opening without DB_CREATE on an empty database must fail.
                self.assertRaises(db.DBNotFoundError, seq.open,
                                  key='id', txn=None, flags=0)

                self.assertRaises(db.DBInvalidArgError, seq.stat)
            finally:
                # Release the in-memory database even if an assertion failed.
                d.close()

    def test_64bits(self):
        """Values near both ends of the signed 64-bit range round-trip."""
        # We don't use both extremes because they are problematic
        value_plus = (1 << 63) - 2
        self.assertEqual(9223372036854775806, value_plus)
        value_minus = (-1 << 63) + 1  # Two's complement
        self.assertEqual(-9223372036854775807, value_minus)

        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.initial_value(value_plus - 1))
        self.assertEqual(None, self.seq.open(key='id', txn=None,
                                             flags=db.DB_CREATE))
        self.assertEqual(value_plus - 1, self.seq.get(1))
        self.assertEqual(value_plus, self.seq.get(1))

        # Remove the first sequence so the same key can be reused below.
        self.seq.remove(txn=None, flags=0)

        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEqual(None, self.seq.initial_value(value_minus))
        self.assertEqual(None, self.seq.open(key='id', txn=None,
                                             flags=db.DB_CREATE))
        self.assertEqual(value_minus, self.seq.get(1))
        self.assertEqual(value_minus + 1, self.seq.get(1))

    def test_multiple_close(self):
        """close() may be called repeatedly on the same sequence."""
        self.seq = db.DBSequence(self.d)
        self.seq.close()  # You can close a Sequence multiple times
        self.seq.close()
        self.seq.close()


def test_suite():
    """Return a suite containing every test in DBSequenceTest."""
    suite = unittest.TestSuite()
    # loadTestsFromTestCase is the supported equivalent of the deprecated
    # unittest.makeSuite and is available on both Python 2 and Python 3.
    suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(DBSequenceTest))
    return suite


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')