source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/contrib/pymysql/tests/test_issues.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 19.8 KB
Line 
1import datetime
2import time
3import warnings
4import sys
5
6import pymysql
7from pymysql import cursors
8from pymysql._compat import text_type
9from pymysql.tests import base
10import unittest2
11
# Make ``reload`` available on every interpreter this suite supports:
#   * Python 3.4+  -> importlib.reload
#   * older 3.x / 2.7 -> imp.reload (``imp`` no longer exists on 3.12+,
#     so importing it must not be the first thing we try)
#   * otherwise    -> the Python 2 builtin ``reload`` is used as-is
try:
    from importlib import reload
except ImportError:
    try:
        from imp import reload
    except ImportError:
        pass
17
18
19__all__ = ["TestOldIssues", "TestNewIssues", "TestGitHubIssues"]
20
class TestOldIssues(base.PyMySQLTestCase):
    """Regression tests for the oldest PyMySQL tracker issues (3 to 17).

    Every test runs against a live MySQL server through the
    ``self.connections`` / ``self.databases`` attributes this class
    inherits from its base test case.
    """

    def test_issue_3(self):
        """ undefined methods datetime_or_None, date_or_None """
        conn = self.connections[0]
        c = conn.cursor()
        with warnings.catch_warnings():
            # Silence warnings while dropping a table that may not exist.
            warnings.filterwarnings("ignore")
            c.execute("drop table if exists issue3")
        c.execute("create table issue3 (d date, t time, dt datetime, ts timestamp)")
        try:
            c.execute("insert into issue3 (d, t, dt, ts) values (%s,%s,%s,%s)", (None, None, None, None))
            c.execute("select d from issue3")
            self.assertIsNone(c.fetchone()[0])
            c.execute("select t from issue3")
            self.assertIsNone(c.fetchone()[0])
            c.execute("select dt from issue3")
            self.assertIsNone(c.fetchone()[0])
            # The timestamp column is expected to come back populated even
            # though NULL was inserted.
            c.execute("select ts from issue3")
            self.assertIsInstance(c.fetchone()[0], datetime.datetime)
        finally:
            c.execute("drop table issue3")

    def test_issue_4(self):
        """ can't retrieve TIMESTAMP fields """
        conn = self.connections[0]
        c = conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            c.execute("drop table if exists issue4")
        c.execute("create table issue4 (ts timestamp)")
        try:
            c.execute("insert into issue4 (ts) values (now())")
            c.execute("select ts from issue4")
            self.assertIsInstance(c.fetchone()[0], datetime.datetime)
        finally:
            c.execute("drop table issue4")

    def test_issue_5(self):
        """ query on information_schema.tables fails """
        con = self.connections[0]
        cur = con.cursor()
        cur.execute("select * from information_schema.tables")

    def test_issue_6(self):
        """ exception: TypeError: ord() expected a character, but string of length 0 found """
        # ToDo: this test requires access to db 'mysql'.
        kwargs = self.databases[0].copy()
        kwargs['db'] = "mysql"
        conn = pymysql.connect(**kwargs)
        try:
            c = conn.cursor()
            c.execute("select * from user")
        finally:
            # Close the extra connection even when the query fails.
            conn.close()

    def test_issue_8(self):
        """ Primary Key and Index error when selecting data """
        conn = self.connections[0]
        c = conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            c.execute("drop table if exists test")
        c.execute("""CREATE TABLE `test` (`station` int(10) NOT NULL DEFAULT '0', `dh`
datetime NOT NULL DEFAULT '2015-01-01 00:00:00', `echeance` int(1) NOT NULL
DEFAULT '0', `me` double DEFAULT NULL, `mo` double DEFAULT NULL, PRIMARY
KEY (`station`,`dh`,`echeance`)) ENGINE=MyISAM DEFAULT CHARSET=latin1;""")
        try:
            self.assertEqual(0, c.execute("SELECT * FROM test"))
            c.execute("ALTER TABLE `test` ADD INDEX `idx_station` (`station`)")
            self.assertEqual(0, c.execute("SELECT * FROM test"))
        finally:
            c.execute("drop table test")

    def test_issue_9(self):
        """ sets DeprecationWarning in Python 2.6 """
        # NOTE: the except clause only fires when the active warning filters
        # turn DeprecationWarning into an exception.
        try:
            reload(pymysql)
        except DeprecationWarning:
            self.fail()

    def test_issue_13(self):
        """ can't handle large result fields """
        conn = self.connections[0]
        cur = conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            cur.execute("drop table if exists issue13")
        # Create outside the try block (like the sibling tests) so that a
        # failing CREATE is not masked by the DROP in the finally clause.
        cur.execute("create table issue13 (t text)")
        try:
            # ticket says 18k
            size = 18*1024
            cur.execute("insert into issue13 (t) values (%s)", ("x" * size,))
            cur.execute("select t from issue13")
            # use assertTrue so that obscenely huge error messages don't print
            r = cur.fetchone()[0]
            self.assertTrue("x" * size == r)
        finally:
            cur.execute("drop table issue13")

    def test_issue_15(self):
        """ query should be expanded before perform character encoding """
        conn = self.connections[0]
        c = conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            c.execute("drop table if exists issue15")
        c.execute("create table issue15 (t varchar(32))")
        try:
            c.execute("insert into issue15 (t) values (%s)", (u'\xe4\xf6\xfc',))
            c.execute("select t from issue15")
            self.assertEqual(u'\xe4\xf6\xfc', c.fetchone()[0])
        finally:
            c.execute("drop table issue15")

    def test_issue_16(self):
        """ Patch for string and tuple escaping """
        conn = self.connections[0]
        c = conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            c.execute("drop table if exists issue16")
        c.execute("create table issue16 (name varchar(32) primary key, email varchar(32))")
        try:
            c.execute("insert into issue16 (name, email) values ('pete', 'floydophone')")
            c.execute("select email from issue16 where name=%s", ("pete",))
            self.assertEqual("floydophone", c.fetchone()[0])
        finally:
            c.execute("drop table issue16")

    @unittest2.skip("test_issue_17() requires a custom, legacy MySQL configuration and will not be run.")
    def test_issue_17(self):
        """could not connect mysql use password"""
        conn = self.connections[0]
        host = self.databases[0]["host"]
        db = self.databases[0]["db"]
        c = conn.cursor()

        # grant access to a table to a user with a password
        try:
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore")
                c.execute("drop table if exists issue17")
            c.execute("create table issue17 (x varchar(32) primary key)")
            c.execute("insert into issue17 (x) values ('hello, world!')")
            c.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db)
            conn.commit()

            conn2 = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db)
            try:
                c2 = conn2.cursor()
                c2.execute("select x from issue17")
                self.assertEqual("hello, world!", c2.fetchone()[0])
            finally:
                # Do not leak the second connection when the check fails.
                conn2.close()
        finally:
            c.execute("drop table issue17")
