1# Author: Paul Kippes <kippesp@gmail.com>
2
3import unittest
4import sqlite3 as sqlite
5
6class DumpTests(unittest.TestCase):
7    def setUp(self):
8        self.cx = sqlite.connect(":memory:")
9        self.cu = self.cx.cursor()
10
11    def tearDown(self):
12        self.cx.close()
13
14    def CheckTableDump(self):
15        expected_sqls = [
16                """CREATE TABLE "index"("index" blob);"""
17                ,
18                """INSERT INTO "index" VALUES(X'01');"""
19                ,
20                """CREATE TABLE "quoted""table"("quoted""field" text);"""
21                ,
22                """INSERT INTO "quoted""table" VALUES('quoted''value');"""
23                ,
24                "CREATE TABLE t1(id integer primary key, s1 text, " \
25                "t1_i1 integer not null, i2 integer, unique (s1), " \
26                "constraint t1_idx1 unique (i2));"
27                ,
28                "INSERT INTO \"t1\" VALUES(1,'foo',10,20);"
29                ,
30                "INSERT INTO \"t1\" VALUES(2,'foo2',30,30);"
31                ,
32                u"INSERT INTO \"t1\" VALUES(3,'f\xc3\xb6',40,10);"
33                ,
34                "CREATE TABLE t2(id integer, t2_i1 integer, " \
35                "t2_i2 integer, primary key (id)," \
36                "foreign key(t2_i1) references t1(t1_i1));"
37                ,
38                "CREATE TRIGGER trigger_1 update of t1_i1 on t1 " \
39                "begin " \
40                "update t2 set t2_i1 = new.t1_i1 where t2_i1 = old.t1_i1; " \
41                "end;"
42                ,
43                "CREATE VIEW v1 as select * from t1 left join t2 " \
44                "using (id);"
45                ]
46        [self.cu.execute(s) for s in expected_sqls]
47        i = self.cx.iterdump()
48        actual_sqls = [s for s in i]
49        expected_sqls = ['BEGIN TRANSACTION;'] + expected_sqls + \
50            ['COMMIT;']
51        [self.assertEqual(expected_sqls[i], actual_sqls[i])
52            for i in xrange(len(expected_sqls))]
53
54    def CheckUnorderableRow(self):
55        # iterdump() should be able to cope with unorderable row types (issue #15545)
56        class UnorderableRow:
57            def __init__(self, cursor, row):
58                self.row = row
59            def __getitem__(self, index):
60                return self.row[index]
61        self.cx.row_factory = UnorderableRow
62        CREATE_ALPHA = """CREATE TABLE "alpha" ("one");"""
63        CREATE_BETA = """CREATE TABLE "beta" ("two");"""
64        expected = [
65            "BEGIN TRANSACTION;",
66            CREATE_ALPHA,
67            CREATE_BETA,
68            "COMMIT;"
69            ]
70        self.cu.execute(CREATE_BETA)
71        self.cu.execute(CREATE_ALPHA)
72        got = list(self.cx.iterdump())
73        self.assertEqual(expected, got)
74
75def suite():
76    return unittest.TestSuite(unittest.makeSuite(DumpTests, "Check"))
77
78def test():
79    runner = unittest.TextTestRunner()
80    runner.run(suite())
81
82if __name__ == "__main__":
83    test()
84