1 | # coding: utf-8 |
---|
2 | import datetime |
---|
3 | import json |
---|
4 | import time |
---|
5 | import warnings |
---|
6 | |
---|
7 | from unittest2 import SkipTest |
---|
8 | |
---|
9 | from pymysql import util |
---|
10 | import pymysql.cursors |
---|
11 | from pymysql.tests import base |
---|
12 | from pymysql.err import ProgrammingError |
---|
13 | |
---|
14 | |
---|
15 | __all__ = ["TestConversion", "TestCursor", "TestBulkInserts"] |
---|
16 | |
---|
17 | |
---|
18 | class TestConversion(base.PyMySQLTestCase): |
---|
19 | def test_datatypes(self): |
---|
20 | """ test every data type """ |
---|
21 | conn = self.connections[0] |
---|
22 | c = conn.cursor() |
---|
23 | c.execute("create table test_datatypes (b bit, i int, l bigint, f real, s varchar(32), u varchar(32), bb blob, d date, dt datetime, ts timestamp, td time, t time, st datetime)") |
---|
24 | try: |
---|
25 | # insert values |
---|
26 | |
---|
27 | v = (True, -3, 123456789012, 5.7, "hello'\" world", u"Espa\xc3\xb1ol", "binary\x00data".encode(conn.charset), datetime.date(1988,2,2), datetime.datetime(2014, 5, 15, 7, 45, 57), datetime.timedelta(5,6), datetime.time(16,32), time.localtime()) |
---|
28 | c.execute("insert into test_datatypes (b,i,l,f,s,u,bb,d,dt,td,t,st) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", v) |
---|
29 | c.execute("select b,i,l,f,s,u,bb,d,dt,td,t,st from test_datatypes") |
---|
30 | r = c.fetchone() |
---|
31 | self.assertEqual(util.int2byte(1), r[0]) |
---|
32 | self.assertEqual(v[1:10], r[1:10]) |
---|
33 | self.assertEqual(datetime.timedelta(0, 60 * (v[10].hour * 60 + v[10].minute)), r[10]) |
---|
34 | self.assertEqual(datetime.datetime(*v[-1][:6]), r[-1]) |
---|
35 | |
---|
36 | c.execute("delete from test_datatypes") |
---|
37 | |
---|
38 | # check nulls |
---|
39 | c.execute("insert into test_datatypes (b,i,l,f,s,u,bb,d,dt,td,t,st) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", [None] * 12) |
---|
40 | c.execute("select b,i,l,f,s,u,bb,d,dt,td,t,st from test_datatypes") |
---|
41 | r = c.fetchone() |
---|
42 | self.assertEqual(tuple([None] * 12), r) |
---|
43 | |
---|
44 | c.execute("delete from test_datatypes") |
---|
45 | |
---|
46 | # check sequences type |
---|
47 | for seq_type in (tuple, list, set, frozenset): |
---|
48 | c.execute("insert into test_datatypes (i, l) values (2,4), (6,8), (10,12)") |
---|
49 | seq = seq_type([2,6]) |
---|
50 | c.execute("select l from test_datatypes where i in %s order by i", (seq,)) |
---|
51 | r = c.fetchall() |
---|
52 | self.assertEqual(((4,),(8,)), r) |
---|
53 | c.execute("delete from test_datatypes") |
---|
54 | |
---|
55 | finally: |
---|
56 | c.execute("drop table test_datatypes") |
---|
57 | |
---|
58 | def test_dict(self): |
---|
59 | """ test dict escaping """ |
---|
60 | conn = self.connections[0] |
---|
61 | c = conn.cursor() |
---|
62 | c.execute("create table test_dict (a integer, b integer, c integer)") |
---|
63 | try: |
---|
64 | c.execute("insert into test_dict (a,b,c) values (%(a)s, %(b)s, %(c)s)", {"a":1,"b":2,"c":3}) |
---|
65 | c.execute("select a,b,c from test_dict") |
---|
66 | self.assertEqual((1,2,3), c.fetchone()) |
---|
67 | finally: |
---|
68 | c.execute("drop table test_dict") |
---|
69 | |
---|
70 | def test_string(self): |
---|
71 | conn = self.connections[0] |
---|
72 | c = conn.cursor() |
---|
73 | c.execute("create table test_dict (a text)") |
---|
74 | test_value = "I am a test string" |
---|
75 | try: |
---|
76 | c.execute("insert into test_dict (a) values (%s)", test_value) |
---|
77 | c.execute("select a from test_dict") |
---|
78 | self.assertEqual((test_value,), c.fetchone()) |
---|
79 | finally: |
---|
80 | c.execute("drop table test_dict") |
---|
81 | |
---|
82 | def test_integer(self): |
---|
83 | conn = self.connections[0] |
---|
84 | c = conn.cursor() |
---|
85 | c.execute("create table test_dict (a integer)") |
---|
86 | test_value = 12345 |
---|
87 | try: |
---|
88 | c.execute("insert into test_dict (a) values (%s)", test_value) |
---|
89 | c.execute("select a from test_dict") |
---|
90 | self.assertEqual((test_value,), c.fetchone()) |
---|
91 | finally: |
---|
92 | c.execute("drop table test_dict") |
---|
93 | |
---|
94 | def test_blob(self): |
---|
95 | """test binary data""" |
---|
96 | data = bytes(bytearray(range(256)) * 4) |
---|
97 | conn = self.connections[0] |
---|
98 | self.safe_create_table( |
---|
99 | conn, "test_blob", "create table test_blob (b blob)") |
---|
100 | |
---|
101 | with conn.cursor() as c: |
---|
102 | c.execute("insert into test_blob (b) values (%s)", (data,)) |
---|
103 | c.execute("select b from test_blob") |
---|
104 | self.assertEqual(data, c.fetchone()[0]) |
---|
105 | |
---|
106 | def test_untyped(self): |
---|
107 | """ test conversion of null, empty string """ |
---|
108 | conn = self.connections[0] |
---|
109 | c = conn.cursor() |
---|
110 | c.execute("select null,''") |
---|
111 | self.assertEqual((None,u''), c.fetchone()) |
---|
112 | c.execute("select '',null") |
---|
113 | self.assertEqual((u'',None), c.fetchone()) |
---|
114 | |
---|
115 | def test_timedelta(self): |
---|
116 | """ test timedelta conversion """ |
---|
117 | conn = self.connections[0] |
---|
118 | c = conn.cursor() |
---|
119 | c.execute("select time('12:30'), time('23:12:59'), time('23:12:59.05100'), time('-12:30'), time('-23:12:59'), time('-23:12:59.05100'), time('-00:30')") |
---|
120 | self.assertEqual((datetime.timedelta(0, 45000), |
---|
121 | datetime.timedelta(0, 83579), |
---|
122 | datetime.timedelta(0, 83579, 51000), |
---|
123 | -datetime.timedelta(0, 45000), |
---|
124 | -datetime.timedelta(0, 83579), |
---|
125 | -datetime.timedelta(0, 83579, 51000), |
---|
126 | -datetime.timedelta(0, 1800)), |
---|
127 | c.fetchone()) |
---|
128 | |
---|
129 | def test_datetime_microseconds(self): |
---|
130 | """ test datetime conversion w microseconds""" |
---|
131 | |
---|
132 | conn = self.connections[0] |
---|
133 | if not self.mysql_server_is(conn, (5, 6, 4)): |
---|
134 | raise SkipTest("target backend does not support microseconds") |
---|
135 | c = conn.cursor() |
---|
136 | dt = datetime.datetime(2013, 11, 12, 9, 9, 9, 123450) |
---|
137 | c.execute("create table test_datetime (id int, ts datetime(6))") |
---|
138 | try: |
---|
139 | c.execute( |
---|
140 | "insert into test_datetime values (%s, %s)", |
---|
141 | (1, dt) |
---|
142 | ) |
---|
143 | c.execute("select ts from test_datetime") |
---|
144 | self.assertEqual((dt,), c.fetchone()) |
---|
145 | finally: |
---|
146 | c.execute("drop table test_datetime") |
---|
147 | |
---|
148 | |
---|
149 | class TestCursor(base.PyMySQLTestCase): |
---|
150 | # this test case does not work quite right yet, however, |
---|
151 | # we substitute in None for the erroneous field which is |
---|
152 | # compatible with the DB-API 2.0 spec and has not broken |
---|
153 | # any unit tests for anything we've tried. |
---|
154 | |
---|
155 | #def test_description(self): |
---|
156 | # """ test description attribute """ |
---|
157 | # # result is from MySQLdb module |
---|
158 | # r = (('Host', 254, 11, 60, 60, 0, 0), |
---|
159 | # ('User', 254, 16, 16, 16, 0, 0), |
---|
160 | # ('Password', 254, 41, 41, 41, 0, 0), |
---|
161 | # ('Select_priv', 254, 1, 1, 1, 0, 0), |
---|
162 | # ('Insert_priv', 254, 1, 1, 1, 0, 0), |
---|
163 | # ('Update_priv', 254, 1, 1, 1, 0, 0), |
---|
164 | # ('Delete_priv', 254, 1, 1, 1, 0, 0), |
---|
165 | # ('Create_priv', 254, 1, 1, 1, 0, 0), |
---|
166 | # ('Drop_priv', 254, 1, 1, 1, 0, 0), |
---|
167 | # ('Reload_priv', 254, 1, 1, 1, 0, 0), |
---|
168 | # ('Shutdown_priv', 254, 1, 1, 1, 0, 0), |
---|
169 | # ('Process_priv', 254, 1, 1, 1, 0, 0), |
---|
170 | # ('File_priv', 254, 1, 1, 1, 0, 0), |
---|
171 | # ('Grant_priv', 254, 1, 1, 1, 0, 0), |
---|
172 | # ('References_priv', 254, 1, 1, 1, 0, 0), |
---|
173 | # ('Index_priv', 254, 1, 1, 1, 0, 0), |
---|
174 | # ('Alter_priv', 254, 1, 1, 1, 0, 0), |
---|
175 | # ('Show_db_priv', 254, 1, 1, 1, 0, 0), |
---|
176 | # ('Super_priv', 254, 1, 1, 1, 0, 0), |
---|
177 | # ('Create_tmp_table_priv', 254, 1, 1, 1, 0, 0), |
---|
178 | # ('Lock_tables_priv', 254, 1, 1, 1, 0, 0), |
---|
179 | # ('Execute_priv', 254, 1, 1, 1, 0, 0), |
---|
180 | # ('Repl_slave_priv', 254, 1, 1, 1, 0, 0), |
---|
181 | # ('Repl_client_priv', 254, 1, 1, 1, 0, 0), |
---|
182 | # ('Create_view_priv', 254, 1, 1, 1, 0, 0), |
---|
183 | # ('Show_view_priv', 254, 1, 1, 1, 0, 0), |
---|
184 | # ('Create_routine_priv', 254, 1, 1, 1, 0, 0), |
---|
185 | # ('Alter_routine_priv', 254, 1, 1, 1, 0, 0), |
---|
186 | # ('Create_user_priv', 254, 1, 1, 1, 0, 0), |
---|
187 | # ('Event_priv', 254, 1, 1, 1, 0, 0), |
---|
188 | # ('Trigger_priv', 254, 1, 1, 1, 0, 0), |
---|
189 | # ('ssl_type', 254, 0, 9, 9, 0, 0), |
---|
190 | # ('ssl_cipher', 252, 0, 65535, 65535, 0, 0), |
---|
191 | # ('x509_issuer', 252, 0, 65535, 65535, 0, 0), |
---|
192 | # ('x509_subject', 252, 0, 65535, 65535, 0, 0), |
---|
193 | # ('max_questions', 3, 1, 11, 11, 0, 0), |
---|
194 | # ('max_updates', 3, 1, 11, 11, 0, 0), |
---|
195 | # ('max_connections', 3, 1, 11, 11, 0, 0), |
---|
196 | # ('max_user_connections', 3, 1, 11, 11, 0, 0)) |
---|
197 | # conn = self.connections[0] |
---|
198 | # c = conn.cursor() |
---|
199 | # c.execute("select * from mysql.user") |
---|
200 | # |
---|
201 | # self.assertEqual(r, c.description) |
---|
202 | |
---|
203 | def test_fetch_no_result(self): |
---|
204 | """ test a fetchone() with no rows """ |
---|
205 | conn = self.connections[0] |
---|
206 | c = conn.cursor() |
---|
207 | c.execute("create table test_nr (b varchar(32))") |
---|
208 | try: |
---|
209 | data = "pymysql" |
---|
210 | c.execute("insert into test_nr (b) values (%s)", (data,)) |
---|
211 | self.assertEqual(None, c.fetchone()) |
---|
212 | finally: |
---|
213 | c.execute("drop table test_nr") |
---|
214 | |
---|
215 | def test_aggregates(self): |
---|
216 | """ test aggregate functions """ |
---|
217 | conn = self.connections[0] |
---|
218 | c = conn.cursor() |
---|
219 | try: |
---|
220 | c.execute('create table test_aggregates (i integer)') |
---|
221 | for i in range(0, 10): |
---|
222 | c.execute('insert into test_aggregates (i) values (%s)', (i,)) |
---|
223 | c.execute('select sum(i) from test_aggregates') |
---|
224 | r, = c.fetchone() |
---|
225 | self.assertEqual(sum(range(0,10)), r) |
---|
226 | finally: |
---|
227 | c.execute('drop table test_aggregates') |
---|
228 | |
---|
229 | def test_single_tuple(self): |
---|
230 | """ test a single tuple """ |
---|
231 | conn = self.connections[0] |
---|
232 | c = conn.cursor() |
---|
233 | self.safe_create_table( |
---|
234 | conn, 'mystuff', |
---|
235 | "create table mystuff (id integer primary key)") |
---|
236 | c.execute("insert into mystuff (id) values (1)") |
---|
237 | c.execute("insert into mystuff (id) values (2)") |
---|
238 | c.execute("select id from mystuff where id in %s", ((1,),)) |
---|
239 | self.assertEqual([(1,)], list(c.fetchall())) |
---|
240 | c.close() |
---|
241 | |
---|
242 | def test_json(self): |
---|
243 | args = self.databases[0].copy() |
---|
244 | args["charset"] = "utf8mb4" |
---|
245 | conn = pymysql.connect(**args) |
---|
246 | if not self.mysql_server_is(conn, (5, 7, 0)): |
---|
247 | raise SkipTest("JSON type is not supported on MySQL <= 5.6") |
---|
248 | |
---|
249 | self.safe_create_table(conn, "test_json", """\ |
---|
250 | create table test_json ( |
---|
251 | id int not null, |
---|
252 | json JSON not null, |
---|
253 | primary key (id) |
---|
254 | );""") |
---|
255 | cur = conn.cursor() |
---|
256 | |
---|
257 | json_str = u'{"hello": "こんにちは"}' |
---|
258 | cur.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,)) |
---|
259 | cur.execute("SELECT `json` from `test_json` WHERE `id`=42") |
---|
260 | res = cur.fetchone()[0] |
---|
261 | self.assertEqual(json.loads(res), json.loads(json_str)) |
---|
262 | |
---|
263 | cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,)) |
---|
264 | res = cur.fetchone()[0] |
---|
265 | self.assertEqual(json.loads(res), json.loads(json_str)) |
---|
266 | |
---|
267 | |
---|
268 | class TestBulkInserts(base.PyMySQLTestCase): |
---|
269 | |
---|
270 | cursor_type = pymysql.cursors.DictCursor |
---|
271 | |
---|
272 | def setUp(self): |
---|
273 | super(TestBulkInserts, self).setUp() |
---|
274 | self.conn = conn = self.connections[0] |
---|
275 | c = conn.cursor(self.cursor_type) |
---|
276 | |
---|
277 | # create a table ane some data to query |
---|
278 | self.safe_create_table(conn, 'bulkinsert', """\ |
---|
279 | CREATE TABLE bulkinsert |
---|
280 | ( |
---|
281 | id int(11), |
---|
282 | name char(20), |
---|
283 | age int, |
---|
284 | height int, |
---|
285 | PRIMARY KEY (id) |
---|
286 | ) |
---|
287 | """) |
---|
288 | |
---|
289 | def _verify_records(self, data): |
---|
290 | conn = self.connections[0] |
---|
291 | cursor = conn.cursor() |
---|
292 | cursor.execute("SELECT id, name, age, height from bulkinsert") |
---|
293 | result = cursor.fetchall() |
---|
294 | self.assertEqual(sorted(data), sorted(result)) |
---|
295 | |
---|
296 | def test_bulk_insert(self): |
---|
297 | conn = self.connections[0] |
---|
298 | cursor = conn.cursor() |
---|
299 | |
---|
300 | data = [(0, "bob", 21, 123), (1, "jim", 56, 45), (2, "fred", 100, 180)] |
---|
301 | cursor.executemany("insert into bulkinsert (id, name, age, height) " |
---|
302 | "values (%s,%s,%s,%s)", data) |
---|
303 | self.assertEqual( |
---|
304 | cursor._last_executed, bytearray( |
---|
305 | b"insert into bulkinsert (id, name, age, height) values " |
---|
306 | b"(0,'bob',21,123),(1,'jim',56,45),(2,'fred',100,180)")) |
---|
307 | cursor.execute('commit') |
---|
308 | self._verify_records(data) |
---|
309 | |
---|
310 | def test_bulk_insert_multiline_statement(self): |
---|
311 | conn = self.connections[0] |
---|
312 | cursor = conn.cursor() |
---|
313 | data = [(0, "bob", 21, 123), (1, "jim", 56, 45), (2, "fred", 100, 180)] |
---|
314 | cursor.executemany("""insert |
---|
315 | into bulkinsert (id, name, |
---|
316 | age, height) |
---|
317 | values (%s, |
---|
318 | %s , %s, |
---|
319 | %s ) |
---|
320 | """, data) |
---|
321 | self.assertEqual(cursor._last_executed.strip(), bytearray(b"""insert |
---|
322 | into bulkinsert (id, name, |
---|
323 | age, height) |
---|
324 | values (0, |
---|
325 | 'bob' , 21, |
---|
326 | 123 ),(1, |
---|
327 | 'jim' , 56, |
---|
328 | 45 ),(2, |
---|
329 | 'fred' , 100, |
---|
330 | 180 )""")) |
---|
331 | cursor.execute('commit') |
---|
332 | self._verify_records(data) |
---|
333 | |
---|
334 | def test_bulk_insert_single_record(self): |
---|
335 | conn = self.connections[0] |
---|
336 | cursor = conn.cursor() |
---|
337 | data = [(0, "bob", 21, 123)] |
---|
338 | cursor.executemany("insert into bulkinsert (id, name, age, height) " |
---|
339 | "values (%s,%s,%s,%s)", data) |
---|
340 | cursor.execute('commit') |
---|
341 | self._verify_records(data) |
---|
342 | |
---|
343 | def test_issue_288(self): |
---|
344 | """executemany should work with "insert ... on update" """ |
---|
345 | conn = self.connections[0] |
---|
346 | cursor = conn.cursor() |
---|
347 | data = [(0, "bob", 21, 123), (1, "jim", 56, 45), (2, "fred", 100, 180)] |
---|
348 | cursor.executemany("""insert |
---|
349 | into bulkinsert (id, name, |
---|
350 | age, height) |
---|
351 | values (%s, |
---|
352 | %s , %s, |
---|
353 | %s ) on duplicate key update |
---|
354 | age = values(age) |
---|
355 | """, data) |
---|
356 | self.assertEqual(cursor._last_executed.strip(), bytearray(b"""insert |
---|
357 | into bulkinsert (id, name, |
---|
358 | age, height) |
---|
359 | values (0, |
---|
360 | 'bob' , 21, |
---|
361 | 123 ),(1, |
---|
362 | 'jim' , 56, |
---|
363 | 45 ),(2, |
---|
364 | 'fred' , 100, |
---|
365 | 180 ) on duplicate key update |
---|
366 | age = values(age)""")) |
---|
367 | cursor.execute('commit') |
---|
368 | self._verify_records(data) |
---|
369 | |
---|
370 | def test_warnings(self): |
---|
371 | con = self.connections[0] |
---|
372 | cur = con.cursor() |
---|
373 | with warnings.catch_warnings(record=True) as ws: |
---|
374 | warnings.simplefilter("always") |
---|
375 | cur.execute("drop table if exists no_exists_table") |
---|
376 | self.assertEqual(len(ws), 1) |
---|
377 | self.assertEqual(ws[0].category, pymysql.Warning) |
---|
378 | if u"no_exists_table" not in str(ws[0].message): |
---|
379 | self.fail("'no_exists_table' not in %s" % (str(ws[0].message),)) |
---|