1#-*- coding: iso-8859-1 -*-
2# pysqlite2/test/regression.py: pysqlite regression tests
3#
4# Copyright (C) 2006-2007 Gerhard H�ring <gh@ghaering.de>
5#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty.  In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17#    claim that you wrote the original software. If you use this software
18#    in a product, an acknowledgment in the product documentation would be
19#    appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21#    misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
24import datetime
25import unittest
26import sqlite3 as sqlite
27
28class RegressionTests(unittest.TestCase):
29    def setUp(self):
30        self.con = sqlite.connect(":memory:")
31
32    def tearDown(self):
33        self.con.close()
34
35    def CheckPragmaUserVersion(self):
36        # This used to crash pysqlite because this pragma command returns NULL for the column name
37        cur = self.con.cursor()
38        cur.execute("pragma user_version")
39
40    def CheckPragmaSchemaVersion(self):
41        # This still crashed pysqlite <= 2.2.1
42        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
43        try:
44            cur = self.con.cursor()
45            cur.execute("pragma schema_version")
46        finally:
47            cur.close()
48            con.close()
49
50    def CheckStatementReset(self):
51        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
52        # reset before a rollback, but only those that are still in the
53        # statement cache. The others are not accessible from the connection object.
54        con = sqlite.connect(":memory:", cached_statements=5)
55        cursors = [con.cursor() for x in xrange(5)]
56        cursors[0].execute("create table test(x)")
57        for i in range(10):
58            cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in xrange(10)])
59
60        for i in range(5):
61            cursors[i].execute(" " * i + "select x from test")
62
63        con.rollback()
64
65    def CheckColumnNameWithSpaces(self):
66        cur = self.con.cursor()
67        cur.execute('select 1 as "foo bar [datetime]"')
68        self.assertEqual(cur.description[0][0], "foo bar")
69
70        cur.execute('select 1 as "foo baz"')
71        self.assertEqual(cur.description[0][0], "foo baz")
72
73    def CheckStatementFinalizationOnCloseDb(self):
74        # pysqlite versions <= 2.3.3 only finalized statements in the statement
75        # cache when closing the database. statements that were still
76        # referenced in cursors weren't closed an could provoke "
77        # "OperationalError: Unable to close due to unfinalised statements".
78        con = sqlite.connect(":memory:")
79        cursors = []
80        # default statement cache size is 100
81        for i in range(105):
82            cur = con.cursor()
83            cursors.append(cur)
84            cur.execute("select 1 x union select " + str(i))
85        con.close()
86
87    def CheckOnConflictRollback(self):
88        if sqlite.sqlite_version_info < (3, 2, 2):
89            return
90        con = sqlite.connect(":memory:")
91        con.execute("create table foo(x, unique(x) on conflict rollback)")
92        con.execute("insert into foo(x) values (1)")
93        try:
94            con.execute("insert into foo(x) values (1)")
95        except sqlite.DatabaseError:
96            pass
97        con.execute("insert into foo(x) values (2)")
98        try:
99            con.commit()
100        except sqlite.OperationalError:
101            self.fail("pysqlite knew nothing about the implicit ROLLBACK")
102
103    def CheckWorkaroundForBuggySqliteTransferBindings(self):
104        """
105        pysqlite would crash with older SQLite versions unless
106        a workaround is implemented.
107        """
108        self.con.execute("create table foo(bar)")
109        self.con.execute("drop table foo")
110        self.con.execute("create table foo(bar)")
111
112    def CheckEmptyStatement(self):
113        """
114        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
115        for "no-operation" statements
116        """
117        self.con.execute("")
118
119    def CheckUnicodeConnect(self):
120        """
121        With pysqlite 2.4.0 you needed to use a string or a APSW connection
122        object for opening database connections.
123
124        Formerly, both bytestrings and unicode strings used to work.
125
126        Let's make sure unicode strings work in the future.
127        """
128        con = sqlite.connect(u":memory:")
129        con.close()
130
131    def CheckTypeMapUsage(self):
132        """
133        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
134        a statement. This test exhibits the problem.
135        """
136        SELECT = "select * from foo"
137        con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
138        con.execute("create table foo(bar timestamp)")
139        con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
140        con.execute(SELECT)
141        con.execute("drop table foo")
142        con.execute("create table foo(bar integer)")
143        con.execute("insert into foo(bar) values (5)")
144        con.execute(SELECT)
145
146    def CheckRegisterAdapter(self):
147        """
148        See issue 3312.
149        """
150        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
151
152    def CheckSetIsolationLevel(self):
153        """
154        See issue 3312.
155        """
156        con = sqlite.connect(":memory:")
157        self.assertRaises(UnicodeEncodeError, setattr, con,
158                          "isolation_level", u"\xe9")
159
160    def CheckCursorConstructorCallCheck(self):
161        """
162        Verifies that cursor methods check wether base class __init__ was called.
163        """
164        class Cursor(sqlite.Cursor):
165            def __init__(self, con):
166                pass
167
168        con = sqlite.connect(":memory:")
169        cur = Cursor(con)
170        try:
171            cur.execute("select 4+5").fetchall()
172            self.fail("should have raised ProgrammingError")
173        except sqlite.ProgrammingError:
174            pass
175        except:
176            self.fail("should have raised ProgrammingError")
177
178    def CheckConnectionConstructorCallCheck(self):
179        """
180        Verifies that connection methods check wether base class __init__ was called.
181        """
182        class Connection(sqlite.Connection):
183            def __init__(self, name):
184                pass
185
186        con = Connection(":memory:")
187        try:
188            cur = con.cursor()
189            self.fail("should have raised ProgrammingError")
190        except sqlite.ProgrammingError:
191            pass
192        except:
193            self.fail("should have raised ProgrammingError")
194
195    def CheckCursorRegistration(self):
196        """
197        Verifies that subclassed cursor classes are correctly registered with
198        the connection object, too.  (fetch-across-rollback problem)
199        """
200        class Connection(sqlite.Connection):
201            def cursor(self):
202                return Cursor(self)
203
204        class Cursor(sqlite.Cursor):
205            def __init__(self, con):
206                sqlite.Cursor.__init__(self, con)
207
208        con = Connection(":memory:")
209        cur = con.cursor()
210        cur.execute("create table foo(x)")
211        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
212        cur.execute("select x from foo")
213        con.rollback()
214        try:
215            cur.fetchall()
216            self.fail("should have raised InterfaceError")
217        except sqlite.InterfaceError:
218            pass
219        except:
220            self.fail("should have raised InterfaceError")
221
222    def CheckAutoCommit(self):
223        """
224        Verifies that creating a connection in autocommit mode works.
225        2.5.3 introduced a regression so that these could no longer
226        be created.
227        """
228        con = sqlite.connect(":memory:", isolation_level=None)
229
230    def CheckPragmaAutocommit(self):
231        """
232        Verifies that running a PRAGMA statement that does an autocommit does
233        work. This did not work in 2.5.3/2.5.4.
234        """
235        cur = self.con.cursor()
236        cur.execute("create table foo(bar)")
237        cur.execute("insert into foo(bar) values (5)")
238
239        cur.execute("pragma page_size")
240        row = cur.fetchone()
241
242    def CheckSetDict(self):
243        """
244        See http://bugs.python.org/issue7478
245
246        It was possible to successfully register callbacks that could not be
247        hashed. Return codes of PyDict_SetItem were not checked properly.
248        """
249        class NotHashable:
250            def __call__(self, *args, **kw):
251                pass
252            def __hash__(self):
253                raise TypeError()
254        var = NotHashable()
255        self.assertRaises(TypeError, self.con.create_function, var)
256        self.assertRaises(TypeError, self.con.create_aggregate, var)
257        self.assertRaises(TypeError, self.con.set_authorizer, var)
258        self.assertRaises(TypeError, self.con.set_progress_handler, var)
259
260    def CheckConnectionCall(self):
261        """
262        Call a connection with a non-string SQL request: check error handling
263        of the statement constructor.
264        """
265        self.assertRaises(sqlite.Warning, self.con, 1)
266
267    def CheckRecursiveCursorUse(self):
268        """
269        http://bugs.python.org/issue10811
270
271        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
272        Now we catch recursive cursor usage and raise a ProgrammingError.
273        """
274        con = sqlite.connect(":memory:")
275
276        cur = con.cursor()
277        cur.execute("create table a (bar)")
278        cur.execute("create table b (baz)")
279
280        def foo():
281            cur.execute("insert into a (bar) values (?)", (1,))
282            yield 1
283
284        with self.assertRaises(sqlite.ProgrammingError):
285            cur.executemany("insert into b (baz) values (?)",
286                            ((i,) for i in foo()))
287
288    def CheckConvertTimestampMicrosecondPadding(self):
289        """
290        http://bugs.python.org/issue14720
291
292        The microsecond parsing of convert_timestamp() should pad with zeros,
293        since the microsecond string "456" actually represents "456000".
294        """
295
296        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
297        cur = con.cursor()
298        cur.execute("CREATE TABLE t (x TIMESTAMP)")
299
300        # Microseconds should be 456000
301        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")
302
303        # Microseconds should be truncated to 123456
304        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")
305
306        cur.execute("SELECT * FROM t")
307        values = [x[0] for x in cur.fetchall()]
308
309        self.assertEqual(values, [
310            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
311            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
312        ])
313
314
315def suite():
316    regression_suite = unittest.makeSuite(RegressionTests, "Check")
317    return unittest.TestSuite((regression_suite,))
318
319def test():
320    runner = unittest.TextTestRunner()
321    runner.run(suite())
322
323if __name__ == "__main__":
324    test()
325