1"""TestCases for distributed transactions.
2"""
3
4import os
5import time
6import unittest
7
8from test_all import db, test_support, have_threads, verbose, \
9        get_new_environment_path, get_new_database_path
10
11
12#----------------------------------------------------------------------
13
14class DBReplication(unittest.TestCase) :
15    def setUp(self) :
16        self.homeDirMaster = get_new_environment_path()
17        self.homeDirClient = get_new_environment_path()
18
19        self.dbenvMaster = db.DBEnv()
20        self.dbenvClient = db.DBEnv()
21
22        # Must use "DB_THREAD" because the Replication Manager will
23        # be executed in other threads but will use the same environment.
24        # http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0
25        self.dbenvMaster.open(self.homeDirMaster, db.DB_CREATE | db.DB_INIT_TXN
26                | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
27                db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
28        self.dbenvClient.open(self.homeDirClient, db.DB_CREATE | db.DB_INIT_TXN
29                | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
30                db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
31
32        self.confirmed_master=self.client_startupdone=False
33        def confirmed_master(a,b,c) :
34            if b==db.DB_EVENT_REP_MASTER :
35                self.confirmed_master=True
36
37        def client_startupdone(a,b,c) :
38            if b==db.DB_EVENT_REP_STARTUPDONE :
39                self.client_startupdone=True
40
41        self.dbenvMaster.set_event_notify(confirmed_master)
42        self.dbenvClient.set_event_notify(client_startupdone)
43
44        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
45        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
46        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
47        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
48
49        self.dbMaster = self.dbClient = None
50
51
52    def tearDown(self):
53        if self.dbClient :
54            self.dbClient.close()
55        if self.dbMaster :
56            self.dbMaster.close()
57
58        # Here we assign dummy event handlers to allow GC of the test object.
59        # Since the dummy handler doesn't use any outer scope variable, it
60        # doesn't keep any reference to the test object.
61        def dummy(*args) :
62            pass
63        self.dbenvMaster.set_event_notify(dummy)
64        self.dbenvClient.set_event_notify(dummy)
65
66        self.dbenvClient.close()
67        self.dbenvMaster.close()
68        test_support.rmtree(self.homeDirClient)
69        test_support.rmtree(self.homeDirMaster)
70
71class DBReplicationManager(DBReplication) :
72    def test01_basic_replication(self) :
73        master_port = test_support.find_unused_port()
74        client_port = test_support.find_unused_port()
75        if db.version() >= (5, 2) :
76            self.site = self.dbenvMaster.repmgr_site("127.0.0.1", master_port)
77            self.site.set_config(db.DB_GROUP_CREATOR, True)
78            self.site.set_config(db.DB_LOCAL_SITE, True)
79            self.site2 = self.dbenvMaster.repmgr_site("127.0.0.1", client_port)
80
81            self.site3 = self.dbenvClient.repmgr_site("127.0.0.1", master_port)
82            self.site3.set_config(db.DB_BOOTSTRAP_HELPER, True)
83            self.site4 = self.dbenvClient.repmgr_site("127.0.0.1", client_port)
84            self.site4.set_config(db.DB_LOCAL_SITE, True)
85
86            d = {
87                    db.DB_BOOTSTRAP_HELPER: [False, False, True, False],
88                    db.DB_GROUP_CREATOR: [True, False, False, False],
89                    db.DB_LEGACY: [False, False, False, False],
90                    db.DB_LOCAL_SITE: [True, False, False, True],
91                    db.DB_REPMGR_PEER: [False, False, False, False ],
92                }
93
94            for i, j in d.items() :
95                for k, v in \
96                        zip([self.site, self.site2, self.site3, self.site4], j) :
97                    if v :
98                        self.assertTrue(k.get_config(i))
99                    else :
100                        self.assertFalse(k.get_config(i))
101
102            self.assertNotEqual(self.site.get_eid(), self.site2.get_eid())
103            self.assertNotEqual(self.site3.get_eid(), self.site4.get_eid())
104
105            for i, j in zip([self.site, self.site2, self.site3, self.site4], \
106                    [master_port, client_port, master_port, client_port]) :
107                addr = i.get_address()
108                self.assertEqual(addr, ("127.0.0.1", j))
109
110            for i in [self.site, self.site2] :
111                self.assertEqual(i.get_address(),
112                        self.dbenvMaster.repmgr_site_by_eid(i.get_eid()).get_address())
113            for i in [self.site3, self.site4] :
114                self.assertEqual(i.get_address(),
115                        self.dbenvClient.repmgr_site_by_eid(i.get_eid()).get_address())
116        else :
117            self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
118            self.dbenvClient.repmgr_set_local_site("127.0.0.1", client_port)
119            self.dbenvMaster.repmgr_add_remote_site("127.0.0.1", client_port)
120            self.dbenvClient.repmgr_add_remote_site("127.0.0.1", master_port)
121
122            self.dbenvMaster.rep_set_nsites(2)
123            self.dbenvClient.rep_set_nsites(2)
124
125        self.dbenvMaster.rep_set_priority(10)
126        self.dbenvClient.rep_set_priority(0)
127
128        self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100123)
129        self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100321)
130        self.assertEqual(self.dbenvMaster.rep_get_timeout(
131            db.DB_REP_CONNECTION_RETRY), 100123)
132        self.assertEqual(self.dbenvClient.rep_get_timeout(
133            db.DB_REP_CONNECTION_RETRY), 100321)
134
135        self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234)
136        self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432)
137        self.assertEqual(self.dbenvMaster.rep_get_timeout(
138            db.DB_REP_ELECTION_TIMEOUT), 100234)
139        self.assertEqual(self.dbenvClient.rep_get_timeout(
140            db.DB_REP_ELECTION_TIMEOUT), 100432)
141
142        self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345)
143        self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543)
144        self.assertEqual(self.dbenvMaster.rep_get_timeout(
145            db.DB_REP_ELECTION_RETRY), 100345)
146        self.assertEqual(self.dbenvClient.rep_get_timeout(
147            db.DB_REP_ELECTION_RETRY), 100543)
148
149        self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
150        self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
151
152        self.dbenvMaster.repmgr_start(1, db.DB_REP_MASTER);
153        self.dbenvClient.repmgr_start(1, db.DB_REP_CLIENT);
154
155        self.assertEqual(self.dbenvMaster.rep_get_nsites(),2)
156        self.assertEqual(self.dbenvClient.rep_get_nsites(),2)
157        self.assertEqual(self.dbenvMaster.rep_get_priority(),10)
158        self.assertEqual(self.dbenvClient.rep_get_priority(),0)
159        self.assertEqual(self.dbenvMaster.repmgr_get_ack_policy(),
160                db.DB_REPMGR_ACKS_ALL)
161        self.assertEqual(self.dbenvClient.repmgr_get_ack_policy(),
162                db.DB_REPMGR_ACKS_ALL)
163
164        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
165        # is not generated if the master has no new transactions.
166        # This is solved in BDB 4.6 (#15542).
167        import time
168        timeout = time.time()+60
169        while (time.time()<timeout) and not (self.confirmed_master and self.client_startupdone) :
170            time.sleep(0.02)
171        # self.client_startupdone does not always get set to True within
172        # the timeout.  On windows this may be a deep issue, on other
173        # platforms it is likely just a timing issue, especially on slow
174        # virthost buildbots (see issue 3892 for more).  Even though
175        # the timeout triggers, the rest of this test method usually passes
176        # (but not all of it always, see below).  So we just note the
177        # timeout on stderr and keep soldering on.
178        if time.time()>timeout:
179            import sys
180            print >> sys.stderr, ("XXX: timeout happened before"
181                "startup was confirmed - see issue 3892")
182            startup_timeout = True
183
184        d = self.dbenvMaster.repmgr_site_list()
185        self.assertEqual(len(d), 1)
186        d = d.values()[0]  # There is only one
187        self.assertEqual(d[0], "127.0.0.1")
188        self.assertEqual(d[1], client_port)
189        self.assertTrue((d[2]==db.DB_REPMGR_CONNECTED) or \
190                (d[2]==db.DB_REPMGR_DISCONNECTED))
191
192        d = self.dbenvClient.repmgr_site_list()
193        self.assertEqual(len(d), 1)
194        d = d.values()[0]  # There is only one
195        self.assertEqual(d[0], "127.0.0.1")
196        self.assertEqual(d[1], master_port)
197        self.assertTrue((d[2]==db.DB_REPMGR_CONNECTED) or \
198                (d[2]==db.DB_REPMGR_DISCONNECTED))
199
200        if db.version() >= (4,6) :
201            d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR);
202            self.assertTrue("msgs_queued" in d)
203
204        self.dbMaster=db.DB(self.dbenvMaster)
205        txn=self.dbenvMaster.txn_begin()
206        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
207        txn.commit()
208
209        import time,os.path
210        timeout=time.time()+10
211        while (time.time()<timeout) and \
212          not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
213            time.sleep(0.01)
214
215        self.dbClient=db.DB(self.dbenvClient)
216        while True :
217            txn=self.dbenvClient.txn_begin()
218            try :
219                self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
220                        mode=0666, txn=txn)
221            except db.DBRepHandleDeadError :
222                txn.abort()
223                self.dbClient.close()
224                self.dbClient=db.DB(self.dbenvClient)
225                continue
226
227            txn.commit()
228            break
229
230        txn=self.dbenvMaster.txn_begin()
231        self.dbMaster.put("ABC", "123", txn=txn)
232        txn.commit()
233        import time
234        timeout=time.time()+10
235        v=None
236        while (time.time()<timeout) and (v is None) :
237            txn=self.dbenvClient.txn_begin()
238            v=self.dbClient.get("ABC", txn=txn)
239            txn.commit()
240            if v is None :
241                time.sleep(0.02)
242        # If startup did not happen before the timeout above, then this test
243        # sometimes fails.  This happens randomly, which causes buildbot
244        # instability, but all the other bsddb tests pass.  Since bsddb3 in the
245        # stdlib is currently not getting active maintenance, and is gone in
246        # py3k, we just skip the end of the test in that case.
247        if time.time()>=timeout and startup_timeout:
248            self.skipTest("replication test skipped due to random failure, "
249                "see issue 3892")
250        self.assertTrue(time.time()<timeout)
251        self.assertEqual("123", v)
252
253        txn=self.dbenvMaster.txn_begin()
254        self.dbMaster.delete("ABC", txn=txn)
255        txn.commit()
256        timeout=time.time()+10
257        while (time.time()<timeout) and (v is not None) :
258            txn=self.dbenvClient.txn_begin()
259            v=self.dbClient.get("ABC", txn=txn)
260            txn.commit()
261            if v is None :
262                time.sleep(0.02)
263        self.assertTrue(time.time()<timeout)
264        self.assertEqual(None, v)
265
266class DBBaseReplication(DBReplication) :
267    def setUp(self) :
268        DBReplication.setUp(self)
269        def confirmed_master(a,b,c) :
270            if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) :
271                self.confirmed_master = True
272
273        def client_startupdone(a,b,c) :
274            if b == db.DB_EVENT_REP_STARTUPDONE :
275                self.client_startupdone = True
276
277        self.dbenvMaster.set_event_notify(confirmed_master)
278        self.dbenvClient.set_event_notify(client_startupdone)
279
280        import Queue
281        self.m2c = Queue.Queue()
282        self.c2m = Queue.Queue()
283
284        # There are only two nodes, so we don't need to
285        # do any routing decision
286        def m2c(dbenv, control, rec, lsnp, envid, flags) :
287            self.m2c.put((control, rec))
288
289        def c2m(dbenv, control, rec, lsnp, envid, flags) :
290            self.c2m.put((control, rec))
291
292        self.dbenvMaster.rep_set_transport(13,m2c)
293        self.dbenvMaster.rep_set_priority(10)
294        self.dbenvClient.rep_set_transport(3,c2m)
295        self.dbenvClient.rep_set_priority(0)
296
297        self.assertEqual(self.dbenvMaster.rep_get_priority(),10)
298        self.assertEqual(self.dbenvClient.rep_get_priority(),0)
299
300        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
301        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
302        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
303        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
304
305        def thread_master() :
306            return self.thread_do(self.dbenvMaster, self.c2m, 3,
307                    self.master_doing_election, True)
308
309        def thread_client() :
310            return self.thread_do(self.dbenvClient, self.m2c, 13,
311                    self.client_doing_election, False)
312
313        from threading import Thread
314        t_m=Thread(target=thread_master)
315        t_c=Thread(target=thread_client)
316        import sys
317        if sys.version_info[0] < 3 :
318            t_m.setDaemon(True)
319            t_c.setDaemon(True)
320        else :
321            t_m.daemon = True
322            t_c.daemon = True
323
324        self.t_m = t_m
325        self.t_c = t_c
326
327        self.dbMaster = self.dbClient = None
328
329        self.master_doing_election=[False]
330        self.client_doing_election=[False]
331
332
333    def tearDown(self):
334        if self.dbClient :
335            self.dbClient.close()
336        if self.dbMaster :
337            self.dbMaster.close()
338        self.m2c.put(None)
339        self.c2m.put(None)
340        self.t_m.join()
341        self.t_c.join()
342
343        # Here we assign dummy event handlers to allow GC of the test object.
344        # Since the dummy handler doesn't use any outer scope variable, it
345        # doesn't keep any reference to the test object.
346        def dummy(*args) :
347            pass
348        self.dbenvMaster.set_event_notify(dummy)
349        self.dbenvClient.set_event_notify(dummy)
350        self.dbenvMaster.rep_set_transport(13,dummy)
351        self.dbenvClient.rep_set_transport(3,dummy)
352
353        self.dbenvClient.close()
354        self.dbenvMaster.close()
355        test_support.rmtree(self.homeDirClient)
356        test_support.rmtree(self.homeDirMaster)
357
358    def basic_rep_threading(self) :
359        self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
360        self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
361
362        def thread_do(env, q, envid, election_status, must_be_master) :
363            while True :
364                v=q.get()
365                if v is None : return
366                env.rep_process_message(v[0], v[1], envid)
367
368        self.thread_do = thread_do
369
370        self.t_m.start()
371        self.t_c.start()
372
373    def test01_basic_replication(self) :
374        self.basic_rep_threading()
375
376        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
377        # is not generated if the master has no new transactions.
378        # This is solved in BDB 4.6 (#15542).
379        import time
380        timeout = time.time()+60
381        while (time.time()<timeout) and not (self.confirmed_master and
382                self.client_startupdone) :
383            time.sleep(0.02)
384        self.assertTrue(time.time()<timeout)
385
386        self.dbMaster=db.DB(self.dbenvMaster)
387        txn=self.dbenvMaster.txn_begin()
388        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
389        txn.commit()
390
391        import time,os.path
392        timeout=time.time()+10
393        while (time.time()<timeout) and \
394          not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
395            time.sleep(0.01)
396
397        self.dbClient=db.DB(self.dbenvClient)
398        while True :
399            txn=self.dbenvClient.txn_begin()
400            try :
401                self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
402                        mode=0666, txn=txn)
403            except db.DBRepHandleDeadError :
404                txn.abort()
405                self.dbClient.close()
406                self.dbClient=db.DB(self.dbenvClient)
407                continue
408
409            txn.commit()
410            break
411
412        d = self.dbenvMaster.rep_stat(flags=db.DB_STAT_CLEAR);
413        self.assertTrue("master_changes" in d)
414
415        txn=self.dbenvMaster.txn_begin()
416        self.dbMaster.put("ABC", "123", txn=txn)
417        txn.commit()
418        import time
419        timeout=time.time()+10
420        v=None
421        while (time.time()<timeout) and (v is None) :
422            txn=self.dbenvClient.txn_begin()
423            v=self.dbClient.get("ABC", txn=txn)
424            txn.commit()
425            if v is None :
426                time.sleep(0.02)
427        self.assertTrue(time.time()<timeout)
428        self.assertEqual("123", v)
429
430        txn=self.dbenvMaster.txn_begin()
431        self.dbMaster.delete("ABC", txn=txn)
432        txn.commit()
433        timeout=time.time()+10
434        while (time.time()<timeout) and (v is not None) :
435            txn=self.dbenvClient.txn_begin()
436            v=self.dbClient.get("ABC", txn=txn)
437            txn.commit()
438            if v is None :
439                time.sleep(0.02)
440        self.assertTrue(time.time()<timeout)
441        self.assertEqual(None, v)
442
443    if db.version() >= (4,7) :
444        def test02_test_request(self) :
445            self.basic_rep_threading()
446            (minimum, maximum) = self.dbenvClient.rep_get_request()
447            self.dbenvClient.rep_set_request(minimum-1, maximum+1)
448            self.assertEqual(self.dbenvClient.rep_get_request(),
449                    (minimum-1, maximum+1))
450
451    if db.version() >= (4,6) :
452        def test03_master_election(self) :
453            # Get ready to hold an election
454            #self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
455            self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
456            self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
457
458            def thread_do(env, q, envid, election_status, must_be_master) :
459                while True :
460                    v=q.get()
461                    if v is None : return
462                    r = env.rep_process_message(v[0],v[1],envid)
463                    if must_be_master and self.confirmed_master :
464                        self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER)
465                        must_be_master = False
466
467                    if r[0] == db.DB_REP_HOLDELECTION :
468                        def elect() :
469                            while True :
470                                try :
471                                    env.rep_elect(2, 1)
472                                    election_status[0] = False
473                                    break
474                                except db.DBRepUnavailError :
475                                    pass
476                        if not election_status[0] and not self.confirmed_master :
477                            from threading import Thread
478                            election_status[0] = True
479                            t=Thread(target=elect)
480                            import sys
481                            if sys.version_info[0] < 3 :
482                                t.setDaemon(True)
483                            else :
484                                t.daemon = True
485                            t.start()
486
487            self.thread_do = thread_do
488
489            self.t_m.start()
490            self.t_c.start()
491
492            self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
493            self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
494            self.client_doing_election[0] = True
495            while True :
496                try :
497                    self.dbenvClient.rep_elect(2, 1)
498                    self.client_doing_election[0] = False
499                    break
500                except db.DBRepUnavailError :
501                    pass
502
503            self.assertTrue(self.confirmed_master)
504
505            # Race condition showed up after upgrading to Solaris 10 Update 10
506            # https://forums.oracle.com/forums/thread.jspa?messageID=9902860
507            # jcea@jcea.es: See private email from Paula Bingham (Oracle),
508            # in 20110929.
509            while not (self.dbenvClient.rep_stat()["startup_complete"]) :
510                pass
511
512    if db.version() >= (4,7) :
513        def test04_test_clockskew(self) :
514            fast, slow = 1234, 1230
515            self.dbenvMaster.rep_set_clockskew(fast, slow)
516            self.assertEqual((fast, slow),
517                    self.dbenvMaster.rep_get_clockskew())
518            self.basic_rep_threading()
519
520#----------------------------------------------------------------------
521
522def test_suite():
523    suite = unittest.TestSuite()
524    if db.version() >= (4, 6) :
525        dbenv = db.DBEnv()
526        try :
527            dbenv.repmgr_get_ack_policy()
528            ReplicationManager_available=True
529        except :
530            ReplicationManager_available=False
531        dbenv.close()
532        del dbenv
533        if ReplicationManager_available :
534            suite.addTest(unittest.makeSuite(DBReplicationManager))
535
536        if have_threads :
537            suite.addTest(unittest.makeSuite(DBBaseReplication))
538
539    return suite
540
541
542if __name__ == '__main__':
543    unittest.main(defaultTest='test_suite')
544