172
class TestNewIssues(base.PyMySQLTestCase):
    """Regression tests for PyMySQL tracker issues 33 to 54.

    Every test runs against a live MySQL server through the
    ``self.connections`` / ``self.databases`` attributes this class
    inherits from its base test case.
    """

    def test_issue_34(self):
        """Connecting to localhost:1237 must fail with OperationalError 2003."""
        with self.assertRaises(pymysql.OperationalError) as cm:
            pymysql.connect(host="localhost", port=1237, user="root")
        self.assertEqual(2003, cm.exception.args[0])

    def test_issue_33(self):
        """Round-trip a non-ASCII table name and value over a utf8 connection."""
        conn = pymysql.connect(charset="utf8", **self.databases[0])
        # Registered before the table is created so that, whatever cleanup
        # the table helper schedules, the connection is closed last.
        self.addCleanup(conn.close)
        self.safe_create_table(conn, u'hei\xdfe',
                               u'create table hei\xdfe (name varchar(32))')
        c = conn.cursor()
        c.execute(u"insert into hei\xdfe (name) values ('Pi\xdfata')")
        c.execute(u"select name from hei\xdfe")
        self.assertEqual(u"Pi\xdfata", c.fetchone()[0])

    @unittest2.skip("This test requires manual intervention")
    def test_issue_35(self):
        """A query cut off by killing mysqld must raise OperationalError 2013."""
        conn = self.connections[0]
        c = conn.cursor()
        print("sudo killall -9 mysqld within the next 10 seconds")
        with self.assertRaises(pymysql.OperationalError) as cm:
            c.execute("select sleep(10)")
        self.assertEqual(2013, cm.exception.args[0])

    def test_issue_36(self):
        """Killing a connection breaks it and removes it from the process list."""
        # connection 0 is super user, connection 1 isn't
        conn = self.connections[1]
        c = conn.cursor()
        c.execute("show processlist")
        kill_id = None
        for row in c.fetchall():
            # row[0] is the process id, row[7] the statement being executed
            if row[7] == "show processlist":
                kill_id = row[0]
                break
        self.assertEqual(kill_id, conn.thread_id())
        # now nuke the connection
        self.connections[0].kill(kill_id)
        # make sure this connection has broken; the failure is raised in the
        # else branch so that it is not swallowed by the broad except clause
        try:
            c.execute("show tables")
        except Exception:
            pass
        else:
            self.fail("query succeeded on a killed connection")
        c.close()
        conn.close()

        # check the process list from the other connection
        try:
            # Wait since Travis-CI sometimes fail this test.
            time.sleep(0.1)

            c = self.connections[0].cursor()
            c.execute("show processlist")
            ids = [row[0] for row in c.fetchall()]
            self.assertFalse(kill_id in ids)
        finally:
            del self.connections[1]

    def test_issue_37(self):
        """An unset user variable selects as one NULL row; SET reports 0 rows."""
        conn = self.connections[0]
        c = conn.cursor()
        self.assertEqual(1, c.execute("SELECT @foo"))
        self.assertEqual((None,), c.fetchone())
        self.assertEqual(0, c.execute("SET @foo = 'bar'"))
        c.execute("set @foo = 'bar'")

    def test_issue_38(self):
        """Insert a string of roughly 1 MB into a mediumblob column."""
        conn = self.connections[0]
        c = conn.cursor()
        datum = "a" * 1024 * 1023 # reduced size for most default mysql installs

        try:
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore")
                c.execute("drop table if exists issue38")
            c.execute("create table issue38 (id integer, data mediumblob)")
            c.execute("insert into issue38 values (1, %s)", (datum,))
        finally:
            c.execute("drop table issue38")

    def disabled_test_issue_54(self):
        """Run a SELECT whose WHERE clause has 100000 terms.

        The method name lacks the ``test_`` prefix, so the test runner does
        not collect it.
        """
        conn = self.connections[0]
        c = conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            c.execute("drop table if exists issue54")
        big_sql = "select * from issue54 where "
        big_sql += " and ".join("%d=%d" % (i, i) for i in range(0, 100000))

        try:
            c.execute("create table issue54 (id integer primary key)")
            c.execute("insert into issue54 (id) values (7)")
            c.execute(big_sql)
            self.assertEqual(7, c.fetchone()[0])
        finally:
            c.execute("drop table issue54")
277
class TestGitHubIssues(base.PyMySQLTestCase):
    """Regression tests for issues reported on the PyMySQL GitHub tracker.

    Every test runs against a live MySQL server through the
    ``self.connections`` / ``self.databases`` attributes this class
    inherits from its base test case.
    """

    def _execute_expecting_deprecation(self, conn, cur, query):
        """Run *query*, requiring a pymysql warning on MySQL 5.7 and later.

        From MySQL 5.7, ST_GeomFromText is added and GeomFromText is
        deprecated; test_issue_363 expects the same warning behaviour for
        the other geometry functions it calls.
        """
        if self.mysql_server_is(conn, (5, 7, 0)):
            with self.assertWarns(pymysql.err.Warning):
                cur.execute(query)
        else:
            cur.execute(query)

    def test_issue_66(self):
        """ 'Connection' object has no attribute 'insert_id' """
        conn = self.connections[0]
        c = conn.cursor()
        self.assertEqual(0, conn.insert_id())
        try:
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore")
                c.execute("drop table if exists issue66")
            c.execute("create table issue66 (id integer primary key auto_increment, x integer)")
            c.execute("insert into issue66 (x) values (1)")
            c.execute("insert into issue66 (x) values (1)")
            self.assertEqual(2, conn.insert_id())
        finally:
            c.execute("drop table issue66")

    def test_issue_79(self):
        """ Duplicate field overwrites the previous one in the result of DictCursor """
        conn = self.connections[0]
        c = conn.cursor(pymysql.cursors.DictCursor)

        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            c.execute("drop table if exists a")
            c.execute("drop table if exists b")
        c.execute("""CREATE TABLE a (id int, value int)""")
        c.execute("""CREATE TABLE b (id int, value int)""")

        a = (1, 11)
        b = (1, 22)
        try:
            c.execute("insert into a values (%s, %s)", a)
            c.execute("insert into b values (%s, %s)", b)

            c.execute("SELECT * FROM a inner join b on a.id = b.id")
            r = c.fetchall()[0]
            self.assertEqual(r['id'], 1)
            self.assertEqual(r['value'], 11)
            # the clashing column from table b is exposed under a
            # table-qualified key
            self.assertEqual(r['b.value'], 22)
        finally:
            c.execute("drop table a")
            c.execute("drop table b")

    def test_issue_95(self):
        """ Leftover trailing OK packet for "CALL my_sp" queries """
        conn = self.connections[0]
        cur = conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            cur.execute("DROP PROCEDURE IF EXISTS `foo`")
        cur.execute("""CREATE PROCEDURE `foo` ()
        BEGIN
            SELECT 1;
        END""")
        try:
            cur.execute("""CALL foo()""")
            cur.execute("""SELECT 1""")
            self.assertEqual(cur.fetchone()[0], 1)
        finally:
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore")
                cur.execute("DROP PROCEDURE IF EXISTS `foo`")

    def test_issue_114(self):
        """ autocommit is not set after reconnecting with ping() """
        conn = pymysql.connect(charset="utf8", **self.databases[0])
        conn.autocommit(False)
        c = conn.cursor()
        c.execute("""select @@autocommit;""")
        self.assertFalse(c.fetchone()[0])
        # close, then ping() the closed connection before querying again
        conn.close()
        conn.ping()
        c.execute("""select @@autocommit;""")
        self.assertFalse(c.fetchone()[0])
        conn.close()

        # Ensure autocommit() is still working
        conn = pymysql.connect(charset="utf8", **self.databases[0])
        c = conn.cursor()
        c.execute("""select @@autocommit;""")
        self.assertFalse(c.fetchone()[0])
        conn.close()
        conn.ping()
        conn.autocommit(True)
        c.execute("""select @@autocommit;""")
        self.assertTrue(c.fetchone()[0])
        conn.close()

    def test_issue_175(self):
        """ The number of fields returned by server is read in wrong way """
        conn = self.connections[0]
        cur = conn.cursor()
        for length in (200, 300):
            columns = ', '.join('c{0} integer'.format(i) for i in range(length))
            sql = 'create table test_field_count ({0})'.format(columns)
            try:
                cur.execute(sql)
                cur.execute('select * from test_field_count')
                # assertEqual instead of a bare assert: the check must
                # survive running Python with -O
                self.assertEqual(length, len(cur.description))
            finally:
                with warnings.catch_warnings():
                    warnings.filterwarnings("ignore")
                    cur.execute('drop table if exists test_field_count')

    def test_issue_321(self):
        """ Test iterable as query argument. """
        conn = pymysql.connect(charset="utf8", **self.databases[0])
        # Registered before the table is created so that, whatever cleanup
        # the table helper schedules, the connection is closed last.
        self.addCleanup(conn.close)
        self.safe_create_table(
            conn, "issue321",
            "create table issue321 (value_1 varchar(1), value_2 varchar(1))")

        sql_insert = "insert into issue321 (value_1, value_2) values (%s, %s)"
        sql_dict_insert = ("insert into issue321 (value_1, value_2) "
                           "values (%(value_1)s, %(value_2)s)")
        sql_select = ("select * from issue321 where "
                      "value_1 in %s and value_2=%s")
        data = [
            [(u"a", ), u"\u0430"],
            [[u"b"], u"\u0430"],
            {"value_1": [[u"c"]], "value_2": u"\u0430"}
        ]
        cur = conn.cursor()
        self.assertEqual(cur.execute(sql_insert, data[0]), 1)
        self.assertEqual(cur.execute(sql_insert, data[1]), 1)
        self.assertEqual(cur.execute(sql_dict_insert, data[2]), 1)
        self.assertEqual(
            cur.execute(sql_select, [(u"a", u"b", u"c"), u"\u0430"]), 3)
        self.assertEqual(cur.fetchone(), (u"a", u"\u0430"))
        self.assertEqual(cur.fetchone(), (u"b", u"\u0430"))
        self.assertEqual(cur.fetchone(), (u"c", u"\u0430"))

    def test_issue_364(self):
        """ Test mixed unicode/binary arguments in executemany. """
        conn = pymysql.connect(charset="utf8", **self.databases[0])
        # Registered first so the connection is closed after any table cleanup.
        self.addCleanup(conn.close)
        self.safe_create_table(
            conn, "issue364",
            "create table issue364 (value_1 binary(3), value_2 varchar(3)) "
            "engine=InnoDB default charset=utf8")

        sql = "insert into issue364 (value_1, value_2) values (%s, %s)"
        usql = u"insert into issue364 (value_1, value_2) values (%s, %s)"
        values = [pymysql.Binary(b"\x00\xff\x00"), u"\xe4\xf6\xfc"]

        # test single insert and select
        cur = conn.cursor()
        cur.execute(sql, args=values)
        cur.execute("select * from issue364")
        self.assertEqual(cur.fetchone(), tuple(values))

        # test single insert unicode query
        cur.execute(usql, args=values)

        # test multi insert and select
        cur.executemany(sql, args=(values, values, values))
        cur.execute("select * from issue364")
        for row in cur.fetchall():
            self.assertEqual(row, tuple(values))

        # test multi insert with unicode query
        cur.executemany(usql, args=(values, values, values))

    def test_issue_363(self):
        """ Test binary / geometry types. """
        conn = pymysql.connect(charset="utf8", **self.databases[0])
        # Registered first so the connection is closed after any table cleanup.
        self.addCleanup(conn.close)
        self.safe_create_table(
            conn, "issue363",
            "CREATE TABLE issue363 ( "
            "id INTEGER PRIMARY KEY, geom LINESTRING NOT NULL, "
            "SPATIAL KEY geom (geom)) "
            "ENGINE=MyISAM default charset=utf8")

        cur = conn.cursor()
        query = ("INSERT INTO issue363 (id, geom) VALUES"
                 "(1998, GeomFromText('LINESTRING(1.1 1.1,2.2 2.2)'))")
        # From MySQL 5.7, ST_GeomFromText is added and GeomFromText is deprecated.
        self._execute_expecting_deprecation(conn, cur, query)

        # select WKT
        query = "SELECT AsText(geom) FROM issue363"
        self._execute_expecting_deprecation(conn, cur, query)
        row = cur.fetchone()
        self.assertEqual(row, ("LINESTRING(1.1 1.1,2.2 2.2)", ))

        # select WKB
        query = "SELECT AsBinary(geom) FROM issue363"
        self._execute_expecting_deprecation(conn, cur, query)
        row = cur.fetchone()
        self.assertEqual(row,
                         (b"\x01\x02\x00\x00\x00\x02\x00\x00\x00"
                          b"\x9a\x99\x99\x99\x99\x99\xf1?"
                          b"\x9a\x99\x99\x99\x99\x99\xf1?"
                          b"\x9a\x99\x99\x99\x99\x99\x01@"
                          b"\x9a\x99\x99\x99\x99\x99\x01@", ))

        # select internal binary
        cur.execute("SELECT geom FROM issue363")
        row = cur.fetchone()
        # don't assert the exact internal binary value, as it could
        # vary across implementations
        self.assertIsInstance(row[0], bytes)

    def test_issue_491(self):
        """ Test warning propagation """
        conn = pymysql.connect(charset="utf8", **self.databases[0])
        # Close the dedicated connection once the test is over.
        self.addCleanup(conn.close)

        with warnings.catch_warnings():
            # Ignore all warnings other than pymysql generated ones
            warnings.simplefilter("ignore")
            warnings.simplefilter("error", category=pymysql.Warning)

            # verify for both buffered and unbuffered cursor types
            for cursor_class in (cursors.Cursor, cursors.SSCursor):
                c = conn.cursor(cursor_class)
                try:
                    c.execute("SELECT CAST('124b' AS SIGNED)")
                    c.fetchall()
                except pymysql.Warning as e:
                    # Warnings should have errorcode and string message, just like exceptions
                    self.assertEqual(len(e.args), 2)
                    self.assertEqual(e.args[0], 1292)
                    self.assertIsInstance(e.args[1], text_type)
                else:
                    self.fail("Should raise Warning")
                finally:
                    c.close()
Note: See TracBrowser for help on using the repository browser.