source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/contrib/pymysql/tests/test_basic.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 14.1 KB
Line 
1# coding: utf-8
2import datetime
3import json
4import time
5import warnings
6
7from unittest2 import SkipTest
8
9from pymysql import util
10import pymysql.cursors
11from pymysql.tests import base
12from pymysql.err import ProgrammingError
13
14
15__all__ = ["TestConversion", "TestCursor", "TestBulkInserts"]
16
17
18class TestConversion(base.PyMySQLTestCase):
19    def test_datatypes(self):
20        """ test every data type """
21        conn = self.connections[0]
22        c = conn.cursor()
23        c.execute("create table test_datatypes (b bit, i int, l bigint, f real, s varchar(32), u varchar(32), bb blob, d date, dt datetime, ts timestamp, td time, t time, st datetime)")
24        try:
25            # insert values
26
27            v = (True, -3, 123456789012, 5.7, "hello'\" world", u"Espa\xc3\xb1ol", "binary\x00data".encode(conn.charset), datetime.date(1988,2,2), datetime.datetime(2014, 5, 15, 7, 45, 57), datetime.timedelta(5,6), datetime.time(16,32), time.localtime())
28            c.execute("insert into test_datatypes (b,i,l,f,s,u,bb,d,dt,td,t,st) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", v)
29            c.execute("select b,i,l,f,s,u,bb,d,dt,td,t,st from test_datatypes")
30            r = c.fetchone()
31            self.assertEqual(util.int2byte(1), r[0])
32            self.assertEqual(v[1:10], r[1:10])
33            self.assertEqual(datetime.timedelta(0, 60 * (v[10].hour * 60 + v[10].minute)), r[10])
34            self.assertEqual(datetime.datetime(*v[-1][:6]), r[-1])
35
36            c.execute("delete from test_datatypes")
37
38            # check nulls
39            c.execute("insert into test_datatypes (b,i,l,f,s,u,bb,d,dt,td,t,st) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", [None] * 12)
40            c.execute("select b,i,l,f,s,u,bb,d,dt,td,t,st from test_datatypes")
41            r = c.fetchone()
42            self.assertEqual(tuple([None] * 12), r)
43
44            c.execute("delete from test_datatypes")
45
46            # check sequences type
47            for seq_type in (tuple, list, set, frozenset):
48                c.execute("insert into test_datatypes (i, l) values (2,4), (6,8), (10,12)")
49                seq = seq_type([2,6])
50                c.execute("select l from test_datatypes where i in %s order by i", (seq,))
51                r = c.fetchall()
52                self.assertEqual(((4,),(8,)), r)
53                c.execute("delete from test_datatypes")
54
55        finally:
56            c.execute("drop table test_datatypes")
57
58    def test_dict(self):
59        """ test dict escaping """
60        conn = self.connections[0]
61        c = conn.cursor()
62        c.execute("create table test_dict (a integer, b integer, c integer)")
63        try:
64            c.execute("insert into test_dict (a,b,c) values (%(a)s, %(b)s, %(c)s)", {"a":1,"b":2,"c":3})
65            c.execute("select a,b,c from test_dict")
66            self.assertEqual((1,2,3), c.fetchone())
67        finally:
68            c.execute("drop table test_dict")
69
70    def test_string(self):
71        conn = self.connections[0]
72        c = conn.cursor()
73        c.execute("create table test_dict (a text)")
74        test_value = "I am a test string"
75        try:
76            c.execute("insert into test_dict (a) values (%s)", test_value)
77            c.execute("select a from test_dict")
78            self.assertEqual((test_value,), c.fetchone())
79        finally:
80            c.execute("drop table test_dict")
81
82    def test_integer(self):
83        conn = self.connections[0]
84        c = conn.cursor()
85        c.execute("create table test_dict (a integer)")
86        test_value = 12345
87        try:
88            c.execute("insert into test_dict (a) values (%s)", test_value)
89            c.execute("select a from test_dict")
90            self.assertEqual((test_value,), c.fetchone())
91        finally:
92            c.execute("drop table test_dict")
93
94    def test_blob(self):
95        """test binary data"""
96        data = bytes(bytearray(range(256)) * 4)
97        conn = self.connections[0]
98        self.safe_create_table(
99            conn, "test_blob", "create table test_blob (b blob)")
100
101        with conn.cursor() as c:
102            c.execute("insert into test_blob (b) values (%s)", (data,))
103            c.execute("select b from test_blob")
104            self.assertEqual(data, c.fetchone()[0])
105
106    def test_untyped(self):
107        """ test conversion of null, empty string """
108        conn = self.connections[0]
109        c = conn.cursor()
110        c.execute("select null,''")
111        self.assertEqual((None,u''), c.fetchone())
112        c.execute("select '',null")
113        self.assertEqual((u'',None), c.fetchone())
114
115    def test_timedelta(self):
116        """ test timedelta conversion """
117        conn = self.connections[0]
118        c = conn.cursor()
119        c.execute("select time('12:30'), time('23:12:59'), time('23:12:59.05100'), time('-12:30'), time('-23:12:59'), time('-23:12:59.05100'), time('-00:30')")
120        self.assertEqual((datetime.timedelta(0, 45000),
121                          datetime.timedelta(0, 83579),
122                          datetime.timedelta(0, 83579, 51000),
123                          -datetime.timedelta(0, 45000),
124                          -datetime.timedelta(0, 83579),
125                          -datetime.timedelta(0, 83579, 51000),
126                          -datetime.timedelta(0, 1800)),
127                         c.fetchone())
128
129    def test_datetime_microseconds(self):
130        """ test datetime conversion w microseconds"""
131
132        conn = self.connections[0]
133        if not self.mysql_server_is(conn, (5, 6, 4)):
134            raise SkipTest("target backend does not support microseconds")
135        c = conn.cursor()
136        dt = datetime.datetime(2013, 11, 12, 9, 9, 9, 123450)
137        c.execute("create table test_datetime (id int, ts datetime(6))")
138        try:
139            c.execute(
140                "insert into test_datetime values (%s, %s)",
141                (1, dt)
142            )
143            c.execute("select ts from test_datetime")
144            self.assertEqual((dt,), c.fetchone())
145        finally:
146            c.execute("drop table test_datetime")
147
148
149class TestCursor(base.PyMySQLTestCase):
150    # this test case does not work quite right yet, however,
151    # we substitute in None for the erroneous field which is
152    # compatible with the DB-API 2.0 spec and has not broken
153    # any unit tests for anything we've tried.
154
155    #def test_description(self):
156    #    """ test description attribute """
157    #    # result is from MySQLdb module
158    #    r = (('Host', 254, 11, 60, 60, 0, 0),
159    #         ('User', 254, 16, 16, 16, 0, 0),
160    #         ('Password', 254, 41, 41, 41, 0, 0),
161    #         ('Select_priv', 254, 1, 1, 1, 0, 0),
162    #         ('Insert_priv', 254, 1, 1, 1, 0, 0),
163    #         ('Update_priv', 254, 1, 1, 1, 0, 0),
164    #         ('Delete_priv', 254, 1, 1, 1, 0, 0),
165    #         ('Create_priv', 254, 1, 1, 1, 0, 0),
166    #         ('Drop_priv', 254, 1, 1, 1, 0, 0),
167    #         ('Reload_priv', 254, 1, 1, 1, 0, 0),
168    #         ('Shutdown_priv', 254, 1, 1, 1, 0, 0),
169    #         ('Process_priv', 254, 1, 1, 1, 0, 0),
170    #         ('File_priv', 254, 1, 1, 1, 0, 0),
171    #         ('Grant_priv', 254, 1, 1, 1, 0, 0),
172    #         ('References_priv', 254, 1, 1, 1, 0, 0),
173    #         ('Index_priv', 254, 1, 1, 1, 0, 0),
174    #         ('Alter_priv', 254, 1, 1, 1, 0, 0),
175    #         ('Show_db_priv', 254, 1, 1, 1, 0, 0),
176    #         ('Super_priv', 254, 1, 1, 1, 0, 0),
177    #         ('Create_tmp_table_priv', 254, 1, 1, 1, 0, 0),
178    #         ('Lock_tables_priv', 254, 1, 1, 1, 0, 0),
179    #         ('Execute_priv', 254, 1, 1, 1, 0, 0),
180    #         ('Repl_slave_priv', 254, 1, 1, 1, 0, 0),
181    #         ('Repl_client_priv', 254, 1, 1, 1, 0, 0),
182    #         ('Create_view_priv', 254, 1, 1, 1, 0, 0),
183    #         ('Show_view_priv', 254, 1, 1, 1, 0, 0),
184    #         ('Create_routine_priv', 254, 1, 1, 1, 0, 0),
185    #         ('Alter_routine_priv', 254, 1, 1, 1, 0, 0),
186    #         ('Create_user_priv', 254, 1, 1, 1, 0, 0),
187    #         ('Event_priv', 254, 1, 1, 1, 0, 0),
188    #         ('Trigger_priv', 254, 1, 1, 1, 0, 0),
189    #         ('ssl_type', 254, 0, 9, 9, 0, 0),
190    #         ('ssl_cipher', 252, 0, 65535, 65535, 0, 0),
191    #         ('x509_issuer', 252, 0, 65535, 65535, 0, 0),
192    #         ('x509_subject', 252, 0, 65535, 65535, 0, 0),
193    #         ('max_questions', 3, 1, 11, 11, 0, 0),
194    #         ('max_updates', 3, 1, 11, 11, 0, 0),
195    #         ('max_connections', 3, 1, 11, 11, 0, 0),
196    #         ('max_user_connections', 3, 1, 11, 11, 0, 0))
197    #    conn = self.connections[0]
198    #    c = conn.cursor()
199    #    c.execute("select * from mysql.user")
200    #
201    #    self.assertEqual(r, c.description)
202
203    def test_fetch_no_result(self):
204        """ test a fetchone() with no rows """
205        conn = self.connections[0]
206        c = conn.cursor()
207        c.execute("create table test_nr (b varchar(32))")
208        try:
209            data = "pymysql"
210            c.execute("insert into test_nr (b) values (%s)", (data,))
211            self.assertEqual(None, c.fetchone())
212        finally:
213            c.execute("drop table test_nr")
214
215    def test_aggregates(self):
216        """ test aggregate functions """
217        conn = self.connections[0]
218        c = conn.cursor()
219        try:
220            c.execute('create table test_aggregates (i integer)')
221            for i in range(0, 10):
222                c.execute('insert into test_aggregates (i) values (%s)', (i,))
223            c.execute('select sum(i) from test_aggregates')
224            r, = c.fetchone()
225            self.assertEqual(sum(range(0,10)), r)
226        finally:
227            c.execute('drop table test_aggregates')
228
229    def test_single_tuple(self):
230        """ test a single tuple """
231        conn = self.connections[0]
232        c = conn.cursor()
233        self.safe_create_table(
234            conn, 'mystuff',
235            "create table mystuff (id integer primary key)")
236        c.execute("insert into mystuff (id) values (1)")
237        c.execute("insert into mystuff (id) values (2)")
238        c.execute("select id from mystuff where id in %s", ((1,),))
239        self.assertEqual([(1,)], list(c.fetchall()))
240        c.close()
241
242    def test_json(self):
243        args = self.databases[0].copy()
244        args["charset"] = "utf8mb4"
245        conn = pymysql.connect(**args)
246        if not self.mysql_server_is(conn, (5, 7, 0)):
247            raise SkipTest("JSON type is not supported on MySQL <= 5.6")
248
249        self.safe_create_table(conn, "test_json", """\
250create table test_json (
251    id int not null,
252    json JSON not null,
253    primary key (id)
254);""")
255        cur = conn.cursor()
256
257        json_str = u'{"hello": "こんにちは"}'
258        cur.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
259        cur.execute("SELECT `json` from `test_json` WHERE `id`=42")
260        res = cur.fetchone()[0]
261        self.assertEqual(json.loads(res), json.loads(json_str))
262
263        cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
264        res = cur.fetchone()[0]
265        self.assertEqual(json.loads(res), json.loads(json_str))
266
267
268class TestBulkInserts(base.PyMySQLTestCase):
269
270    cursor_type = pymysql.cursors.DictCursor
271
272    def setUp(self):
273        super(TestBulkInserts, self).setUp()
274        self.conn = conn = self.connections[0]
275        c = conn.cursor(self.cursor_type)
276
277        # create a table ane some data to query
278        self.safe_create_table(conn, 'bulkinsert', """\
279CREATE TABLE bulkinsert
280(
281id int(11),
282name char(20),
283age int,
284height int,
285PRIMARY KEY (id)
286)
287""")
288
289    def _verify_records(self, data):
290        conn = self.connections[0]
291        cursor = conn.cursor()
292        cursor.execute("SELECT id, name, age, height from bulkinsert")
293        result = cursor.fetchall()
294        self.assertEqual(sorted(data), sorted(result))
295
296    def test_bulk_insert(self):
297        conn = self.connections[0]
298        cursor = conn.cursor()
299
300        data = [(0, "bob", 21, 123), (1, "jim", 56, 45), (2, "fred", 100, 180)]
301        cursor.executemany("insert into bulkinsert (id, name, age, height) "
302                           "values (%s,%s,%s,%s)", data)
303        self.assertEqual(
304            cursor._last_executed, bytearray(
305            b"insert into bulkinsert (id, name, age, height) values "
306            b"(0,'bob',21,123),(1,'jim',56,45),(2,'fred',100,180)"))
307        cursor.execute('commit')
308        self._verify_records(data)
309
310    def test_bulk_insert_multiline_statement(self):
311        conn = self.connections[0]
312        cursor = conn.cursor()
313        data = [(0, "bob", 21, 123), (1, "jim", 56, 45), (2, "fred", 100, 180)]
314        cursor.executemany("""insert
315into bulkinsert (id, name,
316age, height)
317values (%s,
318%s , %s,
319%s )
320 """, data)
321        self.assertEqual(cursor._last_executed.strip(), bytearray(b"""insert
322into bulkinsert (id, name,
323age, height)
324values (0,
325'bob' , 21,
326123 ),(1,
327'jim' , 56,
32845 ),(2,
329'fred' , 100,
330180 )"""))
331        cursor.execute('commit')
332        self._verify_records(data)
333
334    def test_bulk_insert_single_record(self):
335        conn = self.connections[0]
336        cursor = conn.cursor()
337        data = [(0, "bob", 21, 123)]
338        cursor.executemany("insert into bulkinsert (id, name, age, height) "
339                           "values (%s,%s,%s,%s)", data)
340        cursor.execute('commit')
341        self._verify_records(data)
342
343    def test_issue_288(self):
344        """executemany should work with "insert ... on update" """
345        conn = self.connections[0]
346        cursor = conn.cursor()
347        data = [(0, "bob", 21, 123), (1, "jim", 56, 45), (2, "fred", 100, 180)]
348        cursor.executemany("""insert
349into bulkinsert (id, name,
350age, height)
351values (%s,
352%s , %s,
353%s ) on duplicate key update
354age = values(age)
355 """, data)
356        self.assertEqual(cursor._last_executed.strip(), bytearray(b"""insert
357into bulkinsert (id, name,
358age, height)
359values (0,
360'bob' , 21,
361123 ),(1,
362'jim' , 56,
36345 ),(2,
364'fred' , 100,
365180 ) on duplicate key update
366age = values(age)"""))
367        cursor.execute('commit')
368        self._verify_records(data)
369
370    def test_warnings(self):
371        con = self.connections[0]
372        cur = con.cursor()
373        with warnings.catch_warnings(record=True) as ws:
374            warnings.simplefilter("always")
375            cur.execute("drop table if exists no_exists_table")
376        self.assertEqual(len(ws), 1)
377        self.assertEqual(ws[0].category, pymysql.Warning)
378        if u"no_exists_table" not in str(ws[0].message):
379            self.fail("'no_exists_table' not in %s" % (str(ws[0].message),))
Note: See TracBrowser for help on using the repository browser.