1#-*- coding: iso-8859-1 -*-
2# pysqlite2/test/regression.py: pysqlite regression tests
3#
# Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de>
5#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty.  In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17#    claim that you wrote the original software. If you use this software
18#    in a product, an acknowledgment in the product documentation would be
19#    appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21#    misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
24import datetime
25import unittest
26import sqlite3 as sqlite
27
class RegressionTests(unittest.TestCase):
    """Regression tests for bugs fixed in earlier pysqlite releases.

    Each ``Check*`` method reproduces one historical bug.  ``setUp`` opens a
    shared in-memory connection as ``self.con``; tests that need their own
    connection (special options, or a connection whose state they disturb)
    obtain one from ``_connect`` so that it is always closed afterwards.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def _connect(self, **kwargs):
        """Open a private in-memory connection that is closed after the test.

        Keyword arguments are passed through to ``sqlite.connect``.  The
        close is registered with ``addCleanup`` so it also runs when the test
        fails part-way through.
        """
        con = sqlite.connect(":memory:", **kwargs)
        self.addCleanup(con.close)
        return con

    def CheckPragmaUserVersion(self):
        # This used to crash pysqlite because this pragma command returns
        # NULL for the column name.
        cur = self.con.cursor()
        cur.execute("pragma user_version")

    def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1.
        # The cursor must come from the PARSE_COLNAMES connection, not from
        # self.con, otherwise the column-name parsing is never exercised.
        con = self._connect(detect_types=sqlite.PARSE_COLNAMES)
        cur = con.cursor()
        try:
            cur.execute("pragma schema_version")
        finally:
            cur.close()

    def CheckStatementReset(self):
        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
        # reset before a rollback, but only those that are still in the
        # statement cache. The others are not accessible from the connection
        # object.
        con = self._connect(cached_statements=5)
        cursors = [con.cursor() for _ in range(5)]
        cursors[0].execute("create table test(x)")
        rows = [(x,) for x in range(10)]
        for _ in range(10):
            cursors[0].executemany("insert into test(x) values (?)", rows)

        # A different number of leading spaces gives every cursor its own
        # statement text.
        for i, cur in enumerate(cursors):
            cur.execute(" " * i + "select x from test")

        con.rollback()

    def CheckColumnNameWithSpaces(self):
        cur = self.con.cursor()
        cur.execute('select 1 as "foo bar [datetime]"')
        self.assertEqual(cur.description[0][0], "foo bar")

        cur.execute('select 1 as "foo baz"')
        self.assertEqual(cur.description[0][0], "foo baz")

    def CheckStatementFinalizationOnCloseDb(self):
        # pysqlite versions <= 2.3.3 only finalized statements in the statement
        # cache when closing the database. Statements that were still
        # referenced in cursors weren't closed and could provoke
        # "OperationalError: Unable to close due to unfinalised statements".
        con = sqlite.connect(":memory:")
        cursors = []
        # default statement cache size is 100
        for i in range(105):
            cur = con.cursor()
            cursors.append(cur)
            cur.execute("select 1 x union select " + str(i))
        # Closing is the operation under test, so it is done explicitly.
        con.close()

    def CheckOnConflictRollback(self):
        if sqlite.sqlite_version_info < (3, 2, 2):
            return
        con = self._connect()
        con.execute("create table foo(x, unique(x) on conflict rollback)")
        con.execute("insert into foo(x) values (1)")
        try:
            con.execute("insert into foo(x) values (1)")
        except sqlite.DatabaseError:
            # Expected: the duplicate value violates the unique constraint.
            pass
        con.execute("insert into foo(x) values (2)")
        try:
            con.commit()
        except sqlite.OperationalError:
            self.fail("pysqlite knew nothing about the implicit ROLLBACK")

    def CheckWorkaroundForBuggySqliteTransferBindings(self):
        """
        pysqlite would crash with older SQLite versions unless
        a workaround is implemented.
        """
        self.con.execute("create table foo(bar)")
        self.con.execute("drop table foo")
        self.con.execute("create table foo(bar)")

    def CheckEmptyStatement(self):
        """
        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
        for "no-operation" statements
        """
        self.con.execute("")

    def CheckUnicodeConnect(self):
        """
        With pysqlite 2.4.0 you needed to use a string or an APSW connection
        object for opening database connections.

        Formerly, both bytestrings and unicode strings used to work.

        Let's make sure unicode strings work in the future.
        """
        con = sqlite.connect(u":memory:")
        con.close()

    def CheckTypeMapUsage(self):
        """
        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
        a statement. This test exhibits the problem.
        """
        select = "select * from foo"
        con = self._connect(detect_types=sqlite.PARSE_DECLTYPES)
        con.execute("create table foo(bar timestamp)")
        con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
        con.execute(select)
        # Re-create the table with another declared type and run the very
        # same statement text again.
        con.execute("drop table foo")
        con.execute("create table foo(bar integer)")
        con.execute("insert into foo(bar) values (5)")
        con.execute(select)

    def CheckRegisterAdapter(self):
        """
        See issue 3312.
        """
        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)

    def CheckSetIsolationLevel(self):
        """
        See issue 3312.
        """
        con = self._connect()
        self.assertRaises(UnicodeEncodeError, setattr, con,
                          "isolation_level", u"\xe9")

    def CheckCursorConstructorCallCheck(self):
        """
        Verifies that cursor methods check whether base class __init__ was
        called.
        """
        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                # Deliberately skips sqlite.Cursor.__init__.
                pass

        con = self._connect()
        cur = Cursor(con)
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4+5").fetchall()

    def CheckConnectionConstructorCallCheck(self):
        """
        Verifies that connection methods check whether base class __init__ was
        called.
        """
        class Connection(sqlite.Connection):
            def __init__(self, name):
                # Deliberately skips sqlite.Connection.__init__.
                pass

        con = Connection(":memory:")
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def CheckCursorRegistration(self):
        """
        Verifies that subclassed cursor classes are correctly registered with
        the connection object, too.  (fetch-across-rollback problem)
        """
        class Connection(sqlite.Connection):
            def cursor(self):
                return Cursor(self)

        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                sqlite.Cursor.__init__(self, con)

        con = Connection(":memory:")
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("create table foo(x)")
        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
        cur.execute("select x from foo")
        con.rollback()
        with self.assertRaises(sqlite.InterfaceError):
            cur.fetchall()

    def CheckAutoCommit(self):
        """
        Verifies that creating a connection in autocommit mode works.
        2.5.3 introduced a regression so that these could no longer
        be created.
        """
        self._connect(isolation_level=None)

    def CheckPragmaAutocommit(self):
        """
        Verifies that running a PRAGMA statement that does an autocommit does
        work. This did not work in 2.5.3/2.5.4.
        """
        cur = self.con.cursor()
        cur.execute("create table foo(bar)")
        cur.execute("insert into foo(bar) values (5)")

        cur.execute("pragma page_size")
        # Only the fetch itself matters; the value is not inspected.
        cur.fetchone()

    def CheckSetDict(self):
        """
        See http://bugs.python.org/issue7478

        It was possible to successfully register callbacks that could not be
        hashed. Return codes of PyDict_SetItem were not checked properly.
        """
        class NotHashable:
            def __call__(self, *args, **kw):
                pass
            def __hash__(self):
                raise TypeError()
        var = NotHashable()
        self.assertRaises(TypeError, self.con.create_function, var)
        self.assertRaises(TypeError, self.con.create_aggregate, var)
        self.assertRaises(TypeError, self.con.set_authorizer, var)
        self.assertRaises(TypeError, self.con.set_progress_handler, var)

    def CheckConnectionCall(self):
        """
        Call a connection with a non-string SQL request: check error handling
        of the statement constructor.
        """
        self.assertRaises(sqlite.Warning, self.con, 1)

    def CheckRecursiveCursorUse(self):
        """
        http://bugs.python.org/issue10811

        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
        Now we catch recursive cursor usage and raise a ProgrammingError.
        """
        con = self._connect()

        cur = con.cursor()
        cur.execute("create table a (bar)")
        cur.execute("create table b (baz)")

        def foo():
            # Re-enters the cursor that is currently running executemany().
            cur.execute("insert into a (bar) values (?)", (1,))
            yield 1

        with self.assertRaises(sqlite.ProgrammingError):
            cur.executemany("insert into b (baz) values (?)",
                            ((i,) for i in foo()))

    def CheckConvertTimestampMicrosecondPadding(self):
        """
        http://bugs.python.org/issue14720

        The microsecond parsing of convert_timestamp() should pad with zeros,
        since the microsecond string "456" actually represents "456000".
        """
        con = self._connect(detect_types=sqlite.PARSE_DECLTYPES)
        cur = con.cursor()
        cur.execute("CREATE TABLE t (x TIMESTAMP)")

        # Microseconds should be 456000
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")

        # Microseconds should be truncated to 123456
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")

        cur.execute("SELECT * FROM t")
        values = [x[0] for x in cur.fetchall()]

        self.assertEqual(values, [
            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
        ])

    def CheckInvalidIsolationLevelType(self):
        # isolation level is a string, not an integer
        self.assertRaises(TypeError,
                          sqlite.connect, ":memory:", isolation_level=123)

    def CheckNullCharacter(self):
        # Issue #21147
        con = self._connect()
        self.assertRaises(ValueError, con, "\0select 1")
        self.assertRaises(ValueError, con, "select 1\0")
        cur = con.cursor()
        self.assertRaises(ValueError, cur.execute, " \0select 2")
        self.assertRaises(ValueError, cur.execute, "select 2\0")

    def CheckCommitCursorReset(self):
        """
        Connection.commit() did reset cursors, which made sqlite3
        to return rows multiple times when fetched from cursors
        after commit. See issues 10513 and 23129 for details.
        """
        con = self._connect()
        con.executescript("""
        create table t(c);
        create table t2(c);
        insert into t values(0);
        insert into t values(1);
        insert into t values(2);
        """)

        self.assertEqual(con.isolation_level, "")

        fetched = []
        for i, row in enumerate(con.execute("select c from t")):
            # Commit while the select cursor is still being iterated.
            con.execute("insert into t2(c) values (?)", (i,))
            con.commit()
            fetched.append(row[0])
        self.assertEqual(fetched, [0, 1, 2],
                         "should have returned exactly three rows")
360
361
def suite():
    """Return a ``unittest.TestSuite`` with all ``Check*`` regression tests.

    The tests are the methods of ``RegressionTests`` whose names start with
    ``"Check"``.
    """
    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # 3.13; a TestLoader with a custom method prefix is the equivalent.
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "Check"
    regression_suite = loader.loadTestsFromTestCase(RegressionTests)
    return unittest.TestSuite((regression_suite,))
365
def test():
    """Run the full regression suite with unittest's text test runner."""
    unittest.TextTestRunner().run(suite())
369
# Run the regression tests when this file is executed as a script.
if __name__ == "__main__":
    test()
372