1 | # -*- coding: utf-8 -*- |
---|
2 | """ |
---|
3 | Basic unit tests |
---|
4 | """ |
---|
5 | |
---|
6 | from __future__ import print_function |
---|
7 | import os |
---|
8 | import glob |
---|
9 | import datetime |
---|
10 | import json |
---|
11 | import pickle |
---|
12 | |
---|
13 | from pydal._compat import basestring, StringIO, integer_types, xrange, BytesIO, to_bytes |
---|
14 | from pydal import DAL, Field |
---|
15 | from pydal.helpers.classes import SQLALL, OpRow |
---|
16 | from pydal.objects import Table, Expression, Row |
---|
17 | from ._compat import unittest |
---|
18 | from ._adapt import ( |
---|
19 | DEFAULT_URI, |
---|
20 | IS_POSTGRESQL, |
---|
21 | IS_SQLITE, |
---|
22 | IS_MSSQL, |
---|
23 | IS_MYSQL, |
---|
24 | IS_TERADATA, |
---|
25 | IS_NOSQL, |
---|
26 | IS_ORACLE, |
---|
27 | ) |
---|
28 | |
---|
29 | from ._helpers import DALtest |
---|
30 | |
---|
31 | long = integer_types[-1] |
---|
32 | |
---|
33 | print("Testing against %s engine (%s)" % (DEFAULT_URI.partition(":")[0], DEFAULT_URI)) |
---|
34 | |
---|
35 | |
---|
36 | ALLOWED_DATATYPES = [ |
---|
37 | "string", |
---|
38 | "text", |
---|
39 | "integer", |
---|
40 | "boolean", |
---|
41 | "double", |
---|
42 | "blob", |
---|
43 | "date", |
---|
44 | "time", |
---|
45 | "datetime", |
---|
46 | "upload", |
---|
47 | "password", |
---|
48 | "json", |
---|
49 | "bigint", |
---|
50 | ] |
---|
51 | |
---|
52 | |
---|
53 | def setUpModule(): |
---|
54 | if IS_MYSQL or IS_TERADATA or IS_ORACLE: |
---|
55 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
56 | |
---|
57 | def clean_table(db, tablename): |
---|
58 | try: |
---|
59 | db.define_table(tablename) |
---|
60 | except Exception as e: |
---|
61 | pass |
---|
62 | try: |
---|
63 | db[tablename].drop() |
---|
64 | except Exception as e: |
---|
65 | pass |
---|
66 | |
---|
67 | for tablename in [ |
---|
68 | "tt", |
---|
69 | "t0", |
---|
70 | "t1", |
---|
71 | "t2", |
---|
72 | "t3", |
---|
73 | "t4", |
---|
74 | "easy_name", |
---|
75 | "tt_archive", |
---|
76 | "pet_farm", |
---|
77 | "person", |
---|
78 | ]: |
---|
79 | clean_table(db, tablename) |
---|
80 | db.close() |
---|
81 | |
---|
82 | |
---|
83 | def tearDownModule(): |
---|
84 | if os.path.isfile("sql.log"): |
---|
85 | os.unlink("sql.log") |
---|
86 | for a in glob.glob("*.table"): |
---|
87 | os.unlink(a) |
---|
88 | |
---|
89 | |
---|
90 | class TestFields(DALtest): |
---|
91 | def testFieldName(self): |
---|
92 | """ |
---|
93 | - a "str" something |
---|
94 | - not a method or property of Table |
---|
95 | - "dotted-notation" friendly: |
---|
96 | - a valid python identifier |
---|
97 | - not a python keyword |
---|
98 | - not starting with underscore or an integer |
---|
99 | - not containing dots |
---|
100 | |
---|
101 | Basically, anything alphanumeric, no symbols, only underscore as |
---|
102 | punctuation |
---|
103 | """ |
---|
104 | |
---|
105 | # Check that Fields cannot start with underscores |
---|
106 | self.assertRaises(SyntaxError, Field, "_abc", "string") |
---|
107 | |
---|
108 | # Check that Fields cannot contain punctuation other than underscores |
---|
109 | self.assertRaises(SyntaxError, Field, "a.bc", "string") |
---|
110 | |
---|
111 | # Check that Fields cannot be a name of a method or property of Table |
---|
112 | for x in ["drop", "on", "truncate"]: |
---|
113 | self.assertRaises(SyntaxError, Field, x, "string") |
---|
114 | |
---|
115 | # Check that Fields allows underscores in the body of a field name. |
---|
116 | self.assertTrue( |
---|
117 | Field("a_bc", "string"), |
---|
118 | "Field isn't allowing underscores in fieldnames. It should.", |
---|
119 | ) |
---|
120 | |
---|
121 | # Check that Field names don't allow a python keyword |
---|
122 | self.assertRaises(SyntaxError, Field, "True", "string") |
---|
123 | self.assertRaises(SyntaxError, Field, "elif", "string") |
---|
124 | self.assertRaises(SyntaxError, Field, "while", "string") |
---|
125 | |
---|
126 | # Check that Field names don't allow a non-valid python identifier |
---|
127 | non_valid_examples = ["1x", "xx$%@%", "xx yy", "yy\na", "yy\n"] |
---|
128 | for a in non_valid_examples: |
---|
129 | self.assertRaises(SyntaxError, Field, a, "string") |
---|
130 | |
---|
131 | # Check that Field names don't allow a unicode string |
---|
132 | non_valid_examples = non_valid_examples = [ |
---|
133 | "ℙƴ☂ℌøἤ", |
---|
134 | u"ℙƴ☂ℌøἤ", |
---|
135 | u"àè", |
---|
136 | u"ṧøмℯ", |
---|
137 | u"тεṧт", |
---|
138 | u"♥αłüℯṧ", |
---|
139 | u"ℊεᾔ℮яαт℮∂", |
---|
140 | u"♭ƴ", |
---|
141 | u"ᾔ☤ρℌℓ☺ḓ", |
---|
142 | ] |
---|
143 | for a in non_valid_examples: |
---|
144 | self.assertRaises(SyntaxError, Field, a, "string") |
---|
145 | |
---|
146 | def testFieldTypes(self): |
---|
147 | |
---|
148 | # Check that string, and password default length is 512 |
---|
149 | for typ in ["string", "password"]: |
---|
150 | self.assertTrue( |
---|
151 | Field("abc", typ).length == 512, |
---|
152 | "Default length for type '%s' is not 512 or 255" % typ, |
---|
153 | ) |
---|
154 | |
---|
155 | # Check that upload default length is 512 |
---|
156 | self.assertTrue( |
---|
157 | Field("abc", "upload").length == 512, |
---|
158 | "Default length for type 'upload' is not 512", |
---|
159 | ) |
---|
160 | |
---|
161 | # Check that Tables passed in the type creates a reference |
---|
162 | self.assertTrue( |
---|
163 | Field("abc", Table(None, "temp")).type == "reference temp", |
---|
164 | "Passing a Table does not result in a reference type.", |
---|
165 | ) |
---|
166 | |
---|
167 | def testFieldLabels(self): |
---|
168 | |
---|
169 | # Check that a label is successfully built from the supplied fieldname |
---|
170 | self.assertTrue( |
---|
171 | Field("abc", "string").label == "Abc", "Label built is incorrect" |
---|
172 | ) |
---|
173 | self.assertTrue( |
---|
174 | Field("abc_def", "string").label == "Abc Def", "Label built is incorrect" |
---|
175 | ) |
---|
176 | |
---|
177 | def testFieldFormatters(self): # Formatter should be called Validator |
---|
178 | |
---|
179 | # Test the default formatters |
---|
180 | for typ in ALLOWED_DATATYPES: |
---|
181 | f = Field("abc", typ) |
---|
182 | if typ not in ["date", "time", "datetime"]: |
---|
183 | isinstance(f.formatter("test"), str) |
---|
184 | else: |
---|
185 | isinstance(f.formatter(datetime.datetime.now()), str) |
---|
186 | |
---|
187 | def testUploadField(self): |
---|
188 | import tempfile |
---|
189 | |
---|
190 | stream = tempfile.NamedTemporaryFile() |
---|
191 | content = b"this is the stream content" |
---|
192 | stream.write(content) |
---|
193 | # rewind before inserting |
---|
194 | stream.seek(0) |
---|
195 | |
---|
196 | db = self.connect() |
---|
197 | db.define_table( |
---|
198 | "tt", |
---|
199 | Field( |
---|
200 | "fileobj", "upload", uploadfolder=tempfile.gettempdir(), autodelete=True |
---|
201 | ), |
---|
202 | ) |
---|
203 | f_id = db.tt.insert(fileobj=stream) |
---|
204 | |
---|
205 | row = db.tt[f_id] |
---|
206 | (retr_name, retr_stream) = db.tt.fileobj.retrieve(row.fileobj) |
---|
207 | |
---|
208 | # name should be the same |
---|
209 | self.assertEqual(retr_name, os.path.basename(stream.name)) |
---|
210 | # content should be the same |
---|
211 | retr_content = retr_stream.read() |
---|
212 | self.assertEqual(retr_content, content) |
---|
213 | |
---|
214 | # close streams! |
---|
215 | retr_stream.close() |
---|
216 | |
---|
217 | # delete |
---|
218 | row.delete_record() |
---|
219 | |
---|
220 | # drop |
---|
221 | db.tt.drop() |
---|
222 | |
---|
223 | # this part is triggered only if fs (AKA pyfilesystem) module is installed |
---|
224 | try: |
---|
225 | from fs.memoryfs import MemoryFS |
---|
226 | |
---|
227 | # rewind before inserting |
---|
228 | stream.seek(0) |
---|
229 | db.define_table( |
---|
230 | "tt", Field("fileobj", "upload", uploadfs=MemoryFS(), autodelete=True) |
---|
231 | ) |
---|
232 | |
---|
233 | f_id = db.tt.insert(fileobj=stream) |
---|
234 | |
---|
235 | row = db.tt[f_id] |
---|
236 | (retr_name, retr_stream) = db.tt.fileobj.retrieve(row.fileobj) |
---|
237 | |
---|
238 | # name should be the same |
---|
239 | self.assertEqual(retr_name, os.path.basename(stream.name)) |
---|
240 | # content should be the same |
---|
241 | retr_content = retr_stream.read() |
---|
242 | self.assertEqual(retr_content, content) |
---|
243 | |
---|
244 | # close streams |
---|
245 | retr_stream.close() |
---|
246 | stream.close() |
---|
247 | |
---|
248 | # delete |
---|
249 | row.delete_record() |
---|
250 | |
---|
251 | # drop |
---|
252 | db.tt.drop() |
---|
253 | |
---|
254 | except ImportError: |
---|
255 | pass |
---|
256 | |
---|
257 | def testBlobBytes(self): |
---|
258 | # Test blob with latin1 encoded bytes |
---|
259 | db = self.connect() |
---|
260 | obj = pickle.dumps("0") |
---|
261 | db.define_table("tt", Field("aa", "blob")) |
---|
262 | self.assertEqual(db.tt.insert(aa=obj), 1) |
---|
263 | self.assertEqual(to_bytes(db().select(db.tt.aa)[0].aa), obj) |
---|
264 | self.assertEqual(db.tt[1].aa, obj) |
---|
265 | self.assertEqual(BytesIO(to_bytes(db.tt[1].aa)).read(), obj) |
---|
266 | db.tt.drop() |
---|
267 | |
---|
268 | def testRun(self): |
---|
269 | # Test all field types and their return values |
---|
270 | db = self.connect() |
---|
271 | for ft in ["string", "text", "password", "upload", "blob"]: |
---|
272 | db.define_table("tt", Field("aa", ft, default="")) |
---|
273 | self.assertEqual(db.tt.insert(aa="ö"), 1) |
---|
274 | if not (IS_ORACLE and (ft == "text" or ft == "blob")): |
---|
275 | # only verify insert for LOB types in oracle; |
---|
276 | # select may create seg fault in test env |
---|
277 | self.assertEqual(db().select(db.tt.aa)[0].aa, "ö") |
---|
278 | db.tt.drop() |
---|
279 | db.define_table("tt", Field("aa", "integer", default=1)) |
---|
280 | self.assertEqual(db.tt.insert(aa=3), 1) |
---|
281 | self.assertEqual(db().select(db.tt.aa)[0].aa, 3) |
---|
282 | db.tt.drop() |
---|
283 | |
---|
284 | db.define_table("tt", Field("aa", "string")) |
---|
285 | ucs = "A\xc3\xa9 A" |
---|
286 | self.assertEqual(db.tt.insert(aa=ucs), 1) |
---|
287 | self.assertEqual(db().select(db.tt.aa)[0].aa, ucs) |
---|
288 | self.assertEqual(db().select(db.tt.aa.with_alias("zz"))[0].zz, ucs) |
---|
289 | db.tt.drop() |
---|
290 | |
---|
291 | db.define_table("tt", Field("aa", "double", default=1)) |
---|
292 | self.assertEqual(db.tt.insert(aa=3.1), 1) |
---|
293 | self.assertEqual(db().select(db.tt.aa)[0].aa, 3.1) |
---|
294 | db.tt.drop() |
---|
295 | db.define_table("tt", Field("aa", "boolean", default=True)) |
---|
296 | self.assertEqual(db.tt.insert(aa=True), 1) |
---|
297 | self.assertEqual(db().select(db.tt.aa)[0].aa, True) |
---|
298 | db.tt.drop() |
---|
299 | db.define_table("tt", Field("aa", "json", default={})) |
---|
300 | # test different python objects for correct serialization in json |
---|
301 | objs = [ |
---|
302 | {"a": 1, "b": 2}, |
---|
303 | [1, 2, 3], |
---|
304 | "abc", |
---|
305 | True, |
---|
306 | False, |
---|
307 | None, |
---|
308 | 11, |
---|
309 | 14.3, |
---|
310 | long(11), |
---|
311 | ] |
---|
312 | for obj in objs: |
---|
313 | rtn_id = db.tt.insert(aa=obj) |
---|
314 | rtn = db(db.tt.id == rtn_id).select().first().aa |
---|
315 | self.assertEqual(obj, rtn) |
---|
316 | db.tt.drop() |
---|
317 | db.define_table("tt", Field("aa", "date", default=datetime.date.today())) |
---|
318 | t0 = datetime.date.today() |
---|
319 | self.assertEqual(db.tt.insert(aa=t0), 1) |
---|
320 | self.assertEqual(db().select(db.tt.aa)[0].aa, t0) |
---|
321 | db.tt.drop() |
---|
322 | db.define_table( |
---|
323 | "tt", Field("aa", "datetime", default=datetime.datetime.today()) |
---|
324 | ) |
---|
325 | t0 = datetime.datetime(1971, 12, 21, 10, 30, 55, 0,) |
---|
326 | self.assertEqual(db.tt.insert(aa=t0), 1) |
---|
327 | self.assertEqual(db().select(db.tt.aa)[0].aa, t0) |
---|
328 | |
---|
329 | ## Row APIs |
---|
330 | row = db().select(db.tt.aa)[0] |
---|
331 | self.assertEqual(db.tt[1].aa, t0) |
---|
332 | self.assertEqual(db.tt["aa"], db.tt.aa) |
---|
333 | self.assertEqual(db.tt(1).aa, t0) |
---|
334 | self.assertTrue(db.tt(1, aa=None) == None) |
---|
335 | self.assertFalse(db.tt(1, aa=t0) == None) |
---|
336 | self.assertEqual(row.aa, t0) |
---|
337 | self.assertEqual(row["aa"], t0) |
---|
338 | self.assertEqual(row["tt.aa"], t0) |
---|
339 | self.assertEqual(row("tt.aa"), t0) |
---|
340 | |
---|
341 | ## Lazy and Virtual fields |
---|
342 | db.tt.b = Field.Virtual(lambda row: row.tt.aa) |
---|
343 | # test for FieldVirtual.bind |
---|
344 | self.assertEqual(db.tt.b.tablename, "tt") |
---|
345 | self.assertEqual(db.tt.b.name, "b") |
---|
346 | db.tt.c = Field.Lazy(lambda row: row.tt.aa) |
---|
347 | # test for FieldMethod.bind |
---|
348 | self.assertEqual(db.tt.c.name, "c") |
---|
349 | rows = db().select(db.tt.aa) |
---|
350 | row = rows[0] |
---|
351 | self.assertEqual(row.b, t0) |
---|
352 | self.assertEqual(row.c(), t0) |
---|
353 | # test for BasicRows.colnames_fields |
---|
354 | rows.colnames.insert(0, "tt.b") |
---|
355 | rows.colnames.insert(1, "tt.c") |
---|
356 | colnames_fields = rows.colnames_fields |
---|
357 | self.assertIs(colnames_fields[0], db.tt.b) |
---|
358 | self.assertIs(colnames_fields[1], db.tt.c) |
---|
359 | db.tt.drop() |
---|
360 | |
---|
361 | if not IS_ORACLE: |
---|
362 | db.define_table("tt", Field("aa", "time", default="11:30")) |
---|
363 | t0 = datetime.time(10, 30, 55) |
---|
364 | self.assertEqual(db.tt.insert(aa=t0), 1) |
---|
365 | self.assertEqual(db().select(db.tt.aa)[0].aa, t0) |
---|
366 | db.tt.drop() |
---|
367 | |
---|
368 | # aggregation type detection |
---|
369 | db.define_table( |
---|
370 | "tt", Field("aa", "datetime", default=datetime.datetime.today()) |
---|
371 | ) |
---|
372 | t0 = datetime.datetime(1971, 12, 21, 10, 30, 55, 0) |
---|
373 | self.assertEqual(db.tt.insert(aa=t0), 1) |
---|
374 | self.assertEqual(db().select(db.tt.aa.min())[0][db.tt.aa.min()], t0) |
---|
375 | db.tt.drop() |
---|
376 | |
---|
377 | |
---|
378 | class TestTables(unittest.TestCase): |
---|
379 | def testTableNames(self): |
---|
380 | """ |
---|
381 | - a "str" something |
---|
382 | - not a method or property of DAL |
---|
383 | - "dotted-notation" friendly: |
---|
384 | - a valid python identifier |
---|
385 | - not a python keyword |
---|
386 | - not starting with underscore or an integer |
---|
387 | - not containing dots |
---|
388 | |
---|
389 | Basically, anything alphanumeric, no symbols, only underscore as |
---|
390 | punctuation |
---|
391 | """ |
---|
392 | |
---|
393 | # Check that Tables cannot start with underscores |
---|
394 | self.assertRaises(SyntaxError, Table, None, "_abc") |
---|
395 | |
---|
396 | # Check that Tables cannot contain punctuation other than underscores |
---|
397 | self.assertRaises(SyntaxError, Table, None, "a.bc") |
---|
398 | |
---|
399 | # Check that Tables cannot be a name of a method or property of DAL |
---|
400 | for x in ["define_table", "tables", "as_dict"]: |
---|
401 | self.assertRaises(SyntaxError, Table, None, x) |
---|
402 | |
---|
403 | # Check that Table allows underscores in the body of a field name. |
---|
404 | self.assertTrue( |
---|
405 | Table(None, "a_bc"), |
---|
406 | "Table isn't allowing underscores in tablename. It should.", |
---|
407 | ) |
---|
408 | |
---|
409 | # Check that Table names don't allow a python keyword |
---|
410 | self.assertRaises(SyntaxError, Table, None, "True") |
---|
411 | self.assertRaises(SyntaxError, Table, None, "elif") |
---|
412 | self.assertRaises(SyntaxError, Table, None, "while") |
---|
413 | |
---|
414 | # Check that Table names don't allow a non-valid python identifier |
---|
415 | non_valid_examples = ["1x", "xx$%@%", "xx yy", "yy\na", "yy\n"] |
---|
416 | for a in non_valid_examples: |
---|
417 | self.assertRaises(SyntaxError, Table, None, a) |
---|
418 | |
---|
419 | # Check that Table names don't allow a unicode string |
---|
420 | non_valid_examples = [ |
---|
421 | "ℙƴ☂ℌøἤ", |
---|
422 | u"ℙƴ☂ℌøἤ", |
---|
423 | u"àè", |
---|
424 | u"ṧøмℯ", |
---|
425 | u"тεṧт", |
---|
426 | u"♥αłüℯṧ", |
---|
427 | u"ℊεᾔ℮яαт℮∂", |
---|
428 | u"♭ƴ", |
---|
429 | u"ᾔ☤ρℌℓ☺ḓ", |
---|
430 | ] |
---|
431 | for a in non_valid_examples: |
---|
432 | self.assertRaises(SyntaxError, Table, None, a) |
---|
433 | |
---|
434 | |
---|
435 | class TestAll(unittest.TestCase): |
---|
436 | def setUp(self): |
---|
437 | self.pt = Table(None, "PseudoTable", Field("name"), Field("birthdate")) |
---|
438 | |
---|
439 | def testSQLALL(self): |
---|
440 | ans = "PseudoTable.id, PseudoTable.name, PseudoTable.birthdate" |
---|
441 | self.assertEqual(str(SQLALL(self.pt)), ans) |
---|
442 | |
---|
443 | |
---|
444 | class TestTable(DALtest): |
---|
445 | def testTableCreation(self): |
---|
446 | |
---|
447 | # Check for error when not passing type other than Field or Table |
---|
448 | |
---|
449 | self.assertRaises(SyntaxError, Table, None, "test", None) |
---|
450 | |
---|
451 | persons = Table( |
---|
452 | None, "persons", Field("firstname", "string"), Field("lastname", "string") |
---|
453 | ) |
---|
454 | |
---|
455 | # Does it have the correct fields? |
---|
456 | |
---|
457 | self.assertTrue(set(persons.fields).issuperset(set(["firstname", "lastname"]))) |
---|
458 | |
---|
459 | # ALL is set correctly |
---|
460 | |
---|
461 | self.assertTrue("persons.firstname, persons.lastname" in str(persons.ALL)) |
---|
462 | |
---|
463 | def testTableAlias(self): |
---|
464 | db = self.connect() |
---|
465 | persons = Table( |
---|
466 | db, "persons", Field("firstname", "string"), Field("lastname", "string") |
---|
467 | ) |
---|
468 | aliens = persons.with_alias("aliens") |
---|
469 | |
---|
470 | # Are the different table instances with the same fields |
---|
471 | |
---|
472 | self.assertTrue(persons is not aliens) |
---|
473 | self.assertTrue(set(persons.fields) == set(aliens.fields)) |
---|
474 | |
---|
475 | def testTableInheritance(self): |
---|
476 | persons = Table( |
---|
477 | None, "persons", Field("firstname", "string"), Field("lastname", "string") |
---|
478 | ) |
---|
479 | customers = Table( |
---|
480 | None, "customers", Field("items_purchased", "integer"), persons |
---|
481 | ) |
---|
482 | self.assertTrue( |
---|
483 | set(customers.fields).issuperset( |
---|
484 | set(["items_purchased", "firstname", "lastname"]) |
---|
485 | ) |
---|
486 | ) |
---|
487 | |
---|
488 | |
---|
489 | class TestInsert(DALtest): |
---|
490 | def testRun(self): |
---|
491 | db = self.connect() |
---|
492 | db.define_table("tt", Field("aa")) |
---|
493 | self.assertEqual(db.tt.insert(aa="1"), 1) |
---|
494 | if not IS_TERADATA: |
---|
495 | self.assertEqual(db.tt.insert(aa="1"), 2) |
---|
496 | self.assertEqual(db.tt.insert(aa="1"), 3) |
---|
497 | else: |
---|
498 | self.assertEqual(db.tt.insert(aa="1"), 1) |
---|
499 | self.assertEqual(db.tt.insert(aa="1"), 1) |
---|
500 | |
---|
501 | self.assertEqual(db(db.tt.aa == "1").count(), 3) |
---|
502 | self.assertEqual(db(db.tt.aa == "2").isempty(), True) |
---|
503 | self.assertEqual(db(db.tt.aa == "1").update(aa="2"), 3) |
---|
504 | self.assertEqual(db(db.tt.aa == "2").count(), 3) |
---|
505 | self.assertEqual(db(db.tt.aa == "2").isempty(), False) |
---|
506 | self.assertEqual(db(db.tt.aa == "2").delete(), 3) |
---|
507 | self.assertEqual(db(db.tt.aa == "2").isempty(), True) |
---|
508 | |
---|
509 | |
---|
510 | class TestSelect(DALtest): |
---|
511 | def testRun(self): |
---|
512 | db = self.connect() |
---|
513 | db.define_table("tt", Field("aa")) |
---|
514 | self.assertEqual(db.tt.insert(aa="1"), 1) |
---|
515 | if not IS_TERADATA: |
---|
516 | self.assertEqual(db.tt.insert(aa="2"), 2) |
---|
517 | self.assertEqual(db.tt.insert(aa="3"), 3) |
---|
518 | else: |
---|
519 | self.assertEqual(db.tt.insert(aa="2"), 1) |
---|
520 | self.assertEqual(db.tt.insert(aa="3"), 1) |
---|
521 | self.assertEqual(db(db.tt.id > 0).count(), 3) |
---|
522 | self.assertEqual( |
---|
523 | db(db.tt.id > 0).select(orderby=~db.tt.aa | db.tt.id)[0].aa, "3" |
---|
524 | ) |
---|
525 | self.assertEqual(len(db(db.tt.id > 0).select(limitby=(1, 2))), 1) |
---|
526 | self.assertEqual(db(db.tt.id > 0).select(limitby=(1, 2))[0].aa, "2") |
---|
527 | self.assertEqual(len(db().select(db.tt.ALL)), 3) |
---|
528 | self.assertEqual(db(db.tt.aa == None).count(), 0) |
---|
529 | self.assertEqual(db(db.tt.aa != None).count(), 3) |
---|
530 | self.assertEqual(db(db.tt.aa > "1").count(), 2) |
---|
531 | self.assertEqual(db(db.tt.aa >= "1").count(), 3) |
---|
532 | self.assertEqual(db(db.tt.aa == "1").count(), 1) |
---|
533 | self.assertEqual(db(db.tt.aa != "1").count(), 2) |
---|
534 | self.assertEqual(db(db.tt.aa < "3").count(), 2) |
---|
535 | self.assertEqual(db(db.tt.aa <= "3").count(), 3) |
---|
536 | self.assertEqual(db(db.tt.aa > "1")(db.tt.aa < "3").count(), 1) |
---|
537 | self.assertEqual(db((db.tt.aa > "1") & (db.tt.aa < "3")).count(), 1) |
---|
538 | self.assertEqual(db((db.tt.aa > "1") | (db.tt.aa < "3")).count(), 3) |
---|
539 | self.assertEqual(db((db.tt.aa > "1") & ~(db.tt.aa > "2")).count(), 1) |
---|
540 | self.assertEqual(db(~(db.tt.aa > "1") & (db.tt.aa > "2")).count(), 0) |
---|
541 | # Test for REGEX_TABLE_DOT_FIELD |
---|
542 | self.assertEqual(db(db.tt).select("tt.aa").first()[db.tt.aa], "1") |
---|
543 | |
---|
544 | def testTestQuery(self): |
---|
545 | db = self.connect() |
---|
546 | db._adapter.test_connection() |
---|
547 | |
---|
548 | def testListInteger(self): |
---|
549 | db = self.connect() |
---|
550 | db.define_table("tt", Field("aa", "list:integer")) |
---|
551 | l = [1, 2, 3, 4, 5] |
---|
552 | db.tt.insert(aa=l) |
---|
553 | self.assertEqual(db(db.tt).select("tt.aa").first()[db.tt.aa], l) |
---|
554 | |
---|
555 | def testListString(self): |
---|
556 | db = self.connect() |
---|
557 | db.define_table("tt", Field("aa", "list:string")) |
---|
558 | l = ["a", "b", "c"] |
---|
559 | db.tt.insert(aa=l) |
---|
560 | self.assertEqual(db(db.tt).select("tt.aa").first()[db.tt.aa], l) |
---|
561 | |
---|
562 | def testListReference(self): |
---|
563 | db = self.connect() |
---|
564 | db.define_table("t0", Field("aa", "string")) |
---|
565 | db.define_table("tt", Field("t0_id", "list:reference t0")) |
---|
566 | id_a1 = db.t0.insert(aa="test1") |
---|
567 | id_a2 = db.t0.insert(aa="test2") |
---|
568 | ref1 = [id_a1] |
---|
569 | ref2 = [id_a2] |
---|
570 | ref3 = [id_a1, id_a2] |
---|
571 | db.tt.insert(t0_id=ref1) |
---|
572 | self.assertEqual(db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref1) |
---|
573 | db.tt.insert(t0_id=ref2) |
---|
574 | self.assertEqual(db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref2) |
---|
575 | db.tt.insert(t0_id=ref3) |
---|
576 | self.assertEqual(db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref3) |
---|
577 | |
---|
578 | self.assertEqual(db(db.tt.t0_id == ref3).count(), 1) |
---|
579 | |
---|
580 | def testGroupByAndDistinct(self): |
---|
581 | db = self.connect() |
---|
582 | db.define_table( |
---|
583 | "tt", Field("aa"), Field("bb", "integer"), Field("cc", "integer") |
---|
584 | ) |
---|
585 | db.tt.insert(aa="4", bb=1, cc=1) |
---|
586 | db.tt.insert(aa="3", bb=2, cc=1) |
---|
587 | db.tt.insert(aa="3", bb=1, cc=1) |
---|
588 | db.tt.insert(aa="1", bb=1, cc=1) |
---|
589 | db.tt.insert(aa="1", bb=2, cc=1) |
---|
590 | db.tt.insert(aa="1", bb=3, cc=1) |
---|
591 | db.tt.insert(aa="1", bb=4, cc=1) |
---|
592 | db.tt.insert(aa="2", bb=1, cc=1) |
---|
593 | db.tt.insert(aa="2", bb=2, cc=1) |
---|
594 | db.tt.insert(aa="2", bb=3, cc=1) |
---|
595 | self.assertEqual(db(db.tt).count(), 10) |
---|
596 | |
---|
597 | # test groupby |
---|
598 | result = db().select(db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa) |
---|
599 | self.assertEqual(len(result), 4) |
---|
600 | result = db().select( |
---|
601 | db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa, orderby=db.tt.aa |
---|
602 | ) |
---|
603 | self.assertEqual(tuple(result.response[2]), ("3", 3)) |
---|
604 | result = db().select( |
---|
605 | db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa, orderby=~db.tt.aa |
---|
606 | ) |
---|
607 | self.assertEqual(tuple(result.response[1]), ("3", 3)) |
---|
608 | result = db().select( |
---|
609 | db.tt.aa, |
---|
610 | db.tt.bb, |
---|
611 | db.tt.cc.sum(), |
---|
612 | groupby=db.tt.aa | db.tt.bb, |
---|
613 | orderby=(db.tt.aa | ~db.tt.bb), |
---|
614 | ) |
---|
615 | self.assertEqual(tuple(result.response[4]), ("2", 3, 1)) |
---|
616 | result = db().select( |
---|
617 | db.tt.aa, |
---|
618 | db.tt.bb.sum(), |
---|
619 | groupby=db.tt.aa, |
---|
620 | orderby=~db.tt.aa, |
---|
621 | limitby=(1, 2), |
---|
622 | ) |
---|
623 | self.assertEqual(len(result), 1) |
---|
624 | self.assertEqual(tuple(result.response[0]), ("3", 3)) |
---|
625 | result = db().select( |
---|
626 | db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa, orderby=db.tt.aa, limitby=(0, 3) |
---|
627 | ) |
---|
628 | self.assertEqual(len(result), 3) |
---|
629 | self.assertEqual(tuple(result.response[2]), ("3", 3)) |
---|
630 | |
---|
631 | # test having |
---|
632 | self.assertEqual( |
---|
633 | len( |
---|
634 | db().select( |
---|
635 | db.tt.aa, |
---|
636 | db.tt.bb.sum(), |
---|
637 | groupby=db.tt.aa, |
---|
638 | having=db.tt.bb.sum() > 2, |
---|
639 | ) |
---|
640 | ), |
---|
641 | 3, |
---|
642 | ) |
---|
643 | |
---|
644 | # test distinct |
---|
645 | result = db().select(db.tt.aa, db.tt.cc, distinct=True) |
---|
646 | self.assertEqual(len(result), 4) |
---|
647 | result = db().select(db.tt.cc, distinct=True, groupby=db.tt.cc) |
---|
648 | self.assertEqual(len(result), 1) |
---|
649 | self.assertEqual(result[0].cc, 1) |
---|
650 | result = db().select(db.tt.aa, distinct=True, orderby=~db.tt.aa) |
---|
651 | self.assertEqual(result[2].aa, "2") |
---|
652 | self.assertEqual(result[1].aa, "3") |
---|
653 | result = db().select( |
---|
654 | db.tt.aa, db.tt.bb, distinct=True, orderby=(db.tt.aa | ~db.tt.bb) |
---|
655 | ) |
---|
656 | self.assertEqual(tuple(result.response[4]), ("2", 3)) |
---|
657 | result = db().select(db.tt.aa, distinct=True, orderby=~db.tt.aa, limitby=(1, 2)) |
---|
658 | self.assertEqual(len(result), 1) |
---|
659 | self.assertEqual(result[0].aa, "3") |
---|
660 | |
---|
661 | # test count distinct |
---|
662 | db.tt.insert(aa="2", bb=3, cc=1) |
---|
663 | self.assertEqual(db(db.tt).count(distinct=db.tt.aa), 4) |
---|
664 | self.assertEqual(db(db.tt.aa).count(db.tt.aa), 4) |
---|
665 | self.assertEqual(db(db.tt.aa).count(), 11) |
---|
666 | count = db.tt.aa.count() |
---|
667 | self.assertEqual(db(db.tt).select(count).first()[count], 11) |
---|
668 | |
---|
669 | count = db.tt.aa.count(distinct=True) |
---|
670 | sum = db.tt.bb.sum() |
---|
671 | result = db(db.tt).select(count, sum) |
---|
672 | self.assertEqual(tuple(result.response[0]), (4, 23)) |
---|
673 | |
---|
674 | def testCoalesce(self): |
---|
675 | db = self.connect() |
---|
676 | db.define_table("tt", Field("aa"), Field("bb"), Field("cc"), Field("dd")) |
---|
677 | db.tt.insert(aa="xx") |
---|
678 | db.tt.insert(aa="xx", bb="yy") |
---|
679 | db.tt.insert(aa="xx", bb="yy", cc="zz") |
---|
680 | if not IS_ORACLE: |
---|
681 | # empty string is treated as null |
---|
682 | db.tt.insert(aa="xx", bb="yy", cc="zz", dd="") |
---|
683 | result = db(db.tt).select(db.tt.dd.coalesce(db.tt.cc, db.tt.bb, db.tt.aa)) |
---|
684 | self.assertEqual(result.response[0][0], "xx") |
---|
685 | self.assertEqual(result.response[1][0], "yy") |
---|
686 | self.assertEqual(result.response[2][0], "zz") |
---|
687 | if not IS_ORACLE: |
---|
688 | self.assertEqual(result.response[3][0], "") |
---|
689 | db.tt.drop() |
---|
690 | |
---|
691 | db.define_table("tt", Field("aa", "integer"), Field("bb")) |
---|
692 | db.tt.insert(bb="") |
---|
693 | db.tt.insert(aa=1) |
---|
694 | result = db(db.tt).select(db.tt.aa.coalesce_zero()) |
---|
695 | self.assertEqual(result.response[0][0], 0) |
---|
696 | self.assertEqual(result.response[1][0], 1) |
---|
697 | |
---|
698 | def testTableAliasCollisions(self): |
---|
699 | db = self.connect() |
---|
700 | db.define_table("t1", Field("aa")) |
---|
701 | db.define_table("t2", Field("bb")) |
---|
702 | t1, t2 = db.t1, db.t2 |
---|
703 | t1.with_alias("t2") |
---|
704 | t2.with_alias("t1") |
---|
705 | |
---|
706 | # Passing tables by name will result in exception |
---|
707 | t1.insert(aa="test") |
---|
708 | t2.insert(bb="foo") |
---|
709 | db(t1.id > 0).update(aa="bar") |
---|
710 | having = t1.aa != None |
---|
711 | join = [t2.on(t1.aa == t2.bb)] |
---|
712 | db(t1.aa == t2.bb).select(t1.aa, groupby=t1.aa, having=having, orderby=t1.aa) |
---|
713 | db(t1.aa).select(t1.aa, join=join, groupby=t1.aa, having=having, orderby=t1.aa) |
---|
714 | db(t1.aa).select(t1.aa, left=join, groupby=t1.aa, having=having, orderby=t1.aa) |
---|
715 | db(t1.id > 0).delete() |
---|
716 | |
---|
717 | |
---|
718 | class TestSubselect(DALtest): |
---|
719 | def testMethods(self): |
---|
720 | db = self.connect() |
---|
721 | db.define_table("tt", Field("aa", "integer"), Field("bb")) |
---|
722 | data = [dict(aa=1, bb="foo"), dict(aa=1, bb="bar"), dict(aa=2, bb="foo")] |
---|
723 | for item in data: |
---|
724 | db.tt.insert(**item) |
---|
725 | fields = [db.tt.aa, db.tt.bb, db.tt.aa + 2, (db.tt.aa + 1).with_alias("exp")] |
---|
726 | sub = db(db.tt).nested_select(*fields, orderby=db.tt.id) |
---|
727 | # Check the fields provided by the object |
---|
728 | self.assertEqual(sorted(["aa", "bb", "exp"]), sorted(list(sub.fields))) |
---|
729 | for name in sub.fields: |
---|
730 | self.assertIsInstance(sub[name], Field) |
---|
731 | for item in sub: |
---|
732 | self.assertIsInstance(item, Field) |
---|
733 | self.assertEqual(len(list(sub)), len(sub.fields)) |
---|
734 | for key, val in zip(sub.fields, sub): |
---|
735 | self.assertIs(sub[key], val) |
---|
736 | self.assertIs(getattr(sub, key), val) |
---|
737 | tmp = sub._filter_fields(dict(aa=1, exp=2, foo=3)) |
---|
738 | self.assertEqual(tmp, dict(aa=1, exp=2)) |
---|
739 | # Check result from executing the query |
---|
740 | result = sub() |
---|
741 | self.assertEqual(len(result), len(data)) |
---|
742 | for idx, row in enumerate(data): |
---|
743 | self.assertEqual(result[idx]["tt"].as_dict(), row) |
---|
744 | self.assertEqual(result[idx]["exp"], row["aa"] + 1) |
---|
745 | result = db.executesql(str(sub)) |
---|
746 | for idx, row in enumerate(data): |
---|
747 | tmp = [row["aa"], row["bb"], row["aa"] + 2, row["aa"] + 1] |
---|
748 | self.assertEqual(list(result[idx]), tmp) |
---|
749 | # Check that query expansion methods don't work without alias |
---|
750 | self.assertEqual(sub._rname, None) |
---|
751 | self.assertEqual(sub._raw_rname, None) |
---|
752 | self.assertEqual(sub._dalname, None) |
---|
753 | with self.assertRaises(SyntaxError): |
---|
754 | sub.query_name() |
---|
755 | with self.assertRaises(SyntaxError): |
---|
756 | sub.sql_shortref |
---|
757 | with self.assertRaises(SyntaxError): |
---|
758 | sub.on(sub.aa != None) |
---|
759 | # Alias checks |
---|
760 | sub = sub.with_alias("foo") |
---|
761 | result = sub() |
---|
762 | for idx, row in enumerate(data): |
---|
763 | self.assertEqual(result[idx]["tt"].as_dict(), row) |
---|
764 | self.assertEqual(result[idx]["exp"], row["aa"] + 1) |
---|
765 | # Check query expansion methods again |
---|
766 | self.assertEqual(sub._rname, None) |
---|
767 | self.assertEqual(sub._raw_rname, None) |
---|
768 | self.assertEqual(sub._dalname, None) |
---|
769 | self.assertEqual(sub.query_name()[0], str(sub)) |
---|
770 | self.assertEqual(sub.sql_shortref, db._adapter.dialect.quote("foo")) |
---|
771 | self.assertIsInstance(sub.on(sub.aa != None), Expression) |
---|
772 | |
---|
773 | def testSelectArguments(self): |
---|
774 | db = self.connect() |
---|
775 | db.define_table("tt", Field("aa", "integer"), Field("bb")) |
---|
776 | data = [ |
---|
777 | dict(aa=1, bb="foo"), |
---|
778 | dict(aa=1, bb="bar"), |
---|
779 | dict(aa=2, bb="foo"), |
---|
780 | dict(aa=3, bb="foo"), |
---|
781 | dict(aa=3, bb="baz"), |
---|
782 | ] |
---|
783 | expected = [(1, None, 0), (2, 2, 2), (2, 2, 2), (3, 4, 3), (3, 8, 6)] |
---|
784 | for item in data: |
---|
785 | db.tt.insert(**item) |
---|
786 | |
---|
787 | # Check that select clauses work as expected in stand-alone query |
---|
788 | t1 = db.tt.with_alias("t1") |
---|
789 | t2 = db.tt.with_alias("t2") |
---|
790 | fields = [ |
---|
791 | t1.aa, |
---|
792 | t2.aa.sum().with_alias("total"), |
---|
793 | t2.aa.count().with_alias("cnt"), |
---|
794 | ] |
---|
795 | join = t1.on(db.tt.bb != t1.bb) |
---|
796 | left = t2.on(t1.aa > t2.aa) |
---|
797 | group = db.tt.bb | t1.aa |
---|
798 | having = db.tt.aa.count() > 1 |
---|
799 | order = t1.aa | t2.aa.count() |
---|
800 | limit = (1, 6) |
---|
801 | sub = db(db.tt.aa != 2).nested_select( |
---|
802 | *fields, |
---|
803 | join=join, |
---|
804 | left=left, |
---|
805 | orderby=order, |
---|
806 | groupby=group, |
---|
807 | having=having, |
---|
808 | limitby=limit |
---|
809 | ) |
---|
810 | result = sub() |
---|
811 | self.assertEqual(len(result), len(expected)) |
---|
812 | for idx, val in enumerate(expected): |
---|
813 | self.assertEqual(result[idx]["t1"]["aa"], val[0]) |
---|
814 | self.assertEqual(result[idx]["total"], val[1]) |
---|
815 | self.assertEqual(result[idx]["cnt"], val[2]) |
---|
816 | |
---|
817 | # Check again when nested inside another query |
---|
818 | # Also check that the alias will not conflict with existing table |
---|
819 | t3 = db.tt.with_alias("t3") |
---|
820 | sub = sub.with_alias("tt") |
---|
821 | query = (t3.bb == "foo") & (t3.aa == sub.aa) |
---|
822 | order = t3.aa | sub.cnt |
---|
823 | result = db(query).select(t3.aa, sub.total, sub.cnt, orderby=order) |
---|
824 | for idx, val in enumerate(expected): |
---|
825 | self.assertEqual(result[idx]["t3"]["aa"], val[0]) |
---|
826 | self.assertEqual(result[idx]["tt"]["total"], val[1]) |
---|
827 | self.assertEqual(result[idx]["tt"]["cnt"], val[2]) |
---|
828 | |
---|
829 | # Check "distinct" modifier separately |
---|
830 | sub = db(db.tt.aa != 2).nested_select(db.tt.aa, orderby=db.tt.aa, distinct=True) |
---|
831 | result = sub().as_list() |
---|
832 | self.assertEqual(result, [dict(aa=1), dict(aa=3)]) |
---|
833 | |
---|
834 | def testCorrelated(self): |
---|
835 | db = self.connect() |
---|
836 | db.define_table( |
---|
837 | "t1", Field("aa", "integer"), Field("bb"), Field("mark", "integer") |
---|
838 | ) |
---|
839 | db.define_table("t2", Field("aa", "integer"), Field("cc")) |
---|
840 | db.define_table("t3", Field("aa", "integer")) |
---|
841 | data_t1 = [ |
---|
842 | dict(aa=1, bb="bar"), |
---|
843 | dict(aa=1, bb="foo"), |
---|
844 | dict(aa=2, bb="foo"), |
---|
845 | dict(aa=2, bb="test"), |
---|
846 | dict(aa=3, bb="baz"), |
---|
847 | dict(aa=3, bb="foo"), |
---|
848 | ] |
---|
849 | data_t2 = [dict(aa=1, cc="foo"), dict(aa=2, cc="bar"), dict(aa=3, cc="baz")] |
---|
850 | expected_cor = [(1, "foo"), (3, "baz")] |
---|
851 | expected_leftcor = [(1, "foo"), (2, None), (3, "baz")] |
---|
852 | expected_uncor = [(1, "bar"), (1, "foo"), (2, "foo"), (3, "baz"), (3, "foo")] |
---|
853 | for item in data_t1: |
---|
854 | db.t1.insert(**item) |
---|
855 | for item in data_t2: |
---|
856 | db.t2.insert(**item) |
---|
857 | db.t3.insert(aa=item["aa"]) |
---|
858 | |
---|
859 | # Correlated subqueries |
---|
860 | subquery = db.t1.aa == db.t2.aa |
---|
861 | subfields = [db.t2.cc] |
---|
862 | sub = db(subquery).nested_select(*subfields).with_alias("sub") |
---|
863 | query = db.t1.bb.belongs(sub) |
---|
864 | order = db.t1.aa | db.t1.bb |
---|
865 | result = db(query).select(db.t1.aa, db.t1.bb, orderby=order) |
---|
866 | self.assertEqual(len(result), len(expected_cor)) |
---|
867 | for idx, val in enumerate(expected_cor): |
---|
868 | self.assertEqual(result[idx]["aa"], val[0]) |
---|
869 | self.assertEqual(result[idx]["bb"], val[1]) |
---|
870 | |
---|
871 | join = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))] |
---|
872 | order = db.t3.aa | db.t1.bb |
---|
873 | result = db(db.t3).select(db.t3.aa, db.t1.bb, join=join, orderby=order) |
---|
874 | self.assertEqual(len(result), len(expected_cor)) |
---|
875 | for idx, val in enumerate(expected_cor): |
---|
876 | self.assertEqual(result[idx]["t3"]["aa"], val[0]) |
---|
877 | self.assertEqual(result[idx]["t1"]["bb"], val[1]) |
---|
878 | |
---|
879 | left = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))] |
---|
880 | result = db(db.t3).select(db.t3.aa, db.t1.bb, left=left, orderby=order) |
---|
881 | self.assertEqual(len(result), len(expected_leftcor)) |
---|
882 | for idx, val in enumerate(expected_leftcor): |
---|
883 | self.assertEqual(result[idx]["t3"]["aa"], val[0]) |
---|
884 | self.assertEqual(result[idx]["t1"]["bb"], val[1]) |
---|
885 | |
---|
886 | order = db.t1.aa | db.t1.bb |
---|
887 | db(db.t1.bb.belongs(sub)).update(mark=1) |
---|
888 | result = db(db.t1.mark == 1).select(db.t1.aa, db.t1.bb, orderby=order) |
---|
889 | self.assertEqual(len(result), len(expected_cor)) |
---|
890 | for idx, val in enumerate(expected_cor): |
---|
891 | self.assertEqual(result[idx]["aa"], val[0]) |
---|
892 | self.assertEqual(result[idx]["bb"], val[1]) |
---|
893 | |
---|
894 | db(~db.t1.bb.belongs(sub)).delete() |
---|
895 | result = db(db.t1.id > 0).select(db.t1.aa, db.t1.bb, orderby=order) |
---|
896 | self.assertEqual(len(result), len(expected_cor)) |
---|
897 | for idx, val in enumerate(expected_cor): |
---|
898 | self.assertEqual(result[idx]["aa"], val[0]) |
---|
899 | self.assertEqual(result[idx]["bb"], val[1]) |
---|
900 | |
---|
901 | db(db.t1.id > 0).delete() |
---|
902 | for item in data_t1: |
---|
903 | db.t1.insert(**item) |
---|
904 | |
---|
905 | # Uncorrelated subqueries |
---|
906 | kwargs = dict(correlated=False) |
---|
907 | sub = db(subquery).nested_select(*subfields, **kwargs) |
---|
908 | query = db.t1.bb.belongs(sub) |
---|
909 | order = db.t1.aa | db.t1.bb |
---|
910 | result = db(query).select(db.t1.aa, db.t1.bb, orderby=order) |
---|
911 | self.assertEqual(len(result), len(expected_uncor)) |
---|
912 | for idx, val in enumerate(expected_uncor): |
---|
913 | self.assertEqual(result[idx]["aa"], val[0]) |
---|
914 | self.assertEqual(result[idx]["bb"], val[1]) |
---|
915 | |
---|
916 | join = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))] |
---|
917 | order = db.t3.aa | db.t1.bb |
---|
918 | result = db(db.t3).select(db.t3.aa, db.t1.bb, join=join, orderby=order) |
---|
919 | self.assertEqual(len(result), len(expected_uncor)) |
---|
920 | for idx, val in enumerate(expected_uncor): |
---|
921 | self.assertEqual(result[idx]["t3"]["aa"], val[0]) |
---|
922 | self.assertEqual(result[idx]["t1"]["bb"], val[1]) |
---|
923 | |
---|
924 | left = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))] |
---|
925 | result = db(db.t3).select(db.t3.aa, db.t1.bb, left=left, orderby=order) |
---|
926 | self.assertEqual(len(result), len(expected_uncor)) |
---|
927 | for idx, val in enumerate(expected_uncor): |
---|
928 | self.assertEqual(result[idx]["t3"]["aa"], val[0]) |
---|
929 | self.assertEqual(result[idx]["t1"]["bb"], val[1]) |
---|
930 | # MySQL does not support subqueries with uncorrelated references |
---|
931 | # to target table |
---|
932 | |
---|
933 | # Correlation prevented by alias in parent select |
---|
934 | tmp = db.t1.with_alias("tmp") |
---|
935 | sub = db(subquery).nested_select(*subfields) |
---|
936 | query = tmp.bb.belongs(sub) |
---|
937 | order = tmp.aa | tmp.bb |
---|
938 | result = db(query).select(tmp.aa, tmp.bb, orderby=order) |
---|
939 | self.assertEqual(len(result), len(expected_uncor)) |
---|
940 | for idx, val in enumerate(expected_uncor): |
---|
941 | self.assertEqual(result[idx]["aa"], val[0]) |
---|
942 | self.assertEqual(result[idx]["bb"], val[1]) |
---|
943 | |
---|
944 | join = [tmp.on((db.t3.aa == tmp.aa) & tmp.bb.belongs(sub))] |
---|
945 | order = db.t3.aa | tmp.bb |
---|
946 | result = db(db.t3).select(db.t3.aa, tmp.bb, join=join, orderby=order) |
---|
947 | self.assertEqual(len(result), len(expected_uncor)) |
---|
948 | for idx, val in enumerate(expected_uncor): |
---|
949 | self.assertEqual(result[idx]["t3"]["aa"], val[0]) |
---|
950 | self.assertEqual(result[idx]["tmp"]["bb"], val[1]) |
---|
951 | |
---|
952 | left = [tmp.on((db.t3.aa == tmp.aa) & tmp.bb.belongs(sub))] |
---|
953 | result = db(db.t3).select(db.t3.aa, tmp.bb, left=left, orderby=order) |
---|
954 | self.assertEqual(len(result), len(expected_uncor)) |
---|
955 | for idx, val in enumerate(expected_uncor): |
---|
956 | self.assertEqual(result[idx]["t3"]["aa"], val[0]) |
---|
957 | self.assertEqual(result[idx]["tmp"]["bb"], val[1]) |
---|
958 | # SQLite does not support aliasing target table in UPDATE/DELETE |
---|
959 | # MySQL does not support subqueries with uncorrelated references |
---|
960 | # to target table |
---|
961 | |
---|
962 | |
---|
963 | class TestAddMethod(DALtest): |
---|
964 | def testRun(self): |
---|
965 | db = self.connect() |
---|
966 | db.define_table("tt", Field("aa")) |
---|
967 | |
---|
968 | @db.tt.add_method.all |
---|
969 | def select_all(table, orderby=None): |
---|
970 | return table._db(table).select(orderby=orderby) |
---|
971 | |
---|
972 | self.assertEqual(db.tt.insert(aa="1"), 1) |
---|
973 | if not IS_TERADATA: |
---|
974 | self.assertEqual(db.tt.insert(aa="1"), 2) |
---|
975 | self.assertEqual(db.tt.insert(aa="1"), 3) |
---|
976 | else: |
---|
977 | self.assertEqual(db.tt.insert(aa="1"), 1) |
---|
978 | self.assertEqual(db.tt.insert(aa="1"), 1) |
---|
979 | self.assertEqual(len(db.tt.all()), 3) |
---|
980 | |
---|
981 | |
---|
982 | class TestBelongs(DALtest): |
---|
983 | def testRun(self): |
---|
984 | db = self.connect() |
---|
985 | db.define_table("tt", Field("aa")) |
---|
986 | self.assertEqual(db.tt.insert(aa="1"), 1) |
---|
987 | if not IS_TERADATA: |
---|
988 | self.assertEqual(db.tt.insert(aa="2"), 2) |
---|
989 | self.assertEqual(db.tt.insert(aa="3"), 3) |
---|
990 | else: |
---|
991 | self.assertEqual(db.tt.insert(aa="2"), 1) |
---|
992 | self.assertEqual(db.tt.insert(aa="3"), 1) |
---|
993 | self.assertEqual(db(db.tt.aa.belongs(("1", "3"))).count(), 2) |
---|
994 | self.assertEqual( |
---|
995 | db(db.tt.aa.belongs(db(db.tt.id > 2)._select(db.tt.aa))).count(), 1 |
---|
996 | ) |
---|
997 | self.assertEqual( |
---|
998 | db( |
---|
999 | db.tt.aa.belongs(db(db.tt.aa.belongs(("1", "3")))._select(db.tt.aa)) |
---|
1000 | ).count(), |
---|
1001 | 2, |
---|
1002 | ) |
---|
1003 | self.assertEqual( |
---|
1004 | db( |
---|
1005 | db.tt.aa.belongs( |
---|
1006 | db( |
---|
1007 | db.tt.aa.belongs( |
---|
1008 | db(db.tt.aa.belongs(("1", "3")))._select(db.tt.aa) |
---|
1009 | ) |
---|
1010 | )._select(db.tt.aa) |
---|
1011 | ) |
---|
1012 | ).count(), |
---|
1013 | 2, |
---|
1014 | ) |
---|
1015 | |
---|
1016 | |
---|
1017 | class TestContains(DALtest): |
---|
1018 | def testRun(self): |
---|
1019 | db = self.connect() |
---|
1020 | db.define_table("tt", Field("aa", "list:string"), Field("bb", "string")) |
---|
1021 | self.assertEqual(db.tt.insert(aa=["aaa", "bbb"], bb="aaa"), 1) |
---|
1022 | if not IS_TERADATA: |
---|
1023 | self.assertEqual(db.tt.insert(aa=["bbb", "ddd"], bb="abb"), 2) |
---|
1024 | self.assertEqual(db.tt.insert(aa=["eee", "aaa"], bb="acc"), 3) |
---|
1025 | else: |
---|
1026 | self.assertEqual(db.tt.insert(aa=["bbb", "ddd"], bb="abb"), 1) |
---|
1027 | self.assertEqual(db.tt.insert(aa=["eee", "aaa"], bb="acc"), 1) |
---|
1028 | self.assertEqual(db(db.tt.aa.contains("aaa")).count(), 2) |
---|
1029 | self.assertEqual(db(db.tt.aa.contains("bbb")).count(), 2) |
---|
1030 | self.assertEqual(db(db.tt.aa.contains("aa")).count(), 0) |
---|
1031 | self.assertEqual(db(db.tt.bb.contains("a")).count(), 3) |
---|
1032 | self.assertEqual(db(db.tt.bb.contains("b")).count(), 1) |
---|
1033 | self.assertEqual(db(db.tt.bb.contains("d")).count(), 0) |
---|
1034 | self.assertEqual(db(db.tt.aa.contains(db.tt.bb)).count(), 1) |
---|
1035 | # case-sensitivity tests, if 1 it isn't |
---|
1036 | is_case_insensitive = db(db.tt.bb.like("%AA%")).count() |
---|
1037 | if is_case_insensitive: |
---|
1038 | self.assertEqual(db(db.tt.aa.contains("AAA")).count(), 2) |
---|
1039 | self.assertEqual(db(db.tt.bb.contains("A")).count(), 3) |
---|
1040 | else: |
---|
1041 | self.assertEqual( |
---|
1042 | db(db.tt.aa.contains("AAA", case_sensitive=True)).count(), 0 |
---|
1043 | ) |
---|
1044 | self.assertEqual(db(db.tt.bb.contains("A", case_sensitive=True)).count(), 0) |
---|
1045 | self.assertEqual( |
---|
1046 | db(db.tt.aa.contains("AAA", case_sensitive=False)).count(), 2 |
---|
1047 | ) |
---|
1048 | self.assertEqual( |
---|
1049 | db(db.tt.bb.contains("A", case_sensitive=False)).count(), 3 |
---|
1050 | ) |
---|
1051 | db.tt.drop() |
---|
1052 | |
---|
1053 | # integers in string fields |
---|
1054 | db.define_table( |
---|
1055 | "tt", |
---|
1056 | Field("aa", "list:string"), |
---|
1057 | Field("bb", "string"), |
---|
1058 | Field("cc", "integer"), |
---|
1059 | ) |
---|
1060 | self.assertEqual(db.tt.insert(aa=["123", "456"], bb="123", cc=12), 1) |
---|
1061 | if not IS_TERADATA: |
---|
1062 | self.assertEqual(db.tt.insert(aa=["124", "456"], bb="123", cc=123), 2) |
---|
1063 | self.assertEqual(db.tt.insert(aa=["125", "457"], bb="23", cc=125), 3) |
---|
1064 | else: |
---|
1065 | self.assertEqual(db.tt.insert(aa=["124", "456"], bb="123", cc=123), 1) |
---|
1066 | self.assertEqual(db.tt.insert(aa=["125", "457"], bb="23", cc=125), 1) |
---|
1067 | self.assertEqual(db(db.tt.aa.contains(123)).count(), 1) |
---|
1068 | self.assertEqual(db(db.tt.aa.contains(23)).count(), 0) |
---|
1069 | self.assertEqual(db(db.tt.aa.contains(db.tt.cc)).count(), 1) |
---|
1070 | self.assertEqual(db(db.tt.bb.contains(123)).count(), 2) |
---|
1071 | self.assertEqual(db(db.tt.bb.contains(23)).count(), 3) |
---|
1072 | self.assertEqual(db(db.tt.bb.contains(db.tt.cc)).count(), 2) |
---|
1073 | db.tt.drop() |
---|
1074 | |
---|
1075 | # string field contains string field |
---|
1076 | db.define_table("tt", Field("aa"), Field("bb")) |
---|
1077 | db.tt.insert(aa="aaa", bb="%aaa") |
---|
1078 | db.tt.insert(aa="aaa", bb="aaa") |
---|
1079 | self.assertEqual(db(db.tt.aa.contains(db.tt.bb)).count(), 1) |
---|
1080 | db.tt.drop() |
---|
1081 | |
---|
1082 | # escaping |
---|
1083 | db.define_table("tt", Field("aa")) |
---|
1084 | db.tt.insert(aa="perc%ent") |
---|
1085 | db.tt.insert(aa="percent") |
---|
1086 | db.tt.insert(aa="percxyzent") |
---|
1087 | db.tt.insert(aa="under_score") |
---|
1088 | db.tt.insert(aa="underxscore") |
---|
1089 | db.tt.insert(aa="underyscore") |
---|
1090 | self.assertEqual(db(db.tt.aa.contains("perc%ent")).count(), 1) |
---|
1091 | self.assertEqual(db(db.tt.aa.contains("under_score")).count(), 1) |
---|
1092 | |
---|
1093 | |
---|
1094 | class TestLike(DALtest): |
---|
1095 | def setUp(self): |
---|
1096 | db = self.connect() |
---|
1097 | db.define_table("tt", Field("aa")) |
---|
1098 | self.assertEqual(isinstance(db.tt.insert(aa="abc"), long), True) |
---|
1099 | self.db = db |
---|
1100 | |
---|
1101 | def testRun(self): |
---|
1102 | db = self.db |
---|
1103 | self.assertEqual(db(db.tt.aa.like("a%")).count(), 1) |
---|
1104 | self.assertEqual(db(db.tt.aa.like("%b%")).count(), 1) |
---|
1105 | self.assertEqual(db(db.tt.aa.like("%c")).count(), 1) |
---|
1106 | self.assertEqual(db(db.tt.aa.like("%d%")).count(), 0) |
---|
1107 | self.assertEqual(db(db.tt.aa.like("ab_")).count(), 1) |
---|
1108 | self.assertEqual(db(db.tt.aa.like("a_c")).count(), 1) |
---|
1109 | self.assertEqual(db(db.tt.aa.like("_bc")).count(), 1) |
---|
1110 | |
---|
1111 | self.assertEqual(db(db.tt.aa.like("A%", case_sensitive=False)).count(), 1) |
---|
1112 | self.assertEqual(db(db.tt.aa.like("%B%", case_sensitive=False)).count(), 1) |
---|
1113 | self.assertEqual(db(db.tt.aa.like("%C", case_sensitive=False)).count(), 1) |
---|
1114 | self.assertEqual(db(db.tt.aa.ilike("A%")).count(), 1) |
---|
1115 | self.assertEqual(db(db.tt.aa.ilike("%B%")).count(), 1) |
---|
1116 | self.assertEqual(db(db.tt.aa.ilike("%C")).count(), 1) |
---|
1117 | |
---|
1118 | # DAL maps like() (and contains(), startswith(), endswith()) |
---|
1119 | # to the LIKE operator, that in ANSI-SQL is case-sensitive |
---|
1120 | # There are backends supporting case-sensitivity by default |
---|
1121 | # and backends that needs additional care to turn |
---|
1122 | # case-sensitivity on. To discern among those, let's run |
---|
1123 | # this query comparing previously inserted 'abc' with 'ABC': |
---|
1124 | # if the result is 0, then the backend recognizes |
---|
1125 | # case-sensitivity, if 1 it isn't |
---|
1126 | is_case_insensitive = db(db.tt.aa.like("%ABC%")).count() |
---|
1127 | self.assertEqual(db(db.tt.aa.like("A%")).count(), is_case_insensitive) |
---|
1128 | self.assertEqual(db(db.tt.aa.like("%B%")).count(), is_case_insensitive) |
---|
1129 | self.assertEqual(db(db.tt.aa.like("%C")).count(), is_case_insensitive) |
---|
1130 | |
---|
1131 | def testUpperLower(self): |
---|
1132 | db = self.db |
---|
1133 | self.assertEqual(db(db.tt.aa.upper().like("A%")).count(), 1) |
---|
1134 | self.assertEqual(db(db.tt.aa.upper().like("%B%")).count(), 1) |
---|
1135 | self.assertEqual(db(db.tt.aa.upper().like("%C")).count(), 1) |
---|
1136 | self.assertEqual(db(db.tt.aa.lower().like("%c")).count(), 1) |
---|
1137 | |
---|
1138 | def testStartsEndsWith(self): |
---|
1139 | db = self.db |
---|
1140 | self.assertEqual(db(db.tt.aa.startswith("a")).count(), 1) |
---|
1141 | self.assertEqual(db(db.tt.aa.endswith("c")).count(), 1) |
---|
1142 | self.assertEqual(db(db.tt.aa.startswith("c")).count(), 0) |
---|
1143 | self.assertEqual(db(db.tt.aa.endswith("a")).count(), 0) |
---|
1144 | |
---|
1145 | def testEscaping(self): |
---|
1146 | db = self.db |
---|
1147 | term = "ahbc".replace("h", "\\") # funny but to avoid any doubts... |
---|
1148 | db.tt.insert(aa="a%bc") |
---|
1149 | db.tt.insert(aa="a_bc") |
---|
1150 | db.tt.insert(aa=term) |
---|
1151 | self.assertEqual(db(db.tt.aa.like("%ax%bc%", escape="x")).count(), 1) |
---|
1152 | self.assertEqual(db(db.tt.aa.like("%ax_bc%", escape="x")).count(), 1) |
---|
1153 | self.assertEqual(db(db.tt.aa.like("%" + term + "%")).count(), 1) |
---|
1154 | db(db.tt.id > 0).delete() |
---|
1155 | # test "literal" like, i.e. exactly as LIKE in the backend |
---|
1156 | db.tt.insert(aa="perc%ent") |
---|
1157 | db.tt.insert(aa="percent") |
---|
1158 | db.tt.insert(aa="percxyzent") |
---|
1159 | db.tt.insert(aa="under_score") |
---|
1160 | db.tt.insert(aa="underxscore") |
---|
1161 | db.tt.insert(aa="underyscore") |
---|
1162 | self.assertEqual(db(db.tt.aa.like("%perc%ent%")).count(), 3) |
---|
1163 | self.assertEqual(db(db.tt.aa.like("%under_score%")).count(), 3) |
---|
1164 | db(db.tt.id > 0).delete() |
---|
1165 | # escaping with startswith and endswith |
---|
1166 | db.tt.insert(aa="%percent") |
---|
1167 | db.tt.insert(aa="xpercent") |
---|
1168 | db.tt.insert(aa="discount%") |
---|
1169 | db.tt.insert(aa="discountx") |
---|
1170 | self.assertEqual(db(db.tt.aa.endswith("discount%")).count(), 1) |
---|
1171 | self.assertEqual(db(db.tt.aa.like("discount%%")).count(), 2) |
---|
1172 | self.assertEqual(db(db.tt.aa.startswith("%percent")).count(), 1) |
---|
1173 | self.assertEqual(db(db.tt.aa.like("%%percent")).count(), 2) |
---|
1174 | |
---|
1175 | @unittest.skipIf(IS_MSSQL, "No Regexp on MSSQL") |
---|
1176 | def testRegexp(self): |
---|
1177 | db = self.db |
---|
1178 | db(db.tt.id > 0).delete() |
---|
1179 | db.tt.insert(aa="%percent") |
---|
1180 | db.tt.insert(aa="xpercent") |
---|
1181 | db.tt.insert(aa="discount%") |
---|
1182 | db.tt.insert(aa="discountx") |
---|
1183 | try: |
---|
1184 | self.assertEqual(db(db.tt.aa.regexp("count")).count(), 2) |
---|
1185 | except NotImplementedError: |
---|
1186 | pass |
---|
1187 | else: |
---|
1188 | self.assertEqual(db(db.tt.aa.lower().regexp("count")).count(), 2) |
---|
1189 | self.assertEqual( |
---|
1190 | db( |
---|
1191 | db.tt.aa.upper().regexp("COUNT") & db.tt.aa.lower().regexp("count") |
---|
1192 | ).count(), |
---|
1193 | 2, |
---|
1194 | ) |
---|
1195 | self.assertEqual( |
---|
1196 | db( |
---|
1197 | db.tt.aa.upper().regexp("COUNT") | (db.tt.aa.lower() == "xpercent") |
---|
1198 | ).count(), |
---|
1199 | 3, |
---|
1200 | ) |
---|
1201 | |
---|
1202 | def testLikeInteger(self): |
---|
1203 | db = self.db |
---|
1204 | db.tt.drop() |
---|
1205 | db.define_table("tt", Field("aa", "integer")) |
---|
1206 | self.assertEqual(isinstance(db.tt.insert(aa=1111111111), long), True) |
---|
1207 | self.assertEqual(isinstance(db.tt.insert(aa=1234567), long), True) |
---|
1208 | self.assertEqual(db(db.tt.aa.like("1%")).count(), 2) |
---|
1209 | self.assertEqual(db(db.tt.aa.like("1_3%")).count(), 1) |
---|
1210 | self.assertEqual(db(db.tt.aa.like("2%")).count(), 0) |
---|
1211 | self.assertEqual(db(db.tt.aa.like("_2%")).count(), 1) |
---|
1212 | self.assertEqual(db(db.tt.aa.like("12%")).count(), 1) |
---|
1213 | self.assertEqual(db(db.tt.aa.like("012%")).count(), 0) |
---|
1214 | self.assertEqual(db(db.tt.aa.like("%45%")).count(), 1) |
---|
1215 | self.assertEqual(db(db.tt.aa.like("%54%")).count(), 0) |
---|
1216 | |
---|
1217 | |
---|
1218 | class TestDatetime(DALtest): |
---|
1219 | def testRun(self): |
---|
1220 | db = self.connect() |
---|
1221 | db.define_table("tt", Field("aa", "datetime")) |
---|
1222 | self.assertEqual(db.tt.insert(aa=datetime.datetime(1971, 12, 21, 11, 30)), 1) |
---|
1223 | self.assertEqual(db.tt.insert(aa=datetime.datetime(1971, 11, 21, 10, 30)), 2) |
---|
1224 | self.assertEqual(db.tt.insert(aa=datetime.datetime(1970, 12, 21, 9, 31)), 3) |
---|
1225 | self.assertEqual( |
---|
1226 | db(db.tt.aa == datetime.datetime(1971, 12, 21, 11, 30)).count(), 1 |
---|
1227 | ) |
---|
1228 | self.assertEqual(db(db.tt.aa.year() == 1971).count(), 2) |
---|
1229 | self.assertEqual(db(db.tt.aa.month() > 11).count(), 2) |
---|
1230 | self.assertEqual(db(db.tt.aa.day() >= 21).count(), 3) |
---|
1231 | self.assertEqual(db(db.tt.aa.hour() < 10).count(), 1) |
---|
1232 | self.assertEqual(db(db.tt.aa.minutes() <= 30).count(), 2) |
---|
1233 | self.assertEqual(db(db.tt.aa.seconds() != 31).count(), 3) |
---|
1234 | self.assertEqual(db(db.tt.aa.epoch() < 365 * 24 * 3600).delete(), 1) |
---|
1235 | db.tt.drop() |
---|
1236 | |
---|
1237 | # pure TIME types without dates are not possible in Oracle |
---|
1238 | if not IS_ORACLE: |
---|
1239 | db.define_table("tt", Field("aa", "time")) |
---|
1240 | t0 = datetime.time(10, 30, 55) |
---|
1241 | db.tt.insert(aa=t0) |
---|
1242 | self.assertEqual(db().select(db.tt.aa)[0].aa, t0) |
---|
1243 | db.tt.drop() |
---|
1244 | |
---|
1245 | db.define_table("tt", Field("aa", "date")) |
---|
1246 | t0 = datetime.date.today() |
---|
1247 | db.tt.insert(aa=t0) |
---|
1248 | self.assertEqual(db().select(db.tt.aa)[0].aa, t0) |
---|
1249 | |
---|
1250 | |
---|
1251 | class TestExpressions(DALtest): |
---|
1252 | @unittest.skipIf(IS_POSTGRESQL, "PG8000 does not like these") |
---|
1253 | def testRun(self): |
---|
1254 | db = self.connect() |
---|
1255 | db.define_table( |
---|
1256 | "tt", Field("aa", "integer"), Field("bb", "integer"), Field("cc") |
---|
1257 | ) |
---|
1258 | self.assertEqual(db.tt.insert(aa=1, bb=0), 1) |
---|
1259 | self.assertEqual(db.tt.insert(aa=2, bb=0), 2) |
---|
1260 | self.assertEqual(db.tt.insert(aa=3, bb=0), 3) |
---|
1261 | |
---|
1262 | # test update |
---|
1263 | self.assertEqual(db(db.tt.aa == 3).update(aa=db.tt.aa + 1, bb=db.tt.bb + 2), 1) |
---|
1264 | self.assertEqual(db(db.tt.aa == 4).count(), 1) |
---|
1265 | self.assertEqual(db(db.tt.bb == 2).count(), 1) |
---|
1266 | self.assertEqual(db(db.tt.aa == -2).count(), 0) |
---|
1267 | self.assertEqual(db(db.tt.aa == 4).update(aa=db.tt.aa * 2, bb=5), 1) |
---|
1268 | self.assertEqual(db(db.tt.bb == 5).count(), 1) |
---|
1269 | self.assertEqual(db(db.tt.aa + 1 == 9).count(), 1) |
---|
1270 | self.assertEqual(db(db.tt.aa + 1 == 9).update(aa=db.tt.aa - 2, cc="cc"), 1) |
---|
1271 | self.assertEqual(db(db.tt.cc == "cc").count(), 1) |
---|
1272 | self.assertEqual(db(db.tt.aa == 6).count(), 1) |
---|
1273 | self.assertEqual(db(db.tt.aa == 6).update(bb=db.tt.aa * (db.tt.bb - 3)), 1) |
---|
1274 | self.assertEqual(db(db.tt.bb == 12).count(), 1) |
---|
1275 | self.assertEqual(db(db.tt.aa == 6).count(), 1) |
---|
1276 | self.assertEqual( |
---|
1277 | db(db.tt.aa == 6).update(aa=db.tt.aa % 4 + 1, cc=db.tt.cc + "1" + "1"), 1 |
---|
1278 | ) |
---|
1279 | self.assertEqual(db(db.tt.cc == "cc11").count(), 1) |
---|
1280 | self.assertEqual(db(db.tt.aa == 3).count(), 1) |
---|
1281 | |
---|
1282 | # test comparsion expression based count |
---|
1283 | self.assertEqual(db(db.tt.aa != db.tt.aa).count(), 0) |
---|
1284 | self.assertEqual(db(db.tt.aa == db.tt.aa).count(), 3) |
---|
1285 | |
---|
1286 | # test select aggregations |
---|
1287 | sum = (db.tt.aa + 1).sum() |
---|
1288 | self.assertEqual(db(db.tt.aa + 1 >= 3).select(sum).first()[sum], 7) |
---|
1289 | self.assertEqual(db((1 == 0) & (db.tt.aa >= db.tt.aa)).count(), 0) |
---|
1290 | self.assertEqual(db(db.tt.aa * 2 == -2).select(sum).first()[sum], None) |
---|
1291 | |
---|
1292 | count = db.tt.aa.count() |
---|
1293 | avg = db.tt.aa.avg() |
---|
1294 | min = db.tt.aa.min() |
---|
1295 | max = db.tt.aa.max() |
---|
1296 | result = db(db.tt).select(sum, count, avg, min, max).first() |
---|
1297 | self.assertEqual(result[sum], 9) |
---|
1298 | self.assertEqual(result[count], 3) |
---|
1299 | self.assertEqual(result[avg], 2) |
---|
1300 | self.assertEqual(result[min], 1) |
---|
1301 | self.assertEqual(result[max], 3) |
---|
1302 | |
---|
1303 | # Test basic expressions evaluated at python level |
---|
1304 | self.assertEqual(db((1 == 1) & (db.tt.aa >= 2)).count(), 2) |
---|
1305 | self.assertEqual(db((1 == 1) | (db.tt.aa >= 2)).count(), 3) |
---|
1306 | self.assertEqual(db((1 == 0) & (db.tt.aa >= 2)).count(), 0) |
---|
1307 | self.assertEqual(db((1 == 0) | (db.tt.aa >= 2)).count(), 2) |
---|
1308 | |
---|
1309 | # test abs() |
---|
1310 | self.assertEqual(db(db.tt.aa == 2).update(aa=db.tt.aa * -10), 1) |
---|
1311 | abs = db.tt.aa.abs().with_alias("abs") |
---|
1312 | result = db(db.tt.aa == -20).select(abs).first() |
---|
1313 | self.assertEqual(result[abs], 20) |
---|
1314 | self.assertEqual(result["abs"], 20) |
---|
1315 | abs = db.tt.aa.abs() / 10 + 5 |
---|
1316 | exp = abs.min() * 2 + 1 |
---|
1317 | result = db(db.tt.aa == -20).select(exp).first() |
---|
1318 | self.assertEqual(result[exp], 15) |
---|
1319 | |
---|
1320 | # test case() |
---|
1321 | condition = db.tt.aa > 2 |
---|
1322 | case = condition.case(db.tt.aa + 2, db.tt.aa - 2) |
---|
1323 | my_case = case.with_alias("my_case") |
---|
1324 | result = db().select(my_case) |
---|
1325 | self.assertEqual(len(result), 3) |
---|
1326 | self.assertEqual(result[0][my_case], -1) |
---|
1327 | self.assertEqual(result[0]["my_case"], -1) |
---|
1328 | self.assertEqual(result[1]["my_case"], -22) |
---|
1329 | self.assertEqual(result[2]["my_case"], 5) |
---|
1330 | |
---|
1331 | # test expression based delete |
---|
1332 | self.assertEqual(db(db.tt.aa + 1 >= 4).count(), 1) |
---|
1333 | self.assertEqual(db(db.tt.aa + 1 >= 4).delete(), 1) |
---|
1334 | self.assertEqual(db(db.tt.aa).count(), 2) |
---|
1335 | |
---|
1336 | def testUpdate(self): |
---|
1337 | db = self.connect() |
---|
1338 | |
---|
1339 | # some db's only support seconds |
---|
1340 | datetime_datetime_today = datetime.datetime.today() |
---|
1341 | datetime_datetime_today = datetime_datetime_today.replace(microsecond=0) |
---|
1342 | one_day = datetime.timedelta(1) |
---|
1343 | one_sec = datetime.timedelta(0, 1) |
---|
1344 | |
---|
1345 | update_vals = ( |
---|
1346 | ("string", "x", "y"), |
---|
1347 | ("text", "x", "y"), |
---|
1348 | ("password", "x", "y"), |
---|
1349 | ("integer", 1, 2), |
---|
1350 | ("bigint", 1, 2), |
---|
1351 | ("float", 1.0, 2.0), |
---|
1352 | ("double", 1.0, 2.0), |
---|
1353 | ("boolean", True, False), |
---|
1354 | ("date", datetime.date.today(), datetime.date.today() + one_day), |
---|
1355 | ( |
---|
1356 | "datetime", |
---|
1357 | datetime.datetime(1971, 12, 21, 10, 30, 55, 0), |
---|
1358 | datetime_datetime_today, |
---|
1359 | ), |
---|
1360 | ( |
---|
1361 | "time", |
---|
1362 | datetime_datetime_today.time(), |
---|
1363 | (datetime_datetime_today + one_sec).time(), |
---|
1364 | ), |
---|
1365 | ) |
---|
1366 | |
---|
1367 | for uv in update_vals: |
---|
1368 | if IS_ORACLE and (uv[0] == "time" or uv[0] == "text"): |
---|
1369 | # oracle can have problems with this test on CLOBs |
---|
1370 | # and date-less "time" type is not supported |
---|
1371 | continue |
---|
1372 | db.define_table("tt", Field("aa", "integer", default=0), Field("bb", uv[0])) |
---|
1373 | self.assertTrue(isinstance(db.tt.insert(bb=uv[1]), long)) |
---|
1374 | self.assertEqual(db(db.tt.aa + 1 == 1).select(db.tt.bb)[0].bb, uv[1]) |
---|
1375 | self.assertEqual(db(db.tt.aa + 1 == 1).update(bb=uv[2]), 1) |
---|
1376 | self.assertEqual(db(db.tt.aa / 3 == 0).select(db.tt.bb)[0].bb, uv[2]) |
---|
1377 | db.tt.drop() |
---|
1378 | |
---|
1379 | def testSubstring(self): |
---|
1380 | db = self.connect() |
---|
1381 | t0 = db.define_table("t0", Field("name")) |
---|
1382 | input_name = "web2py" |
---|
1383 | t0.insert(name=input_name) |
---|
1384 | exp_slice = t0.name.lower()[4:6] |
---|
1385 | exp_slice_no_max = t0.name.lower()[4:] |
---|
1386 | exp_slice_neg_max = t0.name.lower()[2:-2] |
---|
1387 | exp_slice_neg_start = t0.name.lower()[-2:] |
---|
1388 | exp_item = t0.name.lower()[3] |
---|
1389 | out = ( |
---|
1390 | db(t0) |
---|
1391 | .select( |
---|
1392 | exp_slice, |
---|
1393 | exp_item, |
---|
1394 | exp_slice_no_max, |
---|
1395 | exp_slice_neg_max, |
---|
1396 | exp_slice_neg_start, |
---|
1397 | ) |
---|
1398 | .first() |
---|
1399 | ) |
---|
1400 | self.assertEqual(out[exp_slice], input_name[4:6]) |
---|
1401 | self.assertEqual(out[exp_item], input_name[3]) |
---|
1402 | self.assertEqual(out[exp_slice_no_max], input_name[4:]) |
---|
1403 | self.assertEqual(out[exp_slice_neg_max], input_name[2:-2]) |
---|
1404 | self.assertEqual(out[exp_slice_neg_start], input_name[-2:]) |
---|
1405 | |
---|
1406 | def testOps(self): |
---|
1407 | db = self.connect() |
---|
1408 | t0 = db.define_table("t0", Field("vv", "integer")) |
---|
1409 | self.assertEqual(db.t0.insert(vv=1), 1) |
---|
1410 | self.assertEqual(db.t0.insert(vv=2), 2) |
---|
1411 | self.assertEqual(db.t0.insert(vv=3), 3) |
---|
1412 | sum = db.t0.vv.sum() |
---|
1413 | count = db.t0.vv.count() |
---|
1414 | avg = db.t0.vv.avg() |
---|
1415 | op = sum / count |
---|
1416 | op1 = (sum / count).with_alias("tot") |
---|
1417 | self.assertEqual(db(t0).select(op).first()[op], 2) |
---|
1418 | self.assertEqual(db(t0).select(op1).first()[op1], 2) |
---|
1419 | print("DICT", db(t0).select(op1).as_dict()) |
---|
1420 | self.assertEqual(db(t0).select(op1).first()["tot"], 2) |
---|
1421 | op2 = avg * count |
---|
1422 | self.assertEqual(db(t0).select(op2).first()[op2], 6) |
---|
1423 | # the following is not possible at least on sqlite |
---|
1424 | sum = db.t0.vv.sum().with_alias("s") |
---|
1425 | count = db.t0.vv.count().with_alias("c") |
---|
1426 | op = sum / count |
---|
1427 | # self.assertEqual(db(t0).select(op).first()[op], 2) |
---|
1428 | |
---|
1429 | |
---|
1430 | class TestTableAliasing(DALtest): |
---|
1431 | def testRun(self): |
---|
1432 | db = self.connect() |
---|
1433 | db.define_table("t1", Field("aa")) |
---|
1434 | db.define_table( |
---|
1435 | "t2", |
---|
1436 | Field("pk", type="id", unique=True, notnull=True), |
---|
1437 | Field("bb", type="integer"), |
---|
1438 | rname="tt", |
---|
1439 | ) |
---|
1440 | tab1 = db.t1.with_alias("test1") |
---|
1441 | tab2 = db.t2.with_alias("test2") |
---|
1442 | self.assertIs(tab2.id, tab2.pk) |
---|
1443 | self.assertIs(tab2._id, tab2.pk) |
---|
1444 | self.assertEqual(tab1._dalname, "t1") |
---|
1445 | self.assertEqual(tab1._tablename, "test1") |
---|
1446 | self.assertEqual(tab2._dalname, "t2") |
---|
1447 | self.assertEqual(tab2._tablename, "test2") |
---|
1448 | self.assertEqual(tab2._rname, "tt") |
---|
1449 | tab1.insert(aa="foo") |
---|
1450 | tab1.insert(aa="bar") |
---|
1451 | result = db(tab1).select(tab1.aa, orderby=tab1.aa) |
---|
1452 | self.assertEqual(result.as_list(), [{"aa": "bar"}, {"aa": "foo"}]) |
---|
1453 | |
---|
1454 | if not IS_SQLITE: |
---|
1455 | db(tab1.aa == "foo").update(aa="baz") |
---|
1456 | result = db(tab1).select(tab1.aa, orderby=tab1.aa) |
---|
1457 | self.assertEqual(result.as_list(), [{"aa": "bar"}, {"aa": "baz"}]) |
---|
1458 | db(tab1.aa == "bar").delete() |
---|
1459 | result = db(tab1).select(tab1.aa, orderby=tab1.aa) |
---|
1460 | self.assertEqual(result.as_list(), [{"aa": "baz"}]) |
---|
1461 | else: |
---|
1462 | with self.assertRaises(SyntaxError): |
---|
1463 | db(tab1.aa == "foo").update(aa="baz") |
---|
1464 | with self.assertRaises(SyntaxError): |
---|
1465 | db(tab1.aa == "bar").delete() |
---|
1466 | |
---|
1467 | tab2.insert(bb=123) |
---|
1468 | tab2.insert(bb=456) |
---|
1469 | result = db(tab2).select(tab2.bb, orderby=tab2.bb) |
---|
1470 | self.assertEqual(result.as_list(), [{"bb": 123}, {"bb": 456}]) |
---|
1471 | |
---|
1472 | if not IS_SQLITE: |
---|
1473 | db(tab2.bb == 456).update(bb=789) |
---|
1474 | result = db(tab2).select(tab2.bb, orderby=tab2.bb) |
---|
1475 | self.assertEqual(result.as_list(), [{"bb": 123}, {"bb": 789}]) |
---|
1476 | db(tab2.bb == 123).delete() |
---|
1477 | result = db(tab2).select(tab2.bb, orderby=tab2.bb) |
---|
1478 | self.assertEqual(result.as_list(), [{"bb": 789}]) |
---|
1479 | else: |
---|
1480 | with self.assertRaises(SyntaxError): |
---|
1481 | db(tab2.bb == 456).update(bb=789) |
---|
1482 | with self.assertRaises(SyntaxError): |
---|
1483 | db(tab2.bb == 123).delete() |
---|
1484 | |
---|
1485 | |
---|
1486 | class TestJoin(DALtest): |
---|
1487 | def testRun(self): |
---|
1488 | db = self.connect() |
---|
1489 | db.define_table("t1", Field("aa")) |
---|
1490 | db.define_table("t2", Field("aa"), Field("b", db.t1)) |
---|
1491 | i1 = db.t1.insert(aa="1") |
---|
1492 | i2 = db.t1.insert(aa="2") |
---|
1493 | i3 = db.t1.insert(aa="3") |
---|
1494 | db.t2.insert(aa="4", b=i1) |
---|
1495 | db.t2.insert(aa="5", b=i2) |
---|
1496 | db.t2.insert(aa="6", b=i2) |
---|
1497 | self.assertEqual( |
---|
1498 | len(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)), 3 |
---|
1499 | ) |
---|
1500 | self.assertEqual( |
---|
1501 | db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t1.aa, "2" |
---|
1502 | ) |
---|
1503 | self.assertEqual( |
---|
1504 | db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t2.aa, "6" |
---|
1505 | ) |
---|
1506 | self.assertEqual( |
---|
1507 | len( |
---|
1508 | db().select( |
---|
1509 | db.t1.ALL, |
---|
1510 | db.t2.ALL, |
---|
1511 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1512 | orderby=db.t1.aa | db.t2.aa, |
---|
1513 | ) |
---|
1514 | ), |
---|
1515 | 4, |
---|
1516 | ) |
---|
1517 | self.assertEqual( |
---|
1518 | db() |
---|
1519 | .select( |
---|
1520 | db.t1.ALL, |
---|
1521 | db.t2.ALL, |
---|
1522 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1523 | orderby=db.t1.aa | db.t2.aa, |
---|
1524 | )[2] |
---|
1525 | .t1.aa, |
---|
1526 | "2", |
---|
1527 | ) |
---|
1528 | self.assertEqual( |
---|
1529 | db() |
---|
1530 | .select( |
---|
1531 | db.t1.ALL, |
---|
1532 | db.t2.ALL, |
---|
1533 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1534 | orderby=db.t1.aa | db.t2.aa, |
---|
1535 | )[2] |
---|
1536 | .t2.aa, |
---|
1537 | "6", |
---|
1538 | ) |
---|
1539 | self.assertEqual( |
---|
1540 | db() |
---|
1541 | .select( |
---|
1542 | db.t1.ALL, |
---|
1543 | db.t2.ALL, |
---|
1544 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1545 | orderby=db.t1.aa | db.t2.aa, |
---|
1546 | )[3] |
---|
1547 | .t1.aa, |
---|
1548 | "3", |
---|
1549 | ) |
---|
1550 | self.assertEqual( |
---|
1551 | db() |
---|
1552 | .select( |
---|
1553 | db.t1.ALL, |
---|
1554 | db.t2.ALL, |
---|
1555 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1556 | orderby=db.t1.aa | db.t2.aa, |
---|
1557 | )[3] |
---|
1558 | .t2.aa, |
---|
1559 | None, |
---|
1560 | ) |
---|
1561 | self.assertEqual( |
---|
1562 | len( |
---|
1563 | db().select( |
---|
1564 | db.t1.aa, |
---|
1565 | db.t2.id.count(), |
---|
1566 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1567 | orderby=db.t1.aa, |
---|
1568 | groupby=db.t1.aa, |
---|
1569 | ) |
---|
1570 | ), |
---|
1571 | 3, |
---|
1572 | ) |
---|
1573 | self.assertEqual( |
---|
1574 | db() |
---|
1575 | .select( |
---|
1576 | db.t1.aa, |
---|
1577 | db.t2.id.count(), |
---|
1578 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1579 | orderby=db.t1.aa, |
---|
1580 | groupby=db.t1.aa, |
---|
1581 | )[0] |
---|
1582 | ._extra[db.t2.id.count()], |
---|
1583 | 1, |
---|
1584 | ) |
---|
1585 | self.assertEqual( |
---|
1586 | db() |
---|
1587 | .select( |
---|
1588 | db.t1.aa, |
---|
1589 | db.t2.id.count(), |
---|
1590 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1591 | orderby=db.t1.aa, |
---|
1592 | groupby=db.t1.aa, |
---|
1593 | )[1] |
---|
1594 | ._extra[db.t2.id.count()], |
---|
1595 | 2, |
---|
1596 | ) |
---|
1597 | self.assertEqual( |
---|
1598 | db() |
---|
1599 | .select( |
---|
1600 | db.t1.aa, |
---|
1601 | db.t2.id.count(), |
---|
1602 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1603 | orderby=db.t1.aa, |
---|
1604 | groupby=db.t1.aa, |
---|
1605 | )[2] |
---|
1606 | ._extra[db.t2.id.count()], |
---|
1607 | 0, |
---|
1608 | ) |
---|
1609 | db.t2.drop() |
---|
1610 | db.t1.drop() |
---|
1611 | |
---|
1612 | db.define_table("person", Field("name")) |
---|
1613 | id = db.person.insert(name="max") |
---|
1614 | self.assertEqual(id.name, "max") |
---|
1615 | db.define_table("dog", Field("name"), Field("ownerperson", "reference person")) |
---|
1616 | db.dog.insert(name="skipper", ownerperson=1) |
---|
1617 | row = db(db.person.id == db.dog.ownerperson).select().first() |
---|
1618 | self.assertEqual(row[db.person.name], "max") |
---|
1619 | self.assertEqual(row["person.name"], "max") |
---|
1620 | db.dog.drop() |
---|
1621 | self.assertEqual(len(db.person._referenced_by), 0) |
---|
1622 | |
---|
1623 | |
---|
1624 | class TestMinMaxSumAvg(DALtest): |
---|
1625 | def testRun(self): |
---|
1626 | db = self.connect() |
---|
1627 | db.define_table("tt", Field("aa", "integer")) |
---|
1628 | self.assertEqual(db.tt.insert(aa=1), 1) |
---|
1629 | self.assertEqual(db.tt.insert(aa=2), 2) |
---|
1630 | self.assertEqual(db.tt.insert(aa=3), 3) |
---|
1631 | s = db.tt.aa.min() |
---|
1632 | self.assertEqual(db(db.tt.id > 0).select(s)[0]._extra[s], 1) |
---|
1633 | self.assertEqual(db(db.tt.id > 0).select(s).first()[s], 1) |
---|
1634 | self.assertEqual(db().select(s).first()[s], 1) |
---|
1635 | s = db.tt.aa.max() |
---|
1636 | self.assertEqual(db().select(s).first()[s], 3) |
---|
1637 | s = db.tt.aa.sum() |
---|
1638 | self.assertEqual(db().select(s).first()[s], 6) |
---|
1639 | s = db.tt.aa.count() |
---|
1640 | self.assertEqual(db().select(s).first()[s], 3) |
---|
1641 | s = db.tt.aa.avg() |
---|
1642 | self.assertEqual(db().select(s).first()[s], 2) |
---|
1643 | |
---|
1644 | |
---|
1645 | class TestMigrations(unittest.TestCase): |
---|
1646 | def testRun(self): |
---|
1647 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1648 | db.define_table("tt", Field("aa"), Field("BB"), migrate=".storage.table") |
---|
1649 | db.define_table( |
---|
1650 | "t1", Field("aa"), Field("BB"), migrate=".storage.rname", rname="foo" |
---|
1651 | ) |
---|
1652 | db.commit() |
---|
1653 | db.close() |
---|
1654 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1655 | db.define_table("tt", Field("aa"), migrate=".storage.table") |
---|
1656 | db.define_table("t1", Field("aa"), migrate=".storage.rname", rname="foo") |
---|
1657 | db.commit() |
---|
1658 | db.close() |
---|
1659 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1660 | db.define_table("tt", Field("aa"), Field("b"), migrate=".storage.table") |
---|
1661 | db.define_table( |
---|
1662 | "t1", Field("aa"), Field("b"), migrate=".storage.rname", rname="foo" |
---|
1663 | ) |
---|
1664 | db.commit() |
---|
1665 | db.close() |
---|
1666 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1667 | db.define_table("tt", Field("aa"), Field("b", "text"), migrate=".storage.table") |
---|
1668 | db.define_table( |
---|
1669 | "t1", Field("aa"), Field("b", "text"), migrate=".storage.rname", rname="foo" |
---|
1670 | ) |
---|
1671 | db.commit() |
---|
1672 | db.close() |
---|
1673 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1674 | db.define_table("tt", Field("aa"), migrate=".storage.table") |
---|
1675 | db.define_table("t1", Field("aa"), migrate=".storage.rname", rname="foo") |
---|
1676 | db.tt.drop() |
---|
1677 | db.t1.drop() |
---|
1678 | db.commit() |
---|
1679 | db.close() |
---|
1680 | |
---|
1681 | def testFieldRName(self): |
---|
1682 | def checkWrite(db, table, data): |
---|
1683 | rowid = table.insert(**data) |
---|
1684 | query = table._id == rowid |
---|
1685 | fields = [table[x] for x in data.keys()] |
---|
1686 | row = db(query).select(*fields).first() |
---|
1687 | self.assertIsNot(row, None) |
---|
1688 | self.assertEqual(row.as_dict(), data) |
---|
1689 | db(query).delete() |
---|
1690 | |
---|
1691 | # Create tables |
---|
1692 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1693 | db.define_table( |
---|
1694 | "tt", |
---|
1695 | Field("aa", rname="faa"), |
---|
1696 | Field("BB", rname="fbb"), |
---|
1697 | migrate=".storage.table", |
---|
1698 | ) |
---|
1699 | db.define_table( |
---|
1700 | "t1", |
---|
1701 | Field("aa", rname="faa"), |
---|
1702 | Field("BB", rname="fbb"), |
---|
1703 | migrate=".storage.rname", |
---|
1704 | rname="foo", |
---|
1705 | ) |
---|
1706 | data = dict(aa="aa1", BB="BB1") |
---|
1707 | checkWrite(db, db.tt, data) |
---|
1708 | checkWrite(db, db.t1, data) |
---|
1709 | db.commit() |
---|
1710 | db.close() |
---|
1711 | |
---|
1712 | # Drop field defined by CREATE TABLE |
---|
1713 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1714 | db.define_table("tt", Field("aa", rname="faa"), migrate=".storage.table") |
---|
1715 | db.define_table( |
---|
1716 | "t1", Field("aa", rname="faa"), migrate=".storage.rname", rname="foo" |
---|
1717 | ) |
---|
1718 | data = dict(aa="aa2") |
---|
1719 | checkWrite(db, db.tt, data) |
---|
1720 | checkWrite(db, db.t1, data) |
---|
1721 | db.commit() |
---|
1722 | db.close() |
---|
1723 | |
---|
1724 | # Add new field |
---|
1725 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1726 | db.define_table( |
---|
1727 | "tt", |
---|
1728 | Field("aa", rname="faa"), |
---|
1729 | Field("b", rname="fb"), |
---|
1730 | migrate=".storage.table", |
---|
1731 | ) |
---|
1732 | db.define_table( |
---|
1733 | "t1", |
---|
1734 | Field("aa", rname="faa"), |
---|
1735 | Field("b", rname="fb"), |
---|
1736 | migrate=".storage.rname", |
---|
1737 | rname="foo", |
---|
1738 | ) |
---|
1739 | data = dict(aa="aa3", b="b3") |
---|
1740 | integrity = dict(aa="data", b="integrity") |
---|
1741 | checkWrite(db, db.tt, data) |
---|
1742 | checkWrite(db, db.t1, data) |
---|
1743 | db.tt.insert(**integrity) |
---|
1744 | db.t1.insert(**integrity) |
---|
1745 | db.commit() |
---|
1746 | db.close() |
---|
1747 | |
---|
1748 | if not IS_SQLITE: |
---|
1749 | |
---|
1750 | # Change field type |
---|
1751 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1752 | db.define_table( |
---|
1753 | "tt", |
---|
1754 | Field("aa", rname="faa"), |
---|
1755 | Field("b", "text", rname="fb"), |
---|
1756 | migrate=".storage.table", |
---|
1757 | ) |
---|
1758 | db.define_table( |
---|
1759 | "t1", |
---|
1760 | Field("aa", rname="faa"), |
---|
1761 | Field("b", "text", rname="fb"), |
---|
1762 | migrate=".storage.rname", |
---|
1763 | rname="foo", |
---|
1764 | ) |
---|
1765 | data = dict(aa="aa4", b="b4") |
---|
1766 | checkWrite(db, db.tt, data) |
---|
1767 | checkWrite(db, db.t1, data) |
---|
1768 | row = db(db.tt).select(*[db.tt[x] for x in integrity.keys()]).first() |
---|
1769 | self.assertIsNot(row, None) |
---|
1770 | self.assertEqual(row.as_dict(), integrity) |
---|
1771 | row2 = db(db.t1).select(*[db.t1[x] for x in integrity.keys()]).first() |
---|
1772 | self.assertIsNot(row2, None) |
---|
1773 | self.assertEqual(row2.as_dict(), integrity) |
---|
1774 | db.commit() |
---|
1775 | db.close() |
---|
1776 | |
---|
1777 | if not IS_SQLITE: |
---|
1778 | |
---|
1779 | # Change field rname |
---|
1780 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1781 | db.define_table( |
---|
1782 | "tt", |
---|
1783 | Field("aa", rname="faa"), |
---|
1784 | Field("b", "text", rname="xb"), |
---|
1785 | migrate=".storage.table", |
---|
1786 | ) |
---|
1787 | db.define_table( |
---|
1788 | "t1", |
---|
1789 | Field("aa", rname="faa"), |
---|
1790 | Field("b", "text", rname="xb"), |
---|
1791 | migrate=".storage.rname", |
---|
1792 | rname="foo", |
---|
1793 | ) |
---|
1794 | data = dict(aa="aa4", b="b4") |
---|
1795 | checkWrite(db, db.tt, data) |
---|
1796 | checkWrite(db, db.t1, data) |
---|
1797 | row = db(db.tt).select(*[db.tt[x] for x in integrity.keys()]).first() |
---|
1798 | self.assertIsNot(row, None) |
---|
1799 | self.assertEqual(row.as_dict(), integrity) |
---|
1800 | row2 = db(db.t1).select(*[db.t1[x] for x in integrity.keys()]).first() |
---|
1801 | self.assertIsNot(row2, None) |
---|
1802 | self.assertEqual(row2.as_dict(), integrity) |
---|
1803 | db.commit() |
---|
1804 | db.close() |
---|
1805 | |
---|
1806 | # Drop field defined by ALTER TABLE |
---|
1807 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1808 | db.define_table("tt", Field("aa", rname="faa"), migrate=".storage.table") |
---|
1809 | db.define_table( |
---|
1810 | "t1", Field("aa", rname="faa"), migrate=".storage.rname", rname="foo" |
---|
1811 | ) |
---|
1812 | data = dict(aa="aa5") |
---|
1813 | checkWrite(db, db.tt, data) |
---|
1814 | checkWrite(db, db.t1, data) |
---|
1815 | db.tt.drop() |
---|
1816 | db.t1.drop() |
---|
1817 | db.commit() |
---|
1818 | db.close() |
---|
1819 | |
---|
1820 | def tearDown(self): |
---|
1821 | if os.path.exists(".storage.db"): |
---|
1822 | os.unlink(".storage.db") |
---|
1823 | if os.path.exists(".storage.table"): |
---|
1824 | os.unlink(".storage.table") |
---|
1825 | if os.path.exists(".storage.rname"): |
---|
1826 | os.unlink(".storage.rname") |
---|
1827 | |
---|
1828 | |
---|
1829 | class TestReference(DALtest): |
---|
1830 | def testRun(self): |
---|
1831 | scenarios = ( |
---|
1832 | (True, "CASCADE"), |
---|
1833 | (False, "CASCADE"), |
---|
1834 | (False, "SET NULL"), |
---|
1835 | ) |
---|
1836 | for (b, ondelete) in scenarios: |
---|
1837 | db = self.connect(bigint_id=b) |
---|
1838 | if DEFAULT_URI.startswith("mssql"): |
---|
1839 | # multiple cascade gotcha |
---|
1840 | for key in ["reference", "reference FK"]: |
---|
1841 | db._adapter.types[key] = db._adapter.types[key].replace( |
---|
1842 | "%(on_delete_action)s", "NO ACTION" |
---|
1843 | ) |
---|
1844 | db.define_table( |
---|
1845 | "tt", Field("name"), Field("aa", "reference tt", ondelete=ondelete) |
---|
1846 | ) |
---|
1847 | db.commit() |
---|
1848 | x = db.tt.insert(name="xxx") |
---|
1849 | self.assertEqual(x.id, 1) |
---|
1850 | self.assertEqual(x["id"], 1) |
---|
1851 | x.aa = x |
---|
1852 | self.assertEqual(x.aa, 1) |
---|
1853 | x.update_record() |
---|
1854 | x1 = db.tt[1] |
---|
1855 | self.assertEqual(x1.aa, 1) |
---|
1856 | self.assertEqual(x1.aa.aa.aa.aa.aa.aa.name, "xxx") |
---|
1857 | y = db.tt.insert(name="yyy", aa=x1) |
---|
1858 | self.assertEqual(y.aa, x1.id) |
---|
1859 | |
---|
1860 | if not DEFAULT_URI.startswith("mssql"): |
---|
1861 | self.assertEqual(db.tt.insert(name="zzz"), 3) |
---|
1862 | self.assertEqual(db(db.tt.name).count(), 3) |
---|
1863 | db(db.tt.id == x).delete() |
---|
1864 | expected_count = { |
---|
1865 | "SET NULL": 2, |
---|
1866 | "NO ACTION": 2, |
---|
1867 | "CASCADE": 1, |
---|
1868 | } |
---|
1869 | self.assertEqual(db(db.tt.name).count(), expected_count[ondelete]) |
---|
1870 | if ondelete == "SET NULL": |
---|
1871 | self.assertEqual(db(db.tt.name == "yyy").select()[0].aa, None) |
---|
1872 | |
---|
1873 | self.tearDown() |
---|
1874 | |
---|
1875 | |
---|
1876 | class TestClientLevelOps(DALtest): |
---|
1877 | def testRun(self): |
---|
1878 | db = self.connect() |
---|
1879 | db.define_table( |
---|
1880 | "tt", |
---|
1881 | Field("aa", represent=lambda x, r: "x" + x), |
---|
1882 | Field("bb", type="integer", represent=lambda x, r: "y" + str(x)), |
---|
1883 | ) |
---|
1884 | db.commit() |
---|
1885 | db.tt.insert(aa="test", bb=1) |
---|
1886 | rows1 = db(db.tt.id < 0).select() |
---|
1887 | rows2 = db(db.tt.id > 0).select() |
---|
1888 | self.assertNotEqual(rows1, rows2) |
---|
1889 | rows1 = db(db.tt.id > 0).select() |
---|
1890 | rows2 = db(db.tt.id > 0).select() |
---|
1891 | self.assertEqual(rows1, rows2) |
---|
1892 | rows3 = rows1 + rows2 |
---|
1893 | self.assertEqual(len(rows3), 2) |
---|
1894 | rows4 = rows1 | rows2 |
---|
1895 | self.assertEqual(len(rows4), 1) |
---|
1896 | rows5 = rows1 & rows2 |
---|
1897 | self.assertEqual(len(rows5), 1) |
---|
1898 | rows6 = rows1.find(lambda row: row.aa == "test") |
---|
1899 | self.assertEqual(len(rows6), 1) |
---|
1900 | rows7 = rows2.exclude(lambda row: row.aa == "test") |
---|
1901 | self.assertEqual(len(rows7), 1) |
---|
1902 | rows8 = rows5.sort(lambda row: row.aa) |
---|
1903 | self.assertEqual(len(rows8), 1) |
---|
1904 | |
---|
1905 | def represent(f, v, r): |
---|
1906 | return "z" + str(v) |
---|
1907 | |
---|
1908 | db.representers = { |
---|
1909 | "rows_render": represent, |
---|
1910 | } |
---|
1911 | db.tt.insert(aa="foo", bb=2) |
---|
1912 | rows = db(db.tt.id > 0).select() |
---|
1913 | exp1 = [ |
---|
1914 | Row(aa="ztest", bb="z1", id=rows[0]["id"]), |
---|
1915 | Row(aa="zfoo", bb="z2", id=rows[1]["id"]), |
---|
1916 | ] |
---|
1917 | exp2 = [ |
---|
1918 | Row(aa="ztest", bb=1, id=rows[0]["id"]), |
---|
1919 | Row(aa="zfoo", bb=2, id=rows[1]["id"]), |
---|
1920 | ] |
---|
1921 | exp3 = [ |
---|
1922 | Row(aa="test", bb="z1", id=rows[0]["id"]), |
---|
1923 | Row(aa="foo", bb="z2", id=rows[1]["id"]), |
---|
1924 | ] |
---|
1925 | self.assertEqual(rows.render(i=0), exp1[0]) |
---|
1926 | self.assertEqual(rows.render(i=0, fields=[db.tt.aa, db.tt.bb]), exp1[0]) |
---|
1927 | self.assertEqual(rows.render(i=0, fields=[db.tt.aa]), exp2[0]) |
---|
1928 | self.assertEqual(rows.render(i=0, fields=[db.tt.bb]), exp3[0]) |
---|
1929 | self.assertEqual(list(rows.render()), exp1) |
---|
1930 | self.assertEqual(list(rows.render(fields=[db.tt.aa, db.tt.bb])), exp1) |
---|
1931 | self.assertEqual(list(rows.render(fields=[db.tt.aa])), exp2) |
---|
1932 | self.assertEqual(list(rows.render(fields=[db.tt.bb])), exp3) |
---|
1933 | ret = rows.render(i=0) |
---|
1934 | rows = db(db.tt.id > 0).select() |
---|
1935 | rows.compact = False |
---|
1936 | row = rows[0] |
---|
1937 | self.assertIn("tt", row) |
---|
1938 | self.assertIn("id", row.tt) |
---|
1939 | self.assertNotIn("id", row) |
---|
1940 | rows.compact = True |
---|
1941 | row = rows[0] |
---|
1942 | self.assertNotIn("tt", row) |
---|
1943 | self.assertIn("id", row) |
---|
1944 | |
---|
1945 | rows = db(db.tt.id > 0).select(db.tt.id.max()) |
---|
1946 | rows.compact = False |
---|
1947 | row = rows[0] |
---|
1948 | self.assertNotIn("tt", row) |
---|
1949 | self.assertIn("_extra", row) |
---|
1950 | |
---|
1951 | rows = db(db.tt.id > 0).select(db.tt.id.max()) |
---|
1952 | rows.compact = True |
---|
1953 | row = rows[0] |
---|
1954 | self.assertNotIn("tt", row) |
---|
1955 | self.assertIn("_extra", row) |
---|
1956 | db.tt.drop() |
---|
1957 | |
---|
1958 | db.define_table("tt", Field("aa"), Field.Virtual("bb", lambda row: ":p")) |
---|
1959 | db.tt.insert(aa="test") |
---|
1960 | rows = db(db.tt.id > 0).select() |
---|
1961 | row = rows.first() |
---|
1962 | self.assertNotIn("tt", row) |
---|
1963 | self.assertIn("id", row) |
---|
1964 | self.assertIn("bb", row) |
---|
1965 | |
---|
1966 | rows.compact = False |
---|
1967 | row = rows.first() |
---|
1968 | self.assertIn("tt", row) |
---|
1969 | self.assertEqual(len(row.keys()), 1) |
---|
1970 | self.assertIn("id", row.tt) |
---|
1971 | self.assertIn("bb", row.tt) |
---|
1972 | self.assertNotIn("id", row) |
---|
1973 | self.assertNotIn("bb", row) |
---|
1974 | |
---|
1975 | |
---|
1976 | class TestVirtualFields(DALtest): |
---|
1977 | def testRun(self): |
---|
1978 | db = self.connect() |
---|
1979 | db.define_table("tt", Field("aa")) |
---|
1980 | db.commit() |
---|
1981 | db.tt.insert(aa="test") |
---|
1982 | |
---|
1983 | class Compute: |
---|
1984 | def a_upper(row): |
---|
1985 | return row.tt.aa.upper() |
---|
1986 | |
---|
1987 | db.tt.virtualfields.append(Compute()) |
---|
1988 | assert db(db.tt.id > 0).select().first().a_upper == "TEST" |
---|
1989 | |
---|
1990 | |
---|
1991 | class TestComputedFields(DALtest): |
---|
1992 | def testRun(self): |
---|
1993 | db = self.connect() |
---|
1994 | db.define_table( |
---|
1995 | "tt", |
---|
1996 | Field("aa"), |
---|
1997 | Field("bb", default="x"), |
---|
1998 | Field("cc", compute=lambda r: r.aa + r.bb), |
---|
1999 | ) |
---|
2000 | db.commit() |
---|
2001 | id = db.tt.insert(aa="z") |
---|
2002 | self.assertEqual(db.tt[id].cc, "zx") |
---|
2003 | db.tt.drop() |
---|
2004 | db.commit() |
---|
2005 | |
---|
2006 | # test checking that a compute field can refer to earlier-defined computed fields |
---|
2007 | db.define_table( |
---|
2008 | "tt", |
---|
2009 | Field("aa"), |
---|
2010 | Field("bb", default="x"), |
---|
2011 | Field("cc", compute=lambda r: r.aa + r.bb), |
---|
2012 | Field("dd", compute=lambda r: r.bb + r.cc), |
---|
2013 | ) |
---|
2014 | db.commit() |
---|
2015 | id = db.tt.insert(aa="z") |
---|
2016 | self.assertEqual(db.tt[id].dd, "xzx") |
---|
2017 | |
---|
2018 | |
---|
2019 | class TestCommonFilters(DALtest): |
---|
2020 | def testRun(self): |
---|
2021 | db = self.connect() |
---|
2022 | db.define_table("t1", Field("aa", "integer")) |
---|
2023 | db.define_table("t2", Field("aa", "integer"), Field("b", db.t1)) |
---|
2024 | i1 = db.t1.insert(aa=1) |
---|
2025 | i2 = db.t1.insert(aa=2) |
---|
2026 | i3 = db.t1.insert(aa=3) |
---|
2027 | db.t2.insert(aa=4, b=i1) |
---|
2028 | db.t2.insert(aa=5, b=i2) |
---|
2029 | db.t2.insert(aa=6, b=i2) |
---|
2030 | db.t1._common_filter = lambda q: db.t1.aa > 1 |
---|
2031 | self.assertEqual(db(db.t1).count(), 2) |
---|
2032 | self.assertEqual(db(db.t1).count(), 2) |
---|
2033 | q = db.t2.b == db.t1.id |
---|
2034 | self.assertEqual(db(q).count(), 2) |
---|
2035 | self.assertEqual(db(q).count(), 2) |
---|
2036 | self.assertEqual(len(db(db.t1).select(left=db.t2.on(q))), 3) |
---|
2037 | db.t2._common_filter = lambda q: db.t2.aa < 6 |
---|
2038 | self.assertEqual(db(q).count(), 1) |
---|
2039 | self.assertEqual(db(q).count(), 1) |
---|
2040 | self.assertEqual(len(db(db.t1).select(left=db.t2.on(q))), 2) |
---|
2041 | # test delete |
---|
2042 | self.assertEqual(db(db.t2).count(), 2) |
---|
2043 | db(db.t2).delete() |
---|
2044 | self.assertEqual(db(db.t2).count(), 0) |
---|
2045 | db.t2._common_filter = None |
---|
2046 | self.assertEqual(db(db.t2).count(), 1) |
---|
2047 | # test update |
---|
2048 | db.t2.insert(aa=4, b=i1) |
---|
2049 | db.t2.insert(aa=5, b=i2) |
---|
2050 | db.t2._common_filter = lambda q: db.t2.aa < 6 |
---|
2051 | self.assertEqual(db(db.t2).count(), 2) |
---|
2052 | db(db.t2).update(aa=6) |
---|
2053 | self.assertEqual(db(db.t2).count(), 0) |
---|
2054 | db.t2._common_filter = None |
---|
2055 | self.assertEqual(db(db.t2).count(), 3) |
---|
2056 | |
---|
2057 | |
---|
2058 | class TestImportExportFields(DALtest): |
---|
2059 | def testRun(self): |
---|
2060 | db = self.connect() |
---|
2061 | db.define_table("person", Field("name")) |
---|
2062 | db.define_table("pet", Field("friend", db.person), Field("name")) |
---|
2063 | for n in range(2): |
---|
2064 | db(db.pet).delete() |
---|
2065 | db(db.person).delete() |
---|
2066 | for k in range(10): |
---|
2067 | id = db.person.insert(name=str(k)) |
---|
2068 | db.pet.insert(friend=id, name=str(k)) |
---|
2069 | db.commit() |
---|
2070 | stream = StringIO() |
---|
2071 | db.export_to_csv_file(stream) |
---|
2072 | db(db.pet).delete() |
---|
2073 | db(db.person).delete() |
---|
2074 | stream = StringIO(stream.getvalue()) |
---|
2075 | db.import_from_csv_file(stream) |
---|
2076 | assert ( |
---|
2077 | db(db.person.id == db.pet.friend)(db.person.name == db.pet.name).count() |
---|
2078 | == 10 |
---|
2079 | ) |
---|
2080 | |
---|
2081 | |
---|
2082 | class TestImportExportUuidFields(DALtest): |
---|
2083 | def testRun(self): |
---|
2084 | db = self.connect() |
---|
2085 | db.define_table("person", Field("name"), Field("uuid")) |
---|
2086 | db.define_table("pet", Field("friend", db.person), Field("name")) |
---|
2087 | for n in range(2): |
---|
2088 | db(db.pet).delete() |
---|
2089 | db(db.person).delete() |
---|
2090 | for k in range(10): |
---|
2091 | id = db.person.insert(name=str(k), uuid=str(k)) |
---|
2092 | db.pet.insert(friend=id, name=str(k)) |
---|
2093 | db.commit() |
---|
2094 | stream = StringIO() |
---|
2095 | db.export_to_csv_file(stream) |
---|
2096 | stream = StringIO(stream.getvalue()) |
---|
2097 | db.import_from_csv_file(stream) |
---|
2098 | assert db(db.person).count() == 10 |
---|
2099 | assert ( |
---|
2100 | db(db.person.id == db.pet.friend)(db.person.name == db.pet.name).count() |
---|
2101 | == 20 |
---|
2102 | ) |
---|
2103 | |
---|
2104 | |
---|
2105 | class TestDALDictImportExport(unittest.TestCase): |
---|
2106 | def testRun(self): |
---|
2107 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
2108 | db.define_table("person", Field("name", default="Michael"), Field("uuid")) |
---|
2109 | db.define_table("pet", Field("friend", db.person), Field("name")) |
---|
2110 | dbdict = db.as_dict(flat=True, sanitize=False) |
---|
2111 | assert isinstance(dbdict, dict) |
---|
2112 | uri = dbdict["uri"] |
---|
2113 | assert isinstance(uri, basestring) and uri |
---|
2114 | assert len(dbdict["tables"]) == 2 |
---|
2115 | assert len(dbdict["tables"][0]["fields"]) == 3 |
---|
2116 | assert dbdict["tables"][0]["fields"][1]["type"] == db.person.name.type |
---|
2117 | assert dbdict["tables"][0]["fields"][1]["default"] == db.person.name.default |
---|
2118 | |
---|
2119 | db2 = DAL(**dbdict) |
---|
2120 | assert len(db.tables) == len(db2.tables) |
---|
2121 | assert hasattr(db2, "pet") and isinstance(db2.pet, Table) |
---|
2122 | assert hasattr(db2.pet, "friend") and isinstance(db2.pet.friend, Field) |
---|
2123 | db.pet.drop() |
---|
2124 | db.commit() |
---|
2125 | |
---|
2126 | db2.commit() |
---|
2127 | |
---|
2128 | dbjson = db.as_json(sanitize=False) |
---|
2129 | assert isinstance(dbjson, basestring) and len(dbjson) > 0 |
---|
2130 | db3 = DAL(**json.loads(dbjson)) |
---|
2131 | assert hasattr(db3, "person") and hasattr(db3.person, "uuid") |
---|
2132 | assert db3.person.uuid.type == db.person.uuid.type |
---|
2133 | db3.person.drop() |
---|
2134 | db3.commit() |
---|
2135 | db3.close() |
---|
2136 | |
---|
2137 | mpfc = "Monty Python's Flying Circus" |
---|
2138 | dbdict4 = { |
---|
2139 | "uri": DEFAULT_URI, |
---|
2140 | "tables": [ |
---|
2141 | { |
---|
2142 | "tablename": "tvshow", |
---|
2143 | "fields": [ |
---|
2144 | {"fieldname": "name", "default": mpfc}, |
---|
2145 | {"fieldname": "rating", "type": "double"}, |
---|
2146 | ], |
---|
2147 | }, |
---|
2148 | { |
---|
2149 | "tablename": "staff", |
---|
2150 | "fields": [ |
---|
2151 | {"fieldname": "name", "default": "Michael"}, |
---|
2152 | {"fieldname": "food", "default": "Spam"}, |
---|
2153 | {"fieldname": "tvshow", "type": "reference tvshow"}, |
---|
2154 | ], |
---|
2155 | }, |
---|
2156 | ], |
---|
2157 | } |
---|
2158 | db4 = DAL(**dbdict4) |
---|
2159 | assert "staff" in db4.tables |
---|
2160 | assert "name" in db4.staff |
---|
2161 | assert db4.tvshow.rating.type == "double" |
---|
2162 | assert ( |
---|
2163 | db4.tvshow.insert(), |
---|
2164 | db4.tvshow.insert(name="Loriot"), |
---|
2165 | db4.tvshow.insert(name="Il Mattatore"), |
---|
2166 | ) == (1, 2, 3) |
---|
2167 | assert db4(db4.tvshow).select().first().id == 1 |
---|
2168 | assert db4(db4.tvshow).select().first().name == mpfc |
---|
2169 | |
---|
2170 | db4.staff.drop() |
---|
2171 | db4.tvshow.drop() |
---|
2172 | db4.commit() |
---|
2173 | |
---|
2174 | dbdict5 = {"uri": DEFAULT_URI} |
---|
2175 | db5 = DAL(**dbdict5) |
---|
2176 | assert db5.tables in ([], None) |
---|
2177 | assert not (str(db5) in ("", None)) |
---|
2178 | |
---|
2179 | dbdict6 = { |
---|
2180 | "uri": DEFAULT_URI, |
---|
2181 | "tables": [ |
---|
2182 | {"tablename": "staff"}, |
---|
2183 | { |
---|
2184 | "tablename": "tvshow", |
---|
2185 | "fields": [ |
---|
2186 | {"fieldname": "name"}, |
---|
2187 | {"fieldname": "rating", "type": "double"}, |
---|
2188 | ], |
---|
2189 | }, |
---|
2190 | ], |
---|
2191 | } |
---|
2192 | db6 = DAL(**dbdict6) |
---|
2193 | |
---|
2194 | assert len(db6["staff"].fields) == 1 |
---|
2195 | assert "name" in db6["tvshow"].fields |
---|
2196 | |
---|
2197 | assert db6.staff.insert() is not None |
---|
2198 | assert db6(db6.staff).select().first().id == 1 |
---|
2199 | |
---|
2200 | db6.staff.drop() |
---|
2201 | db6.tvshow.drop() |
---|
2202 | db6.commit() |
---|
2203 | |
---|
2204 | db.close() |
---|
2205 | db2.close() |
---|
2206 | db4.close() |
---|
2207 | db5.close() |
---|
2208 | db6.close() |
---|
2209 | |
---|
2210 | |
---|
2211 | class TestSelectAsDict(DALtest): |
---|
2212 | def testSelect(self): |
---|
2213 | db = self.connect() |
---|
2214 | if IS_ORACLE: |
---|
2215 | # if lowercase fieldnames desired in return, must be quoted in oracle |
---|
2216 | db.define_table( |
---|
2217 | "a_table", Field("b_field"), Field("a_field"), |
---|
2218 | ) |
---|
2219 | db.a_table.insert(a_field="aa1", b_field="bb1") |
---|
2220 | rtn = db.executesql( |
---|
2221 | 'SELECT "id", "b_field", "a_field" FROM "a_table"', as_dict=True |
---|
2222 | ) |
---|
2223 | self.assertEqual(rtn[0]["b_field"], "bb1") |
---|
2224 | rtn = db.executesql( |
---|
2225 | 'SELECT "id", "b_field", "a_field" FROM "a_table"', as_ordered_dict=True |
---|
2226 | ) |
---|
2227 | self.assertEqual(rtn[0]["b_field"], "bb1") |
---|
2228 | self.assertEqual(list(rtn[0].keys()), ["id", "b_field", "a_field"]) |
---|
2229 | |
---|
2230 | else: |
---|
2231 | db.define_table( |
---|
2232 | "a_table", Field("b_field"), Field("a_field"), |
---|
2233 | ) |
---|
2234 | db.a_table.insert(a_field="aa1", b_field="bb1") |
---|
2235 | rtn = db.executesql( |
---|
2236 | "SELECT id, b_field, a_field FROM a_table", as_dict=True |
---|
2237 | ) |
---|
2238 | self.assertEqual(rtn[0]["b_field"], "bb1") |
---|
2239 | rtn = db.executesql( |
---|
2240 | "SELECT id, b_field, a_field FROM a_table", as_ordered_dict=True |
---|
2241 | ) |
---|
2242 | self.assertEqual(rtn[0]["b_field"], "bb1") |
---|
2243 | self.assertEqual(list(rtn[0].keys()), ["id", "b_field", "a_field"]) |
---|
2244 | |
---|
2245 | |
---|
2246 | class TestExecuteSQL(DALtest): |
---|
2247 | def testSelect(self): |
---|
2248 | if IS_ORACLE: |
---|
2249 | # see note on prior test |
---|
2250 | db = self.connect(DEFAULT_URI, entity_quoting=True) |
---|
2251 | db.define_table( |
---|
2252 | "a_table", Field("b_field"), Field("a_field"), |
---|
2253 | ) |
---|
2254 | db.a_table.insert(a_field="aa1", b_field="bb1") |
---|
2255 | rtn = db.executesql( |
---|
2256 | 'SELECT "id", "b_field", "a_field" FROM "a_table"', as_dict=True |
---|
2257 | ) |
---|
2258 | self.assertEqual(rtn[0]["b_field"], "bb1") |
---|
2259 | rtn = db.executesql( |
---|
2260 | 'SELECT "id", "b_field", "a_field" FROM "a_table"', as_ordered_dict=True |
---|
2261 | ) |
---|
2262 | self.assertEqual(rtn[0]["b_field"], "bb1") |
---|
2263 | self.assertEqual(rtn[0]["b_field"], "bb1") |
---|
2264 | self.assertEqual(list(rtn[0].keys()), ["id", "b_field", "a_field"]) |
---|
2265 | |
---|
2266 | rtn = db.executesql( |
---|
2267 | 'select "id", "b_field", "a_field" from "a_table"', fields=db.a_table |
---|
2268 | ) |
---|
2269 | self.assertTrue( |
---|
2270 | all(x in rtn[0].keys() for x in ["id", "b_field", "a_field"]) |
---|
2271 | ) |
---|
2272 | self.assertEqual(rtn[0].b_field, "bb1") |
---|
2273 | |
---|
2274 | rtn = db.executesql( |
---|
2275 | 'select "id", "b_field", "a_field" from "a_table"', |
---|
2276 | fields=db.a_table, |
---|
2277 | colnames=["a_table.id", "a_table.b_field", "a_table.a_field"], |
---|
2278 | ) |
---|
2279 | |
---|
2280 | self.assertTrue( |
---|
2281 | all(x in rtn[0].keys() for x in ["id", "b_field", "a_field"]) |
---|
2282 | ) |
---|
2283 | self.assertEqual(rtn[0].b_field, "bb1") |
---|
2284 | rtn = db.executesql( |
---|
2285 | 'select COUNT(*) from "a_table"', |
---|
2286 | fields=[db.a_table.id.count()], |
---|
2287 | colnames=["foo"], |
---|
2288 | ) |
---|
2289 | self.assertEqual(rtn[0].foo, 1) |
---|
2290 | |
---|
2291 | if not IS_ORACLE: |
---|
2292 | db = self.connect(DEFAULT_URI, entity_quoting=False) |
---|
2293 | db.define_table( |
---|
2294 | "a_table", Field("b_field"), Field("a_field"), |
---|
2295 | ) |
---|
2296 | db.a_table.insert(a_field="aa1", b_field="bb1") |
---|
2297 | rtn = db.executesql( |
---|
2298 | "SELECT id, b_field, a_field FROM a_table", as_dict=True |
---|
2299 | ) |
---|
2300 | self.assertEqual(rtn[0]["b_field"], "bb1") |
---|
2301 | rtn = db.executesql( |
---|
2302 | "SELECT id, b_field, a_field FROM a_table", as_ordered_dict=True |
---|
2303 | ) |
---|
2304 | self.assertEqual(rtn[0]["b_field"], "bb1") |
---|
2305 | self.assertEqual(rtn[0]["b_field"], "bb1") |
---|
2306 | self.assertEqual(list(rtn[0].keys()), ["id", "b_field", "a_field"]) |
---|
2307 | |
---|
2308 | rtn = db.executesql( |
---|
2309 | "select id, b_field, a_field from a_table", fields=db.a_table |
---|
2310 | ) |
---|
2311 | self.assertTrue( |
---|
2312 | all(x in rtn[0].keys() for x in ["id", "b_field", "a_field"]) |
---|
2313 | ) |
---|
2314 | self.assertEqual(rtn[0].b_field, "bb1") |
---|
2315 | |
---|
2316 | rtn = db.executesql( |
---|
2317 | "select id, b_field, a_field from a_table", |
---|
2318 | fields=db.a_table, |
---|
2319 | colnames=["a_table.id", "a_table.b_field", "a_table.a_field"], |
---|
2320 | ) |
---|
2321 | |
---|
2322 | self.assertTrue( |
---|
2323 | all(x in rtn[0].keys() for x in ["id", "b_field", "a_field"]) |
---|
2324 | ) |
---|
2325 | self.assertEqual(rtn[0].b_field, "bb1") |
---|
2326 | rtn = db.executesql( |
---|
2327 | "select COUNT(*) from a_table", |
---|
2328 | fields=[db.a_table.id.count()], |
---|
2329 | colnames=["foo"], |
---|
2330 | ) |
---|
2331 | self.assertEqual(rtn[0].foo, 1) |
---|
2332 | |
---|
2333 | |
---|
2334 | class TestRNameTable(DALtest): |
---|
2335 | # tests for highly experimental rname attribute |
---|
2336 | |
---|
2337 | def testSelect(self): |
---|
2338 | db = self.connect() |
---|
2339 | rname = "a_very_complicated_tablename" |
---|
2340 | if IS_ORACLE: |
---|
2341 | # name size limitations |
---|
2342 | rname = "a_complex_tablename" |
---|
2343 | db.define_table("easy_name", Field("a_field"), rname=rname) |
---|
2344 | rtn = db.easy_name.insert(a_field="a") |
---|
2345 | self.assertEqual(rtn.id, 1) |
---|
2346 | rtn = db(db.easy_name.a_field == "a").select() |
---|
2347 | self.assertEqual(len(rtn), 1) |
---|
2348 | self.assertEqual(rtn[0].id, 1) |
---|
2349 | self.assertEqual(rtn[0].a_field, "a") |
---|
2350 | db.easy_name.insert(a_field="b") |
---|
2351 | rtn = db(db.easy_name.id > 0).delete() |
---|
2352 | self.assertEqual(rtn, 2) |
---|
2353 | rtn = db(db.easy_name.id > 0).count() |
---|
2354 | self.assertEqual(rtn, 0) |
---|
2355 | db.easy_name.insert(a_field="a") |
---|
2356 | db.easy_name.insert(a_field="b") |
---|
2357 | rtn = db(db.easy_name.id > 0).count() |
---|
2358 | self.assertEqual(rtn, 2) |
---|
2359 | rtn = db(db.easy_name.a_field == "a").update(a_field="c") |
---|
2360 | rtn = db(db.easy_name.a_field == "c").count() |
---|
2361 | self.assertEqual(rtn, 1) |
---|
2362 | rtn = db(db.easy_name.a_field != "c").count() |
---|
2363 | self.assertEqual(rtn, 1) |
---|
2364 | avg = db.easy_name.id.avg() |
---|
2365 | rtn = db(db.easy_name.id > 0).select(avg) |
---|
2366 | self.assertEqual(rtn[0][avg], 3) |
---|
2367 | rname = "this_is_the_person_table" |
---|
2368 | db.define_table( |
---|
2369 | "person", Field("name", default="Michael"), Field("uuid"), rname=rname |
---|
2370 | ) |
---|
2371 | rname = "this_is_the_pet_table" |
---|
2372 | db.define_table( |
---|
2373 | "pet", Field("friend", "reference person"), Field("name"), rname=rname |
---|
2374 | ) |
---|
2375 | michael = db.person.insert() # default insert |
---|
2376 | john = db.person.insert(name="John") |
---|
2377 | luke = db.person.insert(name="Luke") |
---|
2378 | |
---|
2379 | # michael owns Phippo |
---|
2380 | phippo = db.pet.insert(friend=michael, name="Phippo") |
---|
2381 | # john owns Dunstin and Gertie |
---|
2382 | dunstin = db.pet.insert(friend=john, name="Dunstin") |
---|
2383 | gertie = db.pet.insert(friend=john, name="Gertie") |
---|
2384 | |
---|
2385 | rtn = db(db.person.id == db.pet.friend).select(orderby=db.person.id | db.pet.id) |
---|
2386 | self.assertEqual(len(rtn), 3) |
---|
2387 | self.assertEqual(rtn[0].person.id, michael) |
---|
2388 | self.assertEqual(rtn[0].person.name, "Michael") |
---|
2389 | self.assertEqual(rtn[0].pet.id, phippo) |
---|
2390 | self.assertEqual(rtn[0].pet.name, "Phippo") |
---|
2391 | self.assertEqual(rtn[1].person.id, john) |
---|
2392 | self.assertEqual(rtn[1].person.name, "John") |
---|
2393 | self.assertEqual(rtn[1].pet.name, "Dunstin") |
---|
2394 | self.assertEqual(rtn[2].pet.name, "Gertie") |
---|
2395 | # fetch owners, eventually with pet |
---|
2396 | # main point is retrieving Luke with no pets |
---|
2397 | rtn = db(db.person.id > 0).select( |
---|
2398 | orderby=db.person.id | db.pet.id, |
---|
2399 | left=db.pet.on(db.person.id == db.pet.friend), |
---|
2400 | ) |
---|
2401 | self.assertEqual(rtn[0].person.id, michael) |
---|
2402 | self.assertEqual(rtn[0].person.name, "Michael") |
---|
2403 | self.assertEqual(rtn[0].pet.id, phippo) |
---|
2404 | self.assertEqual(rtn[0].pet.name, "Phippo") |
---|
2405 | self.assertEqual(rtn[3].person.name, "Luke") |
---|
2406 | self.assertEqual(rtn[3].person.id, luke) |
---|
2407 | self.assertEqual(rtn[3].pet.name, None) |
---|
2408 | # lets test a subquery |
---|
2409 | subq = db(db.pet.name == "Gertie")._select(db.pet.friend) |
---|
2410 | rtn = db(db.person.id.belongs(subq)).select() |
---|
2411 | self.assertEqual(rtn[0].id, 2) |
---|
2412 | self.assertEqual(rtn[0]("person.name"), "John") |
---|
2413 | # as dict |
---|
2414 | rtn = db(db.person.id > 0).select().as_dict() |
---|
2415 | self.assertEqual(rtn[1]["name"], "Michael") |
---|
2416 | # as list |
---|
2417 | rtn = db(db.person.id > 0).select().as_list() |
---|
2418 | self.assertEqual(rtn[0]["name"], "Michael") |
---|
2419 | # isempty |
---|
2420 | rtn = db(db.person.id > 0).isempty() |
---|
2421 | self.assertEqual(rtn, False) |
---|
2422 | # join argument |
---|
2423 | rtn = db(db.person).select( |
---|
2424 | orderby=db.person.id | db.pet.id, |
---|
2425 | join=db.pet.on(db.person.id == db.pet.friend), |
---|
2426 | ) |
---|
2427 | self.assertEqual(len(rtn), 3) |
---|
2428 | self.assertEqual(rtn[0].person.id, michael) |
---|
2429 | self.assertEqual(rtn[0].person.name, "Michael") |
---|
2430 | self.assertEqual(rtn[0].pet.id, phippo) |
---|
2431 | self.assertEqual(rtn[0].pet.name, "Phippo") |
---|
2432 | self.assertEqual(rtn[1].person.id, john) |
---|
2433 | self.assertEqual(rtn[1].person.name, "John") |
---|
2434 | self.assertEqual(rtn[1].pet.name, "Dunstin") |
---|
2435 | self.assertEqual(rtn[2].pet.name, "Gertie") |
---|
2436 | |
---|
2437 | # aliases |
---|
2438 | if DEFAULT_URI.startswith("mssql"): |
---|
2439 | # multiple cascade gotcha |
---|
2440 | for key in ["reference", "reference FK"]: |
---|
2441 | db._adapter.types[key] = db._adapter.types[key].replace( |
---|
2442 | "%(on_delete_action)s", "NO ACTION" |
---|
2443 | ) |
---|
2444 | rname = "the_cubs" |
---|
2445 | db.define_table( |
---|
2446 | "pet_farm", |
---|
2447 | Field("name"), |
---|
2448 | Field("father", "reference pet_farm"), |
---|
2449 | Field("mother", "reference pet_farm"), |
---|
2450 | rname=rname, |
---|
2451 | ) |
---|
2452 | |
---|
2453 | minali = db.pet_farm.insert(name="Minali") |
---|
2454 | osbert = db.pet_farm.insert(name="Osbert") |
---|
2455 | # they had a cub |
---|
2456 | selina = db.pet_farm.insert(name="Selina", father=osbert, mother=minali) |
---|
2457 | |
---|
2458 | father = db.pet_farm.with_alias("father") |
---|
2459 | mother = db.pet_farm.with_alias("mother") |
---|
2460 | |
---|
2461 | # fetch pets with relatives |
---|
2462 | rtn = db().select( |
---|
2463 | db.pet_farm.name, |
---|
2464 | father.name, |
---|
2465 | mother.name, |
---|
2466 | left=[ |
---|
2467 | father.on(father.id == db.pet_farm.father), |
---|
2468 | mother.on(mother.id == db.pet_farm.mother), |
---|
2469 | ], |
---|
2470 | orderby=db.pet_farm.id, |
---|
2471 | ) |
---|
2472 | |
---|
2473 | self.assertEqual(len(rtn), 3) |
---|
2474 | self.assertEqual(rtn[0].pet_farm.name, "Minali") |
---|
2475 | self.assertEqual(rtn[0].father.name, None) |
---|
2476 | self.assertEqual(rtn[0].mother.name, None) |
---|
2477 | self.assertEqual(rtn[1].pet_farm.name, "Osbert") |
---|
2478 | self.assertEqual(rtn[2].pet_farm.name, "Selina") |
---|
2479 | self.assertEqual(rtn[2].father.name, "Osbert") |
---|
2480 | self.assertEqual(rtn[2].mother.name, "Minali") |
---|
2481 | |
---|
2482 | def testJoin(self): |
---|
2483 | db = self.connect() |
---|
2484 | rname = "this_is_table_t1" |
---|
2485 | rname2 = "this_is_table_t2" |
---|
2486 | db.define_table("t1", Field("aa"), rname=rname) |
---|
2487 | db.define_table("t2", Field("aa"), Field("b", db.t1), rname=rname2) |
---|
2488 | i1 = db.t1.insert(aa="1") |
---|
2489 | i2 = db.t1.insert(aa="2") |
---|
2490 | i3 = db.t1.insert(aa="3") |
---|
2491 | db.t2.insert(aa="4", b=i1) |
---|
2492 | db.t2.insert(aa="5", b=i2) |
---|
2493 | db.t2.insert(aa="6", b=i2) |
---|
2494 | self.assertEqual( |
---|
2495 | len(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)), 3 |
---|
2496 | ) |
---|
2497 | self.assertEqual( |
---|
2498 | db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t1.aa, "2" |
---|
2499 | ) |
---|
2500 | self.assertEqual( |
---|
2501 | db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t2.aa, "6" |
---|
2502 | ) |
---|
2503 | self.assertEqual( |
---|
2504 | len( |
---|
2505 | db().select( |
---|
2506 | db.t1.ALL, |
---|
2507 | db.t2.ALL, |
---|
2508 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2509 | orderby=db.t1.aa | db.t2.aa, |
---|
2510 | ) |
---|
2511 | ), |
---|
2512 | 4, |
---|
2513 | ) |
---|
2514 | self.assertEqual( |
---|
2515 | db() |
---|
2516 | .select( |
---|
2517 | db.t1.ALL, |
---|
2518 | db.t2.ALL, |
---|
2519 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2520 | orderby=db.t1.aa | db.t2.aa, |
---|
2521 | )[2] |
---|
2522 | .t1.aa, |
---|
2523 | "2", |
---|
2524 | ) |
---|
2525 | self.assertEqual( |
---|
2526 | db() |
---|
2527 | .select( |
---|
2528 | db.t1.ALL, |
---|
2529 | db.t2.ALL, |
---|
2530 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2531 | orderby=db.t1.aa | db.t2.aa, |
---|
2532 | )[2] |
---|
2533 | .t2.aa, |
---|
2534 | "6", |
---|
2535 | ) |
---|
2536 | self.assertEqual( |
---|
2537 | db() |
---|
2538 | .select( |
---|
2539 | db.t1.ALL, |
---|
2540 | db.t2.ALL, |
---|
2541 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2542 | orderby=db.t1.aa | db.t2.aa, |
---|
2543 | )[3] |
---|
2544 | .t1.aa, |
---|
2545 | "3", |
---|
2546 | ) |
---|
2547 | self.assertEqual( |
---|
2548 | db() |
---|
2549 | .select( |
---|
2550 | db.t1.ALL, |
---|
2551 | db.t2.ALL, |
---|
2552 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2553 | orderby=db.t1.aa | db.t2.aa, |
---|
2554 | )[3] |
---|
2555 | .t2.aa, |
---|
2556 | None, |
---|
2557 | ) |
---|
2558 | self.assertEqual( |
---|
2559 | len( |
---|
2560 | db().select( |
---|
2561 | db.t1.aa, |
---|
2562 | db.t2.id.count(), |
---|
2563 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2564 | orderby=db.t1.aa, |
---|
2565 | groupby=db.t1.aa, |
---|
2566 | ) |
---|
2567 | ), |
---|
2568 | 3, |
---|
2569 | ) |
---|
2570 | self.assertEqual( |
---|
2571 | db() |
---|
2572 | .select( |
---|
2573 | db.t1.aa, |
---|
2574 | db.t2.id.count(), |
---|
2575 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2576 | orderby=db.t1.aa, |
---|
2577 | groupby=db.t1.aa, |
---|
2578 | )[0] |
---|
2579 | ._extra[db.t2.id.count()], |
---|
2580 | 1, |
---|
2581 | ) |
---|
2582 | self.assertEqual( |
---|
2583 | db() |
---|
2584 | .select( |
---|
2585 | db.t1.aa, |
---|
2586 | db.t2.id.count(), |
---|
2587 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2588 | orderby=db.t1.aa, |
---|
2589 | groupby=db.t1.aa, |
---|
2590 | )[1] |
---|
2591 | ._extra[db.t2.id.count()], |
---|
2592 | 2, |
---|
2593 | ) |
---|
2594 | self.assertEqual( |
---|
2595 | db() |
---|
2596 | .select( |
---|
2597 | db.t1.aa, |
---|
2598 | db.t2.id.count(), |
---|
2599 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2600 | orderby=db.t1.aa, |
---|
2601 | groupby=db.t1.aa, |
---|
2602 | )[2] |
---|
2603 | ._extra[db.t2.id.count()], |
---|
2604 | 0, |
---|
2605 | ) |
---|
2606 | db.t2.drop() |
---|
2607 | db.t1.drop() |
---|
2608 | |
---|
2609 | db.define_table("person", Field("name"), rname=rname) |
---|
2610 | id = db.person.insert(name="max") |
---|
2611 | self.assertEqual(id.name, "max") |
---|
2612 | db.define_table( |
---|
2613 | "dog", Field("name"), Field("ownerperson", "reference person"), rname=rname2 |
---|
2614 | ) |
---|
2615 | db.dog.insert(name="skipper", ownerperson=1) |
---|
2616 | row = db(db.person.id == db.dog.ownerperson).select().first() |
---|
2617 | self.assertEqual(row[db.person.name], "max") |
---|
2618 | self.assertEqual(row["person.name"], "max") |
---|
2619 | db.dog.drop() |
---|
2620 | self.assertEqual(len(db.person._referenced_by), 0) |
---|
2621 | |
---|
2622 | |
---|
2623 | class TestRNameFields(DALtest): |
---|
2624 | # tests for highly experimental rname attribute |
---|
2625 | def testSelect(self): |
---|
2626 | db = self.connect() |
---|
2627 | rname = "a_very_complicated_fieldname" |
---|
2628 | rname2 = "rrating_from_1_to_10" |
---|
2629 | db.define_table( |
---|
2630 | "easy_name", |
---|
2631 | Field("a_field", rname=rname), |
---|
2632 | Field("rating", "integer", rname=rname2, default=2), |
---|
2633 | ) |
---|
2634 | rtn = db.easy_name.insert(a_field="a") |
---|
2635 | self.assertEqual(rtn.id, 1) |
---|
2636 | rtn = db(db.easy_name.a_field == "a").select() |
---|
2637 | self.assertEqual(len(rtn), 1) |
---|
2638 | self.assertEqual(rtn[0].id, 1) |
---|
2639 | self.assertEqual(rtn[0].a_field, "a") |
---|
2640 | db.easy_name.insert(a_field="b") |
---|
2641 | rtn = db(db.easy_name.id > 0).delete() |
---|
2642 | self.assertEqual(rtn, 2) |
---|
2643 | rtn = db(db.easy_name.id > 0).count() |
---|
2644 | self.assertEqual(rtn, 0) |
---|
2645 | db.easy_name.insert(a_field="a") |
---|
2646 | db.easy_name.insert(a_field="b") |
---|
2647 | rtn = db(db.easy_name.id > 0).count() |
---|
2648 | self.assertEqual(rtn, 2) |
---|
2649 | rtn = db(db.easy_name.a_field == "a").update(a_field="c") |
---|
2650 | rtn = db(db.easy_name.a_field == "c").count() |
---|
2651 | self.assertEqual(rtn, 1) |
---|
2652 | rtn = db(db.easy_name.a_field != "c").count() |
---|
2653 | self.assertEqual(rtn, 1) |
---|
2654 | avg = db.easy_name.id.avg() |
---|
2655 | rtn = db(db.easy_name.id > 0).select(avg) |
---|
2656 | self.assertEqual(rtn[0][avg], 3) |
---|
2657 | |
---|
2658 | avg = db.easy_name.rating.avg() |
---|
2659 | rtn = db(db.easy_name.id > 0).select(avg) |
---|
2660 | self.assertEqual(rtn[0][avg], 2) |
---|
2661 | |
---|
2662 | rname = "this_is_the_person_name" |
---|
2663 | db.define_table( |
---|
2664 | "person", |
---|
2665 | Field("id", type="id", rname="fooid"), |
---|
2666 | Field("name", default="Michael", rname=rname), |
---|
2667 | Field("uuid"), |
---|
2668 | ) |
---|
2669 | rname = "this_is_the_pet_name" |
---|
2670 | db.define_table( |
---|
2671 | "pet", Field("friend", "reference person"), Field("name", rname=rname) |
---|
2672 | ) |
---|
2673 | michael = db.person.insert() # default insert |
---|
2674 | john = db.person.insert(name="John") |
---|
2675 | luke = db.person.insert(name="Luke") |
---|
2676 | |
---|
2677 | # michael owns Phippo |
---|
2678 | phippo = db.pet.insert(friend=michael, name="Phippo") |
---|
2679 | # john owns Dunstin and Gertie |
---|
2680 | dunstin = db.pet.insert(friend=john, name="Dunstin") |
---|
2681 | gertie = db.pet.insert(friend=john, name="Gertie") |
---|
2682 | |
---|
2683 | rtn = db(db.person.id == db.pet.friend).select(orderby=db.person.id | db.pet.id) |
---|
2684 | self.assertEqual(len(rtn), 3) |
---|
2685 | self.assertEqual(rtn[0].person.id, michael) |
---|
2686 | self.assertEqual(rtn[0].person.name, "Michael") |
---|
2687 | self.assertEqual(rtn[0].pet.id, phippo) |
---|
2688 | self.assertEqual(rtn[0].pet.name, "Phippo") |
---|
2689 | self.assertEqual(rtn[1].person.id, john) |
---|
2690 | self.assertEqual(rtn[1].person.name, "John") |
---|
2691 | self.assertEqual(rtn[1].pet.name, "Dunstin") |
---|
2692 | self.assertEqual(rtn[2].pet.name, "Gertie") |
---|
2693 | # fetch owners, eventually with pet |
---|
2694 | # main point is retrieving Luke with no pets |
---|
2695 | rtn = db(db.person.id > 0).select( |
---|
2696 | orderby=db.person.id | db.pet.id, |
---|
2697 | left=db.pet.on(db.person.id == db.pet.friend), |
---|
2698 | ) |
---|
2699 | self.assertEqual(rtn[0].person.id, michael) |
---|
2700 | self.assertEqual(rtn[0].person.name, "Michael") |
---|
2701 | self.assertEqual(rtn[0].pet.id, phippo) |
---|
2702 | self.assertEqual(rtn[0].pet.name, "Phippo") |
---|
2703 | self.assertEqual(rtn[3].person.name, "Luke") |
---|
2704 | self.assertEqual(rtn[3].person.id, luke) |
---|
2705 | self.assertEqual(rtn[3].pet.name, None) |
---|
2706 | # lets test a subquery |
---|
2707 | subq = db(db.pet.name == "Gertie")._select(db.pet.friend) |
---|
2708 | rtn = db(db.person.id.belongs(subq)).select() |
---|
2709 | self.assertEqual(rtn[0].id, 2) |
---|
2710 | self.assertEqual(rtn[0]("person.name"), "John") |
---|
2711 | # as dict |
---|
2712 | rtn = db(db.person.id > 0).select().as_dict() |
---|
2713 | self.assertEqual(rtn[1]["name"], "Michael") |
---|
2714 | # as list |
---|
2715 | rtn = db(db.person.id > 0).select().as_list() |
---|
2716 | self.assertEqual(rtn[0]["name"], "Michael") |
---|
2717 | # isempty |
---|
2718 | rtn = db(db.person.id > 0).isempty() |
---|
2719 | self.assertEqual(rtn, False) |
---|
2720 | # join argument |
---|
2721 | rtn = db(db.person).select( |
---|
2722 | orderby=db.person.id | db.pet.id, |
---|
2723 | join=db.pet.on(db.person.id == db.pet.friend), |
---|
2724 | ) |
---|
2725 | self.assertEqual(len(rtn), 3) |
---|
2726 | self.assertEqual(rtn[0].person.id, michael) |
---|
2727 | self.assertEqual(rtn[0].person.name, "Michael") |
---|
2728 | self.assertEqual(rtn[0].pet.id, phippo) |
---|
2729 | self.assertEqual(rtn[0].pet.name, "Phippo") |
---|
2730 | self.assertEqual(rtn[1].person.id, john) |
---|
2731 | self.assertEqual(rtn[1].person.name, "John") |
---|
2732 | self.assertEqual(rtn[1].pet.name, "Dunstin") |
---|
2733 | self.assertEqual(rtn[2].pet.name, "Gertie") |
---|
2734 | |
---|
2735 | # aliases |
---|
2736 | rname = "the_cub_name" |
---|
2737 | if DEFAULT_URI.startswith("mssql"): |
---|
2738 | # multiple cascade gotcha |
---|
2739 | for key in ["reference", "reference FK"]: |
---|
2740 | db._adapter.types[key] = db._adapter.types[key].replace( |
---|
2741 | "%(on_delete_action)s", "NO ACTION" |
---|
2742 | ) |
---|
2743 | db.define_table( |
---|
2744 | "pet_farm", |
---|
2745 | Field("name", rname=rname), |
---|
2746 | Field("father", "reference pet_farm"), |
---|
2747 | Field("mother", "reference pet_farm"), |
---|
2748 | ) |
---|
2749 | |
---|
2750 | minali = db.pet_farm.insert(name="Minali") |
---|
2751 | osbert = db.pet_farm.insert(name="Osbert") |
---|
2752 | # they had a cub |
---|
2753 | selina = db.pet_farm.insert(name="Selina", father=osbert, mother=minali) |
---|
2754 | |
---|
2755 | father = db.pet_farm.with_alias("father") |
---|
2756 | mother = db.pet_farm.with_alias("mother") |
---|
2757 | |
---|
2758 | # fetch pets with relatives |
---|
2759 | rtn = db().select( |
---|
2760 | db.pet_farm.name, |
---|
2761 | father.name, |
---|
2762 | mother.name, |
---|
2763 | left=[ |
---|
2764 | father.on(father.id == db.pet_farm.father), |
---|
2765 | mother.on(mother.id == db.pet_farm.mother), |
---|
2766 | ], |
---|
2767 | orderby=db.pet_farm.id, |
---|
2768 | ) |
---|
2769 | |
---|
2770 | self.assertEqual(len(rtn), 3) |
---|
2771 | self.assertEqual(rtn[0].pet_farm.name, "Minali") |
---|
2772 | self.assertEqual(rtn[0].father.name, None) |
---|
2773 | self.assertEqual(rtn[0].mother.name, None) |
---|
2774 | self.assertEqual(rtn[1].pet_farm.name, "Osbert") |
---|
2775 | self.assertEqual(rtn[2].pet_farm.name, "Selina") |
---|
2776 | self.assertEqual(rtn[2].father.name, "Osbert") |
---|
2777 | self.assertEqual(rtn[2].mother.name, "Minali") |
---|
2778 | |
---|
2779 | def testRun(self): |
---|
2780 | db = self.connect() |
---|
2781 | rname = "a_very_complicated_fieldname" |
---|
2782 | for ft in ["string", "text", "password", "upload", "blob"]: |
---|
2783 | db.define_table("tt", Field("aa", ft, default="", rname=rname)) |
---|
2784 | self.assertEqual(db.tt.insert(aa="x"), 1) |
---|
2785 | if not IS_ORACLE: |
---|
2786 | self.assertEqual(db().select(db.tt.aa)[0].aa, "x") |
---|
2787 | db.tt.drop() |
---|
2788 | db.define_table("tt", Field("aa", "integer", default=1, rname=rname)) |
---|
2789 | self.assertEqual(db.tt.insert(aa=3), 1) |
---|
2790 | self.assertEqual(db().select(db.tt.aa)[0].aa, 3) |
---|
2791 | db.tt.drop() |
---|
2792 | db.define_table("tt", Field("aa", "double", default=1, rname=rname)) |
---|
2793 | self.assertEqual(db.tt.insert(aa=3.1), 1) |
---|
2794 | self.assertEqual(db().select(db.tt.aa)[0].aa, 3.1) |
---|
2795 | db.tt.drop() |
---|
2796 | db.define_table("tt", Field("aa", "boolean", default=True, rname=rname)) |
---|
2797 | self.assertEqual(db.tt.insert(aa=True), 1) |
---|
2798 | self.assertEqual(db().select(db.tt.aa)[0].aa, True) |
---|
2799 | db.tt.drop() |
---|
2800 | if not IS_ORACLE: |
---|
2801 | db.define_table("tt", Field("aa", "json", default={}, rname=rname)) |
---|
2802 | self.assertEqual(db.tt.insert(aa={}), 1) |
---|
2803 | self.assertEqual(db().select(db.tt.aa)[0].aa, {}) |
---|
2804 | db.tt.drop() |
---|
2805 | db.define_table( |
---|
2806 | "tt", Field("aa", "date", default=datetime.date.today(), rname=rname) |
---|
2807 | ) |
---|
2808 | t0 = datetime.date.today() |
---|
2809 | self.assertEqual(db.tt.insert(aa=t0), 1) |
---|
2810 | self.assertEqual(db().select(db.tt.aa)[0].aa, t0) |
---|
2811 | db.tt.drop() |
---|
2812 | db.define_table( |
---|
2813 | "tt", |
---|
2814 | Field("aa", "datetime", default=datetime.datetime.today(), rname=rname), |
---|
2815 | ) |
---|
2816 | t0 = datetime.datetime(1971, 12, 21, 10, 30, 55, 0,) |
---|
2817 | self.assertEqual(db.tt.insert(aa=t0), 1) |
---|
2818 | self.assertEqual(db().select(db.tt.aa)[0].aa, t0) |
---|
2819 | |
---|
2820 | ## Row APIs |
---|
2821 | row = db().select(db.tt.aa)[0] |
---|
2822 | self.assertEqual(db.tt[1].aa, t0) |
---|
2823 | self.assertEqual(db.tt["aa"], db.tt.aa) |
---|
2824 | self.assertEqual(db.tt(1).aa, t0) |
---|
2825 | self.assertTrue(db.tt(1, aa=None) == None) |
---|
2826 | self.assertFalse(db.tt(1, aa=t0) == None) |
---|
2827 | self.assertEqual(row.aa, t0) |
---|
2828 | self.assertEqual(row["aa"], t0) |
---|
2829 | self.assertEqual(row["tt.aa"], t0) |
---|
2830 | self.assertEqual(row("tt.aa"), t0) |
---|
2831 | self.assertTrue("aa" in row) |
---|
2832 | self.assertTrue("pydal" not in row) |
---|
2833 | self.assertTrue(hasattr(row, "aa")) |
---|
2834 | self.assertFalse(hasattr(row, "pydal")) |
---|
2835 | |
---|
2836 | ## Lazy and Virtual fields |
---|
2837 | db.tt.b = Field.Virtual(lambda row: row.tt.aa) |
---|
2838 | db.tt.c = Field.Lazy(lambda row: row.tt.aa) |
---|
2839 | row = db().select(db.tt.aa)[0] |
---|
2840 | self.assertEqual(row.b, t0) |
---|
2841 | self.assertEqual(row.c(), t0) |
---|
2842 | |
---|
2843 | db.tt.drop() |
---|
2844 | db.define_table("tt", Field("aa", "time", default="11:30", rname=rname)) |
---|
2845 | t0 = datetime.time(10, 30, 55) |
---|
2846 | self.assertEqual(db.tt.insert(aa=t0), 1) |
---|
2847 | self.assertEqual(db().select(db.tt.aa)[0].aa, t0) |
---|
2848 | |
---|
2849 | def testInsert(self): |
---|
2850 | db = self.connect() |
---|
2851 | rname = "a_very_complicated_fieldname" |
---|
2852 | db.define_table("tt", Field("aa", rname=rname)) |
---|
2853 | self.assertEqual(db.tt.insert(aa="1"), 1) |
---|
2854 | self.assertEqual(db.tt.insert(aa="1"), 2) |
---|
2855 | self.assertEqual(db.tt.insert(aa="1"), 3) |
---|
2856 | self.assertEqual(db(db.tt.aa == "1").count(), 3) |
---|
2857 | self.assertEqual(db(db.tt.aa == "2").isempty(), True) |
---|
2858 | self.assertEqual(db(db.tt.aa == "1").update(aa="2"), 3) |
---|
2859 | self.assertEqual(db(db.tt.aa == "2").count(), 3) |
---|
2860 | self.assertEqual(db(db.tt.aa == "2").isempty(), False) |
---|
2861 | self.assertEqual(db(db.tt.aa == "2").delete(), 3) |
---|
2862 | self.assertEqual(db(db.tt.aa == "2").isempty(), True) |
---|
2863 | |
---|
2864 | def testJoin(self): |
---|
2865 | db = self.connect() |
---|
2866 | rname = "this_is_field_aa" |
---|
2867 | rname2 = "this_is_field_b" |
---|
2868 | db.define_table("t1", Field("aa", rname=rname)) |
---|
2869 | db.define_table("t2", Field("aa", rname=rname), Field("b", db.t1, rname=rname2)) |
---|
2870 | i1 = db.t1.insert(aa="1") |
---|
2871 | i2 = db.t1.insert(aa="2") |
---|
2872 | i3 = db.t1.insert(aa="3") |
---|
2873 | db.t2.insert(aa="4", b=i1) |
---|
2874 | db.t2.insert(aa="5", b=i2) |
---|
2875 | db.t2.insert(aa="6", b=i2) |
---|
2876 | self.assertEqual( |
---|
2877 | len(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)), 3 |
---|
2878 | ) |
---|
2879 | self.assertEqual( |
---|
2880 | db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t1.aa, "2" |
---|
2881 | ) |
---|
2882 | self.assertEqual( |
---|
2883 | db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t2.aa, "6" |
---|
2884 | ) |
---|
2885 | self.assertEqual( |
---|
2886 | len( |
---|
2887 | db().select( |
---|
2888 | db.t1.ALL, |
---|
2889 | db.t2.ALL, |
---|
2890 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2891 | orderby=db.t1.aa | db.t2.aa, |
---|
2892 | ) |
---|
2893 | ), |
---|
2894 | 4, |
---|
2895 | ) |
---|
2896 | self.assertEqual( |
---|
2897 | db() |
---|
2898 | .select( |
---|
2899 | db.t1.ALL, |
---|
2900 | db.t2.ALL, |
---|
2901 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2902 | orderby=db.t1.aa | db.t2.aa, |
---|
2903 | )[2] |
---|
2904 | .t1.aa, |
---|
2905 | "2", |
---|
2906 | ) |
---|
2907 | self.assertEqual( |
---|
2908 | db() |
---|
2909 | .select( |
---|
2910 | db.t1.ALL, |
---|
2911 | db.t2.ALL, |
---|
2912 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2913 | orderby=db.t1.aa | db.t2.aa, |
---|
2914 | )[2] |
---|
2915 | .t2.aa, |
---|
2916 | "6", |
---|
2917 | ) |
---|
2918 | self.assertEqual( |
---|
2919 | db() |
---|
2920 | .select( |
---|
2921 | db.t1.ALL, |
---|
2922 | db.t2.ALL, |
---|
2923 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2924 | orderby=db.t1.aa | db.t2.aa, |
---|
2925 | )[3] |
---|
2926 | .t1.aa, |
---|
2927 | "3", |
---|
2928 | ) |
---|
2929 | self.assertEqual( |
---|
2930 | db() |
---|
2931 | .select( |
---|
2932 | db.t1.ALL, |
---|
2933 | db.t2.ALL, |
---|
2934 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2935 | orderby=db.t1.aa | db.t2.aa, |
---|
2936 | )[3] |
---|
2937 | .t2.aa, |
---|
2938 | None, |
---|
2939 | ) |
---|
2940 | self.assertEqual( |
---|
2941 | len( |
---|
2942 | db().select( |
---|
2943 | db.t1.aa, |
---|
2944 | db.t2.id.count(), |
---|
2945 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2946 | orderby=db.t1.aa, |
---|
2947 | groupby=db.t1.aa, |
---|
2948 | ) |
---|
2949 | ), |
---|
2950 | 3, |
---|
2951 | ) |
---|
2952 | self.assertEqual( |
---|
2953 | db() |
---|
2954 | .select( |
---|
2955 | db.t1.aa, |
---|
2956 | db.t2.id.count(), |
---|
2957 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2958 | orderby=db.t1.aa, |
---|
2959 | groupby=db.t1.aa, |
---|
2960 | )[0] |
---|
2961 | ._extra[db.t2.id.count()], |
---|
2962 | 1, |
---|
2963 | ) |
---|
2964 | self.assertEqual( |
---|
2965 | db() |
---|
2966 | .select( |
---|
2967 | db.t1.aa, |
---|
2968 | db.t2.id.count(), |
---|
2969 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2970 | orderby=db.t1.aa, |
---|
2971 | groupby=db.t1.aa, |
---|
2972 | )[1] |
---|
2973 | ._extra[db.t2.id.count()], |
---|
2974 | 2, |
---|
2975 | ) |
---|
2976 | self.assertEqual( |
---|
2977 | db() |
---|
2978 | .select( |
---|
2979 | db.t1.aa, |
---|
2980 | db.t2.id.count(), |
---|
2981 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2982 | orderby=db.t1.aa, |
---|
2983 | groupby=db.t1.aa, |
---|
2984 | )[2] |
---|
2985 | ._extra[db.t2.id.count()], |
---|
2986 | 0, |
---|
2987 | ) |
---|
2988 | db.t2.drop() |
---|
2989 | db.t1.drop() |
---|
2990 | |
---|
2991 | db.define_table("person", Field("name", rname=rname)) |
---|
2992 | id = db.person.insert(name="max") |
---|
2993 | self.assertEqual(id.name, "max") |
---|
2994 | db.define_table( |
---|
2995 | "dog", |
---|
2996 | Field("name", rname=rname), |
---|
2997 | Field("ownerperson", "reference person", rname=rname2), |
---|
2998 | ) |
---|
2999 | db.dog.insert(name="skipper", ownerperson=1) |
---|
3000 | row = db(db.person.id == db.dog.ownerperson).select().first() |
---|
3001 | self.assertEqual(row[db.person.name], "max") |
---|
3002 | self.assertEqual(row["person.name"], "max") |
---|
3003 | db.dog.drop() |
---|
3004 | self.assertEqual(len(db.person._referenced_by), 0) |
---|
3005 | |
---|
3006 | def testTFK(self): |
---|
3007 | db = self.connect() |
---|
3008 | if "reference TFK" not in db._adapter.types: |
---|
3009 | self.skipTest("Adapter does not support TFK references") |
---|
3010 | db.define_table( |
---|
3011 | "t1", |
---|
3012 | Field("id1", type="string", length=1, rname="foo1"), |
---|
3013 | Field("id2", type="integer", rname="foo2"), |
---|
3014 | Field("val", type="integer"), |
---|
3015 | primarykey=["id1", "id2"], |
---|
3016 | ) |
---|
3017 | db.define_table( |
---|
3018 | "t2", |
---|
3019 | Field("ref1", type=db.t1.id1, rname="bar1"), |
---|
3020 | Field("ref2", type=db.t1.id2, rname="bar2"), |
---|
3021 | ) |
---|
3022 | db.t1.insert(id1="a", id2=1, val=10) |
---|
3023 | db.t1.insert(id1="a", id2=2, val=30) |
---|
3024 | db.t2.insert(ref1="a", ref2=1) |
---|
3025 | query = (db.t1.id1 == db.t2.ref1) & (db.t1.id2 == db.t2.ref2) |
---|
3026 | result = db(query).select(db.t1.ALL) |
---|
3027 | self.assertEqual(len(result), 1) |
---|
3028 | self.assertEqual(result[0]["id1"], "a") |
---|
3029 | self.assertEqual(result[0]["id2"], 1) |
---|
3030 | self.assertEqual(result[0]["val"], 10) |
---|
3031 | |
---|
3032 | |
---|
3033 | class TestQuoting(DALtest): |
---|
3034 | |
---|
3035 | # tests for case sensitivity |
---|
3036 | def testCase(self): |
---|
3037 | db = self.connect(ignore_field_case=False, entity_quoting=True) |
---|
3038 | if DEFAULT_URI.startswith("mssql"): |
---|
3039 | # multiple cascade gotcha |
---|
3040 | for key in ["reference", "reference FK"]: |
---|
3041 | db._adapter.types[key] = db._adapter.types[key].replace( |
---|
3042 | "%(on_delete_action)s", "NO ACTION" |
---|
3043 | ) |
---|
3044 | |
---|
3045 | t0 = db.define_table("t0", Field("f", "string")) |
---|
3046 | t1 = db.define_table("b", Field("B", t0), Field("words", "text")) |
---|
3047 | |
---|
3048 | blather = "blah blah and so" |
---|
3049 | t0[None] = {"f": "content"} |
---|
3050 | t1[None] = {"B": int(t0[1]["id"]), "words": blather} |
---|
3051 | |
---|
3052 | r = db(db.t0.id == db.b.B).select() |
---|
3053 | |
---|
3054 | self.assertEqual(r[0].b.words, blather) |
---|
3055 | |
---|
3056 | t1.drop() |
---|
3057 | t0.drop() |
---|
3058 | |
---|
3059 | # test field case |
---|
3060 | try: |
---|
3061 | t0 = db.define_table("table_is_a_test", Field("a_a"), Field("a_A")) |
---|
3062 | except Exception as e: |
---|
3063 | # some db does not support case sensitive field names mysql is one of them. |
---|
3064 | if DEFAULT_URI.startswith("mysql:") or DEFAULT_URI.startswith("sqlite:"): |
---|
3065 | db.rollback() |
---|
3066 | return |
---|
3067 | if "Column names in each table must be unique" in e.args[1]: |
---|
3068 | db.rollback() |
---|
3069 | return |
---|
3070 | raise e |
---|
3071 | |
---|
3072 | t0[None] = dict(a_a="a_a", a_A="a_A") |
---|
3073 | |
---|
3074 | self.assertEqual(t0[1].a_a, "a_a") |
---|
3075 | self.assertEqual(t0[1].a_A, "a_A") |
---|
3076 | |
---|
3077 | def testPKFK(self): |
---|
3078 | |
---|
3079 | # test primary keys |
---|
3080 | |
---|
3081 | db = self.connect(ignore_field_case=False) |
---|
3082 | if DEFAULT_URI.startswith("mssql"): |
---|
3083 | # multiple cascade gotcha |
---|
3084 | for key in ["reference", "reference FK"]: |
---|
3085 | db._adapter.types[key] = db._adapter.types[key].replace( |
---|
3086 | "%(on_delete_action)s", "NO ACTION" |
---|
3087 | ) |
---|
3088 | # test table without surrogate key. Length must is limited to |
---|
3089 | # 100 because of MySQL limitations: it cannot handle more than |
---|
3090 | # 767 bytes in unique keys. |
---|
3091 | |
---|
3092 | t0 = db.define_table("t0", Field("Code", length=100), primarykey=["Code"]) |
---|
3093 | t2 = db.define_table("t2", Field("f"), Field("t0_Code", "reference t0")) |
---|
3094 | t3 = db.define_table( |
---|
3095 | "t3", Field("f", length=100), Field("t0_Code", t0.Code), primarykey=["f"] |
---|
3096 | ) |
---|
3097 | t4 = db.define_table( |
---|
3098 | "t4", Field("f", length=100), Field("t0", t0), primarykey=["f"] |
---|
3099 | ) |
---|
3100 | |
---|
3101 | try: |
---|
3102 | t5 = db.define_table( |
---|
3103 | "t5", |
---|
3104 | Field("f", length=100), |
---|
3105 | Field("t0", "reference no_table_wrong_reference"), |
---|
3106 | primarykey=["f"], |
---|
3107 | ) |
---|
3108 | except Exception as e: |
---|
3109 | self.assertTrue(isinstance(e, KeyError)) |
---|
3110 | |
---|
3111 | if DEFAULT_URI.startswith("mssql"): |
---|
3112 | # there's no drop cascade in mssql |
---|
3113 | t3.drop() |
---|
3114 | t4.drop() |
---|
3115 | t2.drop() |
---|
3116 | t0.drop() |
---|
3117 | else: |
---|
3118 | t0.drop("cascade") |
---|
3119 | t2.drop() |
---|
3120 | t3.drop() |
---|
3121 | t4.drop() |
---|
3122 | |
---|
3123 | def testPKFK2(self): |
---|
3124 | |
---|
3125 | # test reference to reference |
---|
3126 | |
---|
3127 | db = self.connect(ignore_field_case=False) |
---|
3128 | if DEFAULT_URI.startswith("mssql"): |
---|
3129 | # multiple cascade gotcha |
---|
3130 | for key in ["reference", "reference FK"]: |
---|
3131 | db._adapter.types[key] = db._adapter.types[key].replace( |
---|
3132 | "%(on_delete_action)s", "NO ACTION" |
---|
3133 | ) |
---|
3134 | |
---|
3135 | t0 = db.define_table("object_", Field("id", "id")) |
---|
3136 | t1 = db.define_table( |
---|
3137 | "part", Field("id", "reference object_"), primarykey=["id"] |
---|
3138 | ) |
---|
3139 | t2 = db.define_table( |
---|
3140 | "part_rev", |
---|
3141 | Field("id", "reference object_"), |
---|
3142 | Field("part", "reference part"), |
---|
3143 | Field("rev", "integer"), |
---|
3144 | primarykey=["id"], |
---|
3145 | ) |
---|
3146 | id = db.object_.insert() |
---|
3147 | db.part.insert(id=id) |
---|
3148 | id_rev = db.object_.insert() |
---|
3149 | db.part_rev.insert(id=id_rev, part=id, rev=0) |
---|
3150 | result = db(db.part_rev.part == db.part.id).select() |
---|
3151 | self.assertEqual(len(result), 1) |
---|
3152 | self.assertEqual(result[0]["part_rev.id"], id_rev) |
---|
3153 | self.assertEqual(result[0]["part_rev.part"], id) |
---|
3154 | |
---|
3155 | if DEFAULT_URI.startswith(("mssql", "sqlite")): |
---|
3156 | # there's no drop cascade in mssql and it seems there is some problem in sqlite |
---|
3157 | t2.drop() |
---|
3158 | t1.drop() |
---|
3159 | t0.drop() |
---|
3160 | else: |
---|
3161 | t0.drop("cascade") |
---|
3162 | t1.drop("cascade") |
---|
3163 | t2.drop() |
---|
3164 | |
---|
3165 | |
---|
3166 | class TestTableAndFieldCase(unittest.TestCase): |
---|
3167 | """ |
---|
3168 | at the Python level we should not allow db.C and db.c because of .table conflicts on windows |
---|
3169 | but it should be possible to map two different names into distinct tables "c" and "C" at the Python level |
---|
3170 | By default Python models names should be mapped into lower case table names and assume case insensitivity. |
---|
3171 | """ |
---|
3172 | |
---|
3173 | def testme(self): |
---|
3174 | return |
---|
3175 | |
---|
3176 | |
---|
3177 | class TestQuotesByDefault(unittest.TestCase): |
---|
3178 | """ |
---|
3179 | all default tables names should be quoted unless an explicit mapping has been given for a table. |
---|
3180 | """ |
---|
3181 | |
---|
3182 | def testme(self): |
---|
3183 | return |
---|
3184 | |
---|
3185 | class TestGis(DALtest): |
---|
3186 | |
---|
3187 | @unittest.skipIf(True, "WIP") |
---|
3188 | def testGeometry(self): |
---|
3189 | from pydal import geoPoint, geoLine, geoPolygon |
---|
3190 | |
---|
3191 | if not IS_POSTGRESQL: |
---|
3192 | return |
---|
3193 | db = self.connect() |
---|
3194 | t0 = db.define_table("t0", Field("point", "geometry()")) |
---|
3195 | t1 = db.define_table("t1", Field("line", "geometry(public, 4326, 2)")) |
---|
3196 | t2 = db.define_table("t2", Field("polygon", "geometry(public, 4326, 2)")) |
---|
3197 | t0.insert(point=geoPoint(1, 1)) |
---|
3198 | text = ( |
---|
3199 | db(db.t0.id) |
---|
3200 | .select(db.t0.point.st_astext()) |
---|
3201 | .first()[db.t0.point.st_astext()] |
---|
3202 | ) |
---|
3203 | self.assertEqual(text, "POINT(1 1)") |
---|
3204 | t1.insert(line=geoLine((1, 1), (2, 2))) |
---|
3205 | text = ( |
---|
3206 | db(db.t1.id).select(db.t1.line.st_astext()).first()[db.t1.line.st_astext()] |
---|
3207 | ) |
---|
3208 | self.assertEqual(text, "LINESTRING(1 1,2 2)") |
---|
3209 | t2.insert(polygon=geoPolygon((0, 0), (2, 0), (2, 2), (0, 2), (0, 0))) |
---|
3210 | text = ( |
---|
3211 | db(db.t2.id) |
---|
3212 | .select(db.t2.polygon.st_astext()) |
---|
3213 | .first()[db.t2.polygon.st_astext()] |
---|
3214 | ) |
---|
3215 | self.assertEqual(text, "POLYGON((0 0,2 0,2 2,0 2,0 0))") |
---|
3216 | query = t0.point.st_intersects(geoLine((0, 0), (2, 2))) |
---|
3217 | output = db(query).select(db.t0.point).first()[db.t0.point] |
---|
3218 | self.assertEqual(output, "POINT(1 1)") |
---|
3219 | query = t2.polygon.st_contains(geoPoint(1, 1)) |
---|
3220 | n = db(query).count() |
---|
3221 | self.assertEqual(n, 1) |
---|
3222 | x = t0.point.st_x() |
---|
3223 | y = t0.point.st_y() |
---|
3224 | point = db(t0.id).select(x, y).first() |
---|
3225 | self.assertEqual(point[x], 1) |
---|
3226 | self.assertEqual(point[y], 1) |
---|
3227 | |
---|
3228 | @unittest.skipIf(True, "WIP") |
---|
3229 | def testGeometryCase(self): |
---|
3230 | from pydal import geoPoint, geoLine, geoPolygon |
---|
3231 | |
---|
3232 | if not IS_POSTGRESQL: |
---|
3233 | return |
---|
3234 | db = self.connect(ignore_field_case=False) |
---|
3235 | t0 = db.define_table( |
---|
3236 | "t0", Field("point", "geometry()"), Field("Point", "geometry()") |
---|
3237 | ) |
---|
3238 | t0.insert(point=geoPoint(1, 1)) |
---|
3239 | t0.insert(Point=geoPoint(2, 2)) |
---|
3240 | |
---|
3241 | @unittest.skipIf(True, "WIP") |
---|
3242 | def testGisMigration(self): |
---|
3243 | if not IS_POSTGRESQL: |
---|
3244 | return |
---|
3245 | for b in [True, False]: |
---|
3246 | db = DAL(DEFAULT_URI, check_reserved=["all"], ignore_field_case=b) |
---|
3247 | t0 = db.define_table( |
---|
3248 | "t0", |
---|
3249 | Field("Point", "geometry()"), |
---|
3250 | Field("rname_point", "geometry()", rname="foo"), |
---|
3251 | ) |
---|
3252 | db.commit() |
---|
3253 | db.close() |
---|
3254 | db = DAL(DEFAULT_URI, check_reserved=["all"], ignore_field_case=b) |
---|
3255 | t0 = db.define_table("t0", Field("New_point", "geometry()")) |
---|
3256 | t0.drop() |
---|
3257 | db.commit() |
---|
3258 | db.close() |
---|
3259 | |
---|
3260 | |
---|
3261 | @unittest.skipUnless(IS_POSTGRESQL, "Only implemented for postgres for now") |
---|
3262 | class TestJSON(DALtest): |
---|
3263 | def testJSONExpressions(self): |
---|
3264 | db = self.connect() |
---|
3265 | if not hasattr(db._adapter.dialect, "json_key"): |
---|
3266 | return |
---|
3267 | tj = db.define_table("tj", Field("testjson", "json")) |
---|
3268 | rec1 = tj.insert( |
---|
3269 | testjson={ |
---|
3270 | u"a": {u"a1": 2, u"a0": 1}, |
---|
3271 | u"b": 3, |
---|
3272 | u"c": {u"c0": {u"c01": [2, 4]}}, |
---|
3273 | u"str": "foo", |
---|
3274 | } |
---|
3275 | ) |
---|
3276 | rec2 = tj.insert( |
---|
3277 | testjson={ |
---|
3278 | u"a": {u"a1": 2, u"a0": 2}, |
---|
3279 | u"b": 4, |
---|
3280 | u"c": {u"c0": {u"c01": [2, 3]}}, |
---|
3281 | u"str": "bar", |
---|
3282 | } |
---|
3283 | ) |
---|
3284 | rows = db(db.tj.testjson.json_key("a").json_key_value("a0") == 1).select() |
---|
3285 | self.assertEqual(len(rows), 1) |
---|
3286 | self.assertEqual(rows[0].id, rec1) |
---|
3287 | rows = db(db.tj.testjson.json_path_value("{a, a1}") == 2).select() |
---|
3288 | self.assertEqual(len(rows), 2) |
---|
3289 | rows = db(db.tj.testjson.json_path_value("{a, a0}") == 2).select() |
---|
3290 | self.assertEqual(len(rows), 1) |
---|
3291 | self.assertEqual(rows[0].id, rec2) |
---|
3292 | rows = db(db.tj.testjson.json_path_value(r"{str}") == "foo").select() |
---|
3293 | self.assertEqual(len(rows), 1) |
---|
3294 | rows = db(db.tj.testjson.json_contains('{"c": {"c0":{"c01": [2]}}}')).select() |
---|
3295 | self.assertEqual(len(rows), 2) |
---|
3296 | rows = db(db.tj.testjson.json_contains('{"c": {"c0":{"c01": [4]}}}')).select() |
---|
3297 | self.assertEqual(len(rows), 1) |
---|
3298 | rows = db(db.tj.id > 0).select( |
---|
3299 | db.tj.testjson.json_path("{c, c0, c01, 0}").with_alias("first") |
---|
3300 | ) |
---|
3301 | self.assertEqual(rows[0].first, 2) |
---|
3302 | self.assertEqual(rows[1].first, 2) |
---|
3303 | |
---|
3304 | |
---|
3305 | class TestSQLCustomType(DALtest): |
---|
3306 | def testRun(self): |
---|
3307 | db = self.connect() |
---|
3308 | from pydal.helpers.classes import SQLCustomType |
---|
3309 | |
---|
3310 | native_double = "double" |
---|
3311 | native_string = "string" |
---|
3312 | if hasattr(db._adapter, "types"): |
---|
3313 | native_double = db._adapter.types["double"] |
---|
3314 | native_string = db._adapter.types["string"] % {"length": 256} |
---|
3315 | basic_t = SQLCustomType(type="double", native=native_double) |
---|
3316 | basic_t_str = SQLCustomType(type="string", native=native_string) |
---|
3317 | t0 = db.define_table( |
---|
3318 | "t0", Field("price", basic_t), Field("product", basic_t_str) |
---|
3319 | ) |
---|
3320 | r_id = t0.insert(price=None, product=None) |
---|
3321 | row = db(t0.id == r_id).select(t0.ALL).first() |
---|
3322 | self.assertEqual(row["price"], None) |
---|
3323 | self.assertEqual(row["product"], None) |
---|
3324 | r_id = t0.insert(price=1.2, product="car") |
---|
3325 | row = db(t0.id == r_id).select(t0.ALL).first() |
---|
3326 | self.assertEqual(row["price"], 1.2) |
---|
3327 | self.assertEqual(row["product"], "car") |
---|
3328 | t0.drop() |
---|
3329 | |
---|
3330 | import zlib |
---|
3331 | |
---|
3332 | native = "text" |
---|
3333 | if IS_ORACLE: |
---|
3334 | native = "CLOB" |
---|
3335 | compressed = SQLCustomType( |
---|
3336 | type="text", |
---|
3337 | native=native, |
---|
3338 | encoder=(lambda x: zlib.compress(x or "", 1)), |
---|
3339 | decoder=(lambda x: zlib.decompress(x)), |
---|
3340 | ) |
---|
3341 | t1 = db.define_table("t0", Field("cdata", compressed)) |
---|
3342 | # r_id=t1.insert(cdata="car") |
---|
3343 | # row=db(t1.id == r_id).select(t1.ALL).first() |
---|
3344 | # self.assertEqual(row['cdata'], "'car'") |
---|
3345 | |
---|
3346 | |
---|
3347 | class TestLazy(DALtest): |
---|
3348 | def testRun(self): |
---|
3349 | db = self.connect(lazy_tables=True) |
---|
3350 | t0 = db.define_table("t0", Field("name")) |
---|
3351 | self.assertTrue(("t0" in db._LAZY_TABLES.keys())) |
---|
3352 | db.t0.insert(name="1") |
---|
3353 | self.assertFalse(("t0" in db._LAZY_TABLES.keys())) |
---|
3354 | |
---|
3355 | def testLazyGetter(self): |
---|
3356 | db = self.connect(check_reserved=None, lazy_tables=True) |
---|
3357 | db.define_table("tt", Field("value", "integer")) |
---|
3358 | db.define_table( |
---|
3359 | "ttt", Field("value", "integer"), Field("tt_id", "reference tt"), |
---|
3360 | ) |
---|
3361 | # Force table definition |
---|
3362 | db.ttt.value.writable = False |
---|
3363 | idd = db.tt.insert(value=0) |
---|
3364 | db.ttt.insert(tt_id=idd) |
---|
3365 | |
---|
3366 | def testRowNone(self): |
---|
3367 | db = self.connect(check_reserved=None, lazy_tables=True) |
---|
3368 | tt = db.define_table("tt", Field("value", "integer")) |
---|
3369 | db.tt.insert(value=None) |
---|
3370 | row = db(db.tt).select(db.tt.ALL).first() |
---|
3371 | self.assertEqual(row.value, None) |
---|
3372 | self.assertEqual(row[db.tt.value], None) |
---|
3373 | self.assertEqual(row["tt.value"], None) |
---|
3374 | self.assertEqual(row.get("tt.value"), None) |
---|
3375 | self.assertEqual(row["value"], None) |
---|
3376 | self.assertEqual(row.get("value"), None) |
---|
3377 | |
---|
3378 | def testRowExtra(self): |
---|
3379 | db = self.connect(check_reserved=None, lazy_tables=True) |
---|
3380 | if IS_ORACLE: |
---|
3381 | # lazy use is difficult in Oracle if using cased (non-upper) fields |
---|
3382 | tt = db.define_table("tt", Field("VALUE", "integer")) |
---|
3383 | db.tt.insert(VALUE=1) |
---|
3384 | else: |
---|
3385 | tt = db.define_table("tt", Field("value", "integer")) |
---|
3386 | db.tt.insert(value=1) |
---|
3387 | row = db(db.tt).select("value").first() |
---|
3388 | self.assertEqual(row.value, 1) |
---|
3389 | |
---|
3390 | |
---|
3391 | class TestRedefine(unittest.TestCase): |
---|
3392 | def testRun(self): |
---|
3393 | db = DAL(DEFAULT_URI, check_reserved=["all"], lazy_tables=True, migrate=False) |
---|
3394 | db.define_table("t_a", Field("code")) |
---|
3395 | self.assertTrue("code" in db.t_a) |
---|
3396 | self.assertTrue("code" in db["t_a"]) |
---|
3397 | db.define_table("t_a", Field("code_a"), redefine=True) |
---|
3398 | self.assertFalse("code" in db.t_a) |
---|
3399 | self.assertFalse("code" in db["t_a"]) |
---|
3400 | self.assertTrue("code_a" in db.t_a) |
---|
3401 | self.assertTrue("code_a" in db["t_a"]) |
---|
3402 | db.close() |
---|
3403 | |
---|
3404 | |
---|
3405 | class TestUpdateInsert(DALtest): |
---|
3406 | def testRun(self): |
---|
3407 | db = self.connect() |
---|
3408 | t0 = db.define_table("t0", Field("name")) |
---|
3409 | i_id = t0.update_or_insert((t0.id == 1), name="web2py") |
---|
3410 | u_id = t0.update_or_insert((t0.id == i_id), name="web2py2") |
---|
3411 | self.assertTrue(i_id != None) |
---|
3412 | self.assertTrue(u_id == None) |
---|
3413 | self.assertTrue(db(t0).count() == 1) |
---|
3414 | self.assertTrue(db(t0.name == "web2py").count() == 0) |
---|
3415 | self.assertTrue(db(t0.name == "web2py2").count() == 1) |
---|
3416 | |
---|
3417 | |
---|
3418 | class TestBulkInsert(DALtest): |
---|
3419 | def testRun(self): |
---|
3420 | db = self.connect() |
---|
3421 | t0 = db.define_table("t0", Field("name")) |
---|
3422 | global ctr |
---|
3423 | ctr = 0 |
---|
3424 | |
---|
3425 | def test_after_insert(i, r): |
---|
3426 | self.assertIsInstance(i, OpRow) |
---|
3427 | global ctr |
---|
3428 | ctr += 1 |
---|
3429 | return True |
---|
3430 | |
---|
3431 | t0._after_insert.append(test_after_insert) |
---|
3432 | items = [{"name": "web2py_%s" % pos} for pos in range(0, 10, 1)] |
---|
3433 | t0.bulk_insert(items) |
---|
3434 | self.assertTrue(db(t0).count() == len(items)) |
---|
3435 | for pos in range(0, 10, 1): |
---|
3436 | self.assertTrue(db(t0.name == "web2py_%s" % pos).count() == 1) |
---|
3437 | self.assertTrue(ctr == len(items)) |
---|
3438 | |
---|
3439 | |
---|
3440 | class TestRecordVersioning(DALtest): |
---|
3441 | def testRun(self): |
---|
3442 | db = self.connect() |
---|
3443 | db.define_table( |
---|
3444 | "t0", |
---|
3445 | Field("name"), |
---|
3446 | Field("is_active", writable=False, readable=False, default=True), |
---|
3447 | ) |
---|
3448 | db.t0._enable_record_versioning(archive_name="t0_archive") |
---|
3449 | self.assertTrue("t0_archive" in db) |
---|
3450 | i_id = db.t0.insert(name="web2py1") |
---|
3451 | db.t0.insert(name="web2py2") |
---|
3452 | db(db.t0.name == "web2py2").delete() |
---|
3453 | self.assertEqual(len(db(db.t0).select()), 1) |
---|
3454 | self.assertEqual(db(db.t0).count(), 1) |
---|
3455 | db(db.t0.id == i_id).update(name="web2py3") |
---|
3456 | self.assertEqual(len(db(db.t0).select()), 1) |
---|
3457 | self.assertEqual(db(db.t0).count(), 1) |
---|
3458 | self.assertEqual(len(db(db.t0_archive).select()), 2) |
---|
3459 | self.assertEqual(db(db.t0_archive).count(), 2) |
---|
3460 | |
---|
3461 | |
---|
3462 | @unittest.skipIf(IS_SQLITE or IS_NOSQL, "Skip if sqlite or NOSQL since no pools") |
---|
3463 | class TestConnection(unittest.TestCase): |
---|
3464 | def testRun(self): |
---|
3465 | # check connection is no longer active after close |
---|
3466 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
3467 | connection = db._adapter.connection |
---|
3468 | db.close() |
---|
3469 | if not IS_ORACLE: |
---|
3470 | # newer Oracle versions no longer play well with explicit .close() |
---|
3471 | self.assertRaises(Exception, connection.commit) |
---|
3472 | |
---|
3473 | # check connection are reused with pool_size |
---|
3474 | connections = set() |
---|
3475 | for a in range(10): |
---|
3476 | db2 = DAL(DEFAULT_URI, check_reserved=["all"], pool_size=5) |
---|
3477 | c = db2._adapter.connection |
---|
3478 | connections.add(c) |
---|
3479 | db2.close() |
---|
3480 | self.assertEqual(len(connections), 1) |
---|
3481 | c = connections.pop() |
---|
3482 | c.commit() |
---|
3483 | c.close() |
---|
3484 | # check correct use of pool_size |
---|
3485 | dbs = [] |
---|
3486 | for a in range(10): |
---|
3487 | db3 = DAL(DEFAULT_URI, check_reserved=["all"], pool_size=5) |
---|
3488 | # make sure the connection is stablished |
---|
3489 | db3._adapter.get_connection() |
---|
3490 | dbs.append(db3) |
---|
3491 | for db in dbs: |
---|
3492 | db.close() |
---|
3493 | self.assertEqual(len(db3._adapter.POOLS[DEFAULT_URI]), 5) |
---|
3494 | for c in db3._adapter.POOLS[DEFAULT_URI]: |
---|
3495 | c.close() |
---|
3496 | db3._adapter.POOLS[DEFAULT_URI] = [] |
---|
3497 | # Clean close if a connection is broken (closed explicity) |
---|
3498 | if not IS_ORACLE: |
---|
3499 | for a in range(10): |
---|
3500 | db4 = DAL(DEFAULT_URI, check_reserved=["all"], pool_size=5) |
---|
3501 | db4._adapter.connection.close() |
---|
3502 | db4.close() |
---|
3503 | self.assertEqual(len(db4._adapter.POOLS[DEFAULT_URI]), 0) |
---|
3504 | |
---|
3505 | |
---|
3506 | class TestSerializers(DALtest): |
---|
3507 | def testAsJson(self): |
---|
3508 | db = self.connect() |
---|
3509 | db.define_table("tt", Field("date_field", "datetime")) |
---|
3510 | db.tt.insert(date_field=datetime.datetime.now()) |
---|
3511 | rows = db().select(db.tt.ALL) |
---|
3512 | j = rows.as_json() |
---|
3513 | import json # standard library |
---|
3514 | |
---|
3515 | json.loads(j) |
---|
3516 | |
---|
3517 | def testSelectIterselect(self): |
---|
3518 | db = self.connect() |
---|
3519 | db.define_table("tt", Field("tt")) |
---|
3520 | db.tt.insert(tt="pydal") |
---|
3521 | methods = ["as_dict", "as_csv", "as_json", "as_xml", "as_list"] |
---|
3522 | for method in methods: |
---|
3523 | rows = db(db.tt).select() |
---|
3524 | rowsI = db(db.tt).iterselect() |
---|
3525 | self.assertEqual( |
---|
3526 | getattr(rows, method)(), getattr(rowsI, method)(), "failed %s" % method |
---|
3527 | ) |
---|
3528 | |
---|
3529 | |
---|
3530 | class TestIterselect(DALtest): |
---|
3531 | def testRun(self): |
---|
3532 | db = self.connect() |
---|
3533 | t0 = db.define_table("t0", Field("name")) |
---|
3534 | names = ["web2py", "pydal", "Massimo"] |
---|
3535 | for n in names: |
---|
3536 | t0.insert(name=n) |
---|
3537 | |
---|
3538 | rows = db(db.t0).select(orderby=db.t0.id) |
---|
3539 | for pos, r in enumerate(rows): |
---|
3540 | self.assertEqual(r.name, names[pos]) |
---|
3541 | # Testing basic iteration |
---|
3542 | rows = db(db.t0).iterselect(orderby=db.t0.id) |
---|
3543 | for pos, r in enumerate(rows): |
---|
3544 | self.assertEqual(r.name, names[pos]) |
---|
3545 | # Testing IterRows.first before basic iteration |
---|
3546 | rows = db(db.t0).iterselect(orderby=db.t0.id) |
---|
3547 | self.assertEqual(rows.first().name, names[0]) |
---|
3548 | self.assertEqual(rows.first().name, names[0]) |
---|
3549 | |
---|
3550 | for pos, r in enumerate(rows): |
---|
3551 | self.assertEqual(r.name, names[pos]) |
---|
3552 | # Testing IterRows.__nonzero__ before basic iteration |
---|
3553 | rows = db(db.t0).iterselect(orderby=db.t0.id) |
---|
3554 | if rows: |
---|
3555 | for pos, r in enumerate(rows): |
---|
3556 | self.assertEqual(r.name, names[pos]) |
---|
3557 | |
---|
3558 | # Empty IterRows |
---|
3559 | rows = db(db.t0.name == "IterRows").iterselect(orderby=db.t0.id) |
---|
3560 | self.assertEqual(bool(rows), False) |
---|
3561 | for pos, r in enumerate(rows): |
---|
3562 | self.assertEqual(r.name, names[pos]) |
---|
3563 | |
---|
3564 | # Testing IterRows.__getitem__ |
---|
3565 | rows = db(db.t0).iterselect(orderby=db.t0.id) |
---|
3566 | self.assertEqual(rows[0].name, names[0]) |
---|
3567 | self.assertEqual(rows[1].name, names[1]) |
---|
3568 | # recall the same item |
---|
3569 | self.assertEqual(rows[1].name, names[1]) |
---|
3570 | self.assertEqual(rows[2].name, names[2]) |
---|
3571 | self.assertRaises(IndexError, rows.__getitem__, 1) |
---|
3572 | |
---|
3573 | # Testing IterRows.next() |
---|
3574 | rows = db(db.t0).iterselect(orderby=db.t0.id) |
---|
3575 | for n in names: |
---|
3576 | self.assertEqual(next(rows).name, n) |
---|
3577 | self.assertRaises(StopIteration, next, rows) |
---|
3578 | |
---|
3579 | # Testing IterRows.compact |
---|
3580 | rows = db(db.t0).iterselect(orderby=db.t0.id) |
---|
3581 | rows.compact = False |
---|
3582 | for n in names: |
---|
3583 | self.assertEqual(next(rows).t0.name, n) |
---|
3584 | |
---|
3585 | @unittest.skipIf(IS_MSSQL, "Skip mssql") |
---|
3586 | def testMultiSelect(self): |
---|
3587 | # Iterselect holds the cursors until all elemets have been evaluated |
---|
3588 | # inner queries use new cursors |
---|
3589 | db = self.connect() |
---|
3590 | t0 = db.define_table("t0", Field("name"), Field("name_copy")) |
---|
3591 | db(db.t0).delete() |
---|
3592 | db.commit() |
---|
3593 | names = ["web2py", "pydal", "Massimo"] |
---|
3594 | for n in names: |
---|
3595 | t0.insert(name=n) |
---|
3596 | c = 0 |
---|
3597 | for r in db(db.t0).iterselect(): |
---|
3598 | db.t0.update_or_insert(db.t0.id == r.id, name_copy=r.name) |
---|
3599 | c += 1 |
---|
3600 | |
---|
3601 | self.assertEqual(c, len(names), "The iterator is not looping over all elements") |
---|
3602 | self.assertEqual(db(db.t0).count(), len(names)) |
---|
3603 | c = 0 |
---|
3604 | for x in db(db.t0).iterselect(orderby=db.t0.id): |
---|
3605 | for y in db(db.t0).iterselect(orderby=db.t0.id): |
---|
3606 | db.t0.update_or_insert(db.t0.id == x.id, name_copy=x.name) |
---|
3607 | c += 1 |
---|
3608 | |
---|
3609 | self.assertEqual(c, len(names) * len(names)) |
---|
3610 | self.assertEqual(db(db.t0).count(), len(names)) |
---|
3611 | db._adapter.test_connection() |
---|
3612 | |
---|
3613 | @unittest.skipIf(IS_SQLITE | IS_MSSQL, "Skip sqlite & ms sql") |
---|
3614 | def testMultiSelectWithCommit(self): |
---|
3615 | db = self.connect() |
---|
3616 | t0 = db.define_table("t0", Field("nn", "integer")) |
---|
3617 | for n in xrange(1, 100, 1): |
---|
3618 | t0.insert(nn=n) |
---|
3619 | db.commit() |
---|
3620 | s = db.t0.nn.sum() |
---|
3621 | tot = db(db.t0).select(s).first()[s] |
---|
3622 | c = 0 |
---|
3623 | for r in db(db.t0).iterselect(db.t0.ALL): |
---|
3624 | db.t0.update_or_insert(db.t0.id == r.id, nn=r.nn * 2) |
---|
3625 | db.commit() |
---|
3626 | c += 1 |
---|
3627 | |
---|
3628 | self.assertEqual(c, db(db.t0).count()) |
---|
3629 | self.assertEqual(tot * 2, db(db.t0).select(s).first()[s]) |
---|
3630 | |
---|
3631 | db._adapter.test_connection() |
---|
3632 | |
---|
3633 | |
---|
3634 | if __name__ == "__main__": |
---|
3635 | unittest.main() |
---|
3636 | tearDownModule() |
---|