1 | # -*- coding: utf-8 -*- |
---|
2 | """ |
---|
3 | Unit tests for NoSQL adapters |
---|
4 | """ |
---|
5 | |
---|
6 | from __future__ import print_function |
---|
7 | import sys |
---|
8 | import os |
---|
9 | import glob |
---|
10 | import datetime |
---|
11 | from ._compat import unittest |
---|
12 | |
---|
13 | from pydal._compat import PY2, basestring, StringIO, to_bytes, long |
---|
14 | from pydal import DAL, Field |
---|
15 | from pydal.objects import Table, Query, Expression |
---|
16 | from pydal.helpers.classes import SQLALL, OpRow |
---|
17 | from pydal.exceptions import NotOnNOSQLError |
---|
18 | from ._adapt import DEFAULT_URI, IS_IMAP, drop, IS_GAE, IS_MONGODB, _quote |
---|
19 | |
---|
20 | if IS_IMAP: |
---|
21 | from pydal.adapters import IMAPAdapter |
---|
22 | from pydal.contrib import mockimaplib |
---|
23 | |
---|
24 | IMAPAdapter.driver = mockimaplib |
---|
25 | elif IS_MONGODB: |
---|
26 | from pydal.adapters.mongo import Expansion |
---|
27 | elif IS_GAE: |
---|
28 | # setup GAE dummy database |
---|
29 | from google.appengine.ext import testbed |
---|
30 | |
---|
31 | gaetestbed = testbed.Testbed() |
---|
32 | gaetestbed.activate() |
---|
33 | gaetestbed.init_datastore_v3_stub() |
---|
34 | gaetestbed.init_memcache_stub() |
---|
35 | |
---|
36 | print("Testing against %s engine (%s)" % (DEFAULT_URI.partition(":")[0], DEFAULT_URI)) |
---|
37 | |
---|
38 | ALLOWED_DATATYPES = [ |
---|
39 | "string", |
---|
40 | "text", |
---|
41 | "integer", |
---|
42 | "boolean", |
---|
43 | "double", |
---|
44 | "blob", |
---|
45 | "date", |
---|
46 | "time", |
---|
47 | "datetime", |
---|
48 | "upload", |
---|
49 | "password", |
---|
50 | "json", |
---|
51 | ] |
---|
52 | |
---|
53 | |
---|
54 | def setUpModule(): |
---|
55 | if not IS_IMAP: |
---|
56 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
57 | |
---|
58 | def clean_table(db, tablename): |
---|
59 | try: |
---|
60 | db.define_table(tablename) |
---|
61 | except Exception as e: |
---|
62 | pass |
---|
63 | try: |
---|
64 | drop(db[tablename]) |
---|
65 | except Exception as e: |
---|
66 | pass |
---|
67 | |
---|
68 | for tablename in [ |
---|
69 | "tt", |
---|
70 | "t0", |
---|
71 | "t1", |
---|
72 | "t2", |
---|
73 | "t3", |
---|
74 | "t4", |
---|
75 | "easy_name", |
---|
76 | "tt_archive", |
---|
77 | "pet_farm", |
---|
78 | "person", |
---|
79 | ]: |
---|
80 | clean_table(db, tablename) |
---|
81 | db.close() |
---|
82 | |
---|
83 | |
---|
84 | def tearDownModule(): |
---|
85 | if os.path.isfile("sql.log"): |
---|
86 | os.unlink("sql.log") |
---|
87 | for a in glob.glob("*.table"): |
---|
88 | os.unlink(a) |
---|
89 | |
---|
90 | |
---|
91 | @unittest.skipIf(not IS_MONGODB, "Skipping MongoDB Tests") |
---|
92 | class TestMongo(unittest.TestCase): |
---|
93 | """ Tests specific to MongoDB, error and side path exercisers, etc |
---|
94 | """ |
---|
95 | |
---|
96 | def testVersionCheck(self): |
---|
97 | driver_args = {"fake_version": "2.9 Phony"} |
---|
98 | with self.assertRaises(Exception): |
---|
99 | db = DAL( |
---|
100 | DEFAULT_URI, attempts=1, check_reserved=["all"], driver_args=driver_args |
---|
101 | ) |
---|
102 | |
---|
103 | def testRun(self): |
---|
104 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
105 | db.define_table("tt", Field("aa", "reference")) |
---|
106 | with self.assertRaises(ValueError): |
---|
107 | db.tt.insert(aa="x") |
---|
108 | with self.assertRaises(ValueError): |
---|
109 | db.tt.insert(aa="_") |
---|
110 | with self.assertRaises(TypeError): |
---|
111 | db.tt.insert(aa=3.1) |
---|
112 | self.assertEqual(isinstance(db.tt.insert(aa="<random>"), long), True) |
---|
113 | self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True) |
---|
114 | self.assertEqual(isinstance(db.tt.insert(aa="0x1"), long), True) |
---|
115 | with self.assertRaises(RuntimeError): |
---|
116 | db(db.tt.aa + 1 == 1).update(aa=0) |
---|
117 | drop(db.tt) |
---|
118 | |
---|
119 | db.define_table("tt", Field("aa", "date")) |
---|
120 | self.assertEqual(isinstance(db.tt.insert(aa=None), long), True) |
---|
121 | self.assertEqual(db().select(db.tt.aa)[0].aa, None) |
---|
122 | drop(db.tt) |
---|
123 | |
---|
124 | db.define_table("tt", Field("aa", "time")) |
---|
125 | self.assertEqual(isinstance(db.tt.insert(aa=None), long), True) |
---|
126 | self.assertEqual(db().select(db.tt.aa)[0].aa, None) |
---|
127 | with self.assertRaises(RuntimeError): |
---|
128 | db(db.tt.aa <= None).count() |
---|
129 | with self.assertRaises(NotImplementedError): |
---|
130 | db._adapter.select( |
---|
131 | Query(db, db._adapter.dialect.aggregate, db.tt.aa, "UNKNOWN"), |
---|
132 | [db.tt.aa], |
---|
133 | {}, |
---|
134 | ) |
---|
135 | with self.assertRaises(NotImplementedError): |
---|
136 | db._adapter.select( |
---|
137 | Expression( |
---|
138 | db, db._adapter.dialect.extract, db.tt.aa, "UNKNOWN", "integer" |
---|
139 | ), |
---|
140 | [db.tt.aa], |
---|
141 | {}, |
---|
142 | ) |
---|
143 | drop(db.tt) |
---|
144 | |
---|
145 | db.define_table("tt", Field("aa", "integer")) |
---|
146 | case = (db.tt.aa == 0).case(db.tt.aa + 2) |
---|
147 | with self.assertRaises(SyntaxError): |
---|
148 | db(case).count() |
---|
149 | drop(db.tt) |
---|
150 | |
---|
151 | db.define_table( |
---|
152 | "tt", Field("aa"), Field("bb", "integer"), Field("cc", "list:integer") |
---|
153 | ) |
---|
154 | db.tt.insert(aa="aa") |
---|
155 | |
---|
156 | with self.assertRaises(NotImplementedError): |
---|
157 | db((db.tt.aa + 1).contains(db.tt.aa)).count() |
---|
158 | with self.assertRaises(NotImplementedError): |
---|
159 | db(db.tt.cc.contains(db.tt.aa)).count() |
---|
160 | with self.assertRaises(NotImplementedError): |
---|
161 | db(db.tt.aa.contains(db.tt.cc)).count() |
---|
162 | with self.assertRaises(NotImplementedError): |
---|
163 | db(db.tt.aa.contains(1.0)).count() |
---|
164 | with self.assertRaises(NotImplementedError): |
---|
165 | db().select(db.tt.aa.lower()[4:-1]).first() |
---|
166 | with self.assertRaises(NotOnNOSQLError): |
---|
167 | db(db.tt.aa.belongs(db()._select(db.tt.aa))).count() |
---|
168 | with self.assertRaises(RuntimeError): |
---|
169 | db(db.tt.aa.lower()).update(aa="bb") |
---|
170 | with self.assertRaises(NotImplementedError): |
---|
171 | db(db.tt).select(orderby="<random>") |
---|
172 | with self.assertRaises(RuntimeError): |
---|
173 | db().select() |
---|
174 | with self.assertRaises(RuntimeError): |
---|
175 | Expansion( |
---|
176 | db._adapter, |
---|
177 | "delete", |
---|
178 | Query(db, db._adapter.dialect.eq, db.tt.aa, "x"), |
---|
179 | [True], |
---|
180 | ) |
---|
181 | with self.assertRaises(RuntimeError): |
---|
182 | Expansion( |
---|
183 | db._adapter, |
---|
184 | "delete", |
---|
185 | Query(db, db._adapter.dialect.eq, db.tt.aa, "x"), |
---|
186 | [True], |
---|
187 | ) |
---|
188 | with self.assertRaises(RuntimeError): |
---|
189 | expanded = Expansion( |
---|
190 | db._adapter, |
---|
191 | "count", |
---|
192 | Query(db, db._adapter.dialect.eq, db.tt.aa, "x"), |
---|
193 | [True], |
---|
194 | ) |
---|
195 | expanded = Expansion( |
---|
196 | db._adapter, "count", Query(db, db._adapter.dialect.eq, db.tt.aa, "x"), [] |
---|
197 | ) |
---|
198 | self.assertEqual(db._adapter.expand(expanded).query_dict, {"aa": "x"}) |
---|
199 | |
---|
200 | if db._adapter.server_version_major >= 2.6: |
---|
201 | with self.assertRaises(RuntimeError): |
---|
202 | db(db.tt).update(id=1) |
---|
203 | else: |
---|
204 | db(db.tt).update(id=1) |
---|
205 | self.assertNotEqual(db(db.tt.aa == "aa").select(db.tt.id).response[0][0], 1) |
---|
206 | drop(db.tt) |
---|
207 | |
---|
208 | db.close() |
---|
209 | |
---|
210 | for safe in [False, True, False]: |
---|
211 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
212 | db.define_table("tt", Field("aa")) |
---|
213 | self.assertEqual(isinstance(db.tt.insert(aa="x"), long), True) |
---|
214 | with self.assertRaises(RuntimeError): |
---|
215 | db._adapter.delete(db["tt"], "x", safe=safe) |
---|
216 | self.assertEqual( |
---|
217 | db._adapter.delete( |
---|
218 | db["tt"], |
---|
219 | Query(db, db._adapter.dialect.eq, db.tt.aa, "x"), |
---|
220 | safe=safe, |
---|
221 | ), |
---|
222 | 1, |
---|
223 | ) |
---|
224 | self.assertEqual(db(db.tt.aa == "x").count(), 0) |
---|
225 | self.assertEqual( |
---|
226 | db._adapter.update( |
---|
227 | db["tt"], |
---|
228 | Query(db, db._adapter.dialect.eq, db.tt.aa, "x"), |
---|
229 | db["tt"]._fields_and_values_for_update({"aa": "x"}).op_values(), |
---|
230 | safe=safe, |
---|
231 | ), |
---|
232 | 0, |
---|
233 | ) |
---|
234 | drop(db.tt) |
---|
235 | db.close() |
---|
236 | |
---|
237 | def testJoin(self): |
---|
238 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
239 | db.define_table("tt", Field("aa", "integer"), Field("b", "reference tt")) |
---|
240 | i1 = db.tt.insert(aa=1) |
---|
241 | db.tt.insert(aa=4, b=i1) |
---|
242 | q = db.tt.b == db.tt.id |
---|
243 | with self.assertRaises(NotOnNOSQLError): |
---|
244 | db(db.tt).select(left=db.tt.on(q)) |
---|
245 | with self.assertRaises(NotOnNOSQLError): |
---|
246 | db(db.tt).select(join=db.tt.on(q)) |
---|
247 | with self.assertRaises(NotOnNOSQLError): |
---|
248 | db(db.tt).select(db.tt.on(q)) |
---|
249 | with self.assertRaises(TypeError): |
---|
250 | db(db.tt).select(UNKNOWN=True) |
---|
251 | db(db.tt).select(for_update=True) |
---|
252 | self.assertEqual(db(db.tt).count(), 2) |
---|
253 | db.tt.truncate() |
---|
254 | self.assertEqual(db(db.tt).count(), 0) |
---|
255 | drop(db.tt) |
---|
256 | db.close() |
---|
257 | |
---|
258 | |
---|
259 | @unittest.skipIf(IS_IMAP, "Skip IMAP") |
---|
260 | class TestFields(unittest.TestCase): |
---|
261 | def testFieldName(self): |
---|
262 | """ |
---|
263 | - a "str" something |
---|
264 | - not a method or property of Table |
---|
265 | - "dotted-notation" friendly: |
---|
266 | - a valid python identifier |
---|
267 | - not a python keyword |
---|
268 | - not starting with underscore or an integer |
---|
269 | - not containing dots |
---|
270 | |
---|
271 | Basically, anything alphanumeric, no symbols, only underscore as |
---|
272 | punctuation |
---|
273 | """ |
---|
274 | |
---|
275 | # Check that Fields cannot start with underscores |
---|
276 | self.assertRaises(SyntaxError, Field, "_abc", "string") |
---|
277 | |
---|
278 | # Check that Fields cannot contain punctuation other than underscores |
---|
279 | self.assertRaises(SyntaxError, Field, "a.bc", "string") |
---|
280 | |
---|
281 | # Check that Fields cannot be a name of a method or property of Table |
---|
282 | for x in ["drop", "on", "truncate"]: |
---|
283 | self.assertRaises(SyntaxError, Field, x, "string") |
---|
284 | |
---|
285 | # Check that Fields allows underscores in the body of a field name. |
---|
286 | self.assertTrue( |
---|
287 | Field("a_bc", "string"), |
---|
288 | "Field isn't allowing underscores in fieldnames. It should.", |
---|
289 | ) |
---|
290 | |
---|
291 | # Check that Field names don't allow a python keyword |
---|
292 | self.assertRaises(SyntaxError, Field, "True", "string") |
---|
293 | self.assertRaises(SyntaxError, Field, "elif", "string") |
---|
294 | self.assertRaises(SyntaxError, Field, "while", "string") |
---|
295 | |
---|
296 | # Check that Field names don't allow a non-valid python identifier |
---|
297 | non_valid_examples = ["1x", "xx$%@%", "xx yy", "yy\na", "yy\n"] |
---|
298 | for a in non_valid_examples: |
---|
299 | self.assertRaises(SyntaxError, Field, a, "string") |
---|
300 | |
---|
301 | # Check that Field names don't allow a unicode string |
---|
302 | non_valid_examples = non_valid_examples = [ |
---|
303 | "ℙƴ☂ℌøἤ", |
---|
304 | u"ℙƴ☂ℌøἤ", |
---|
305 | u"àè", |
---|
306 | u"ṧøмℯ", |
---|
307 | u"тεṧт", |
---|
308 | u"♥αłüℯṧ", |
---|
309 | u"ℊεᾔ℮яαт℮∂", |
---|
310 | u"♭ƴ", |
---|
311 | u"ᾔ☤ρℌℓ☺ḓ", |
---|
312 | ] |
---|
313 | for a in non_valid_examples: |
---|
314 | self.assertRaises(SyntaxError, Field, a, "string") |
---|
315 | |
---|
316 | def testFieldTypes(self): |
---|
317 | |
---|
318 | # Check that string, and password default length is 512 |
---|
319 | for typ in ["string", "password"]: |
---|
320 | self.assertTrue( |
---|
321 | Field("abc", typ).length == 512, |
---|
322 | "Default length for type '%s' is not 512 or 255" % typ, |
---|
323 | ) |
---|
324 | |
---|
325 | # Check that upload default length is 512 |
---|
326 | self.assertTrue( |
---|
327 | Field("abc", "upload").length == 512, |
---|
328 | "Default length for type 'upload' is not 512", |
---|
329 | ) |
---|
330 | |
---|
331 | # Check that Tables passed in the type creates a reference |
---|
332 | self.assertTrue( |
---|
333 | Field("abc", Table(None, "temp")).type == "reference temp", |
---|
334 | "Passing a Table does not result in a reference type.", |
---|
335 | ) |
---|
336 | |
---|
337 | def testFieldLabels(self): |
---|
338 | |
---|
339 | # Check that a label is successfully built from the supplied fieldname |
---|
340 | self.assertTrue( |
---|
341 | Field("abc", "string").label == "Abc", "Label built is incorrect" |
---|
342 | ) |
---|
343 | self.assertTrue( |
---|
344 | Field("abc_def", "string").label == "Abc Def", "Label built is incorrect" |
---|
345 | ) |
---|
346 | |
---|
347 | def testFieldFormatters(self): # Formatter should be called Validator |
---|
348 | |
---|
349 | # Test the default formatters |
---|
350 | for typ in ALLOWED_DATATYPES: |
---|
351 | f = Field("abc", typ) |
---|
352 | if typ not in ["date", "time", "datetime"]: |
---|
353 | isinstance(f.formatter("test"), str) |
---|
354 | else: |
---|
355 | isinstance(f.formatter(datetime.datetime.now()), str) |
---|
356 | |
---|
357 | def testRun(self): |
---|
358 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
359 | import pickle |
---|
360 | |
---|
361 | # some db's only support milliseconds |
---|
362 | datetime_datetime_today = datetime.datetime.today() |
---|
363 | datetime_datetime_today = datetime_datetime_today.replace( |
---|
364 | microsecond=datetime_datetime_today.microsecond |
---|
365 | - datetime_datetime_today.microsecond % 1000 |
---|
366 | ) |
---|
367 | |
---|
368 | insert_vals = [ |
---|
369 | ("string", "x", ""), |
---|
370 | ("string", "A\xc3\xa9 A", ""), |
---|
371 | ("text", "x", ""), |
---|
372 | ("password", "x", ""), |
---|
373 | ("upload", "x", ""), |
---|
374 | ("double", 3.1, 1), |
---|
375 | ("integer", 3, 1), |
---|
376 | ("boolean", True, True), |
---|
377 | ("date", datetime.date.today(), datetime.date.today()), |
---|
378 | ( |
---|
379 | "datetime", |
---|
380 | datetime.datetime(1971, 12, 21, 10, 30, 55, 0), |
---|
381 | datetime_datetime_today, |
---|
382 | ), |
---|
383 | ("time", datetime_datetime_today.time(), datetime_datetime_today.time()), |
---|
384 | ("blob", "x", ""), |
---|
385 | ("blob", b"xyzzy", ""), |
---|
386 | # pickling a tuple will create a string which is not UTF-8 able. |
---|
387 | ("blob", pickle.dumps((0,), pickle.HIGHEST_PROTOCOL), ""), |
---|
388 | ] |
---|
389 | |
---|
390 | if not IS_GAE: |
---|
391 | # these are unsupported by GAE |
---|
392 | insert_vals.append(("blob", bytearray("a", "utf-8"), "")) |
---|
393 | insert_vals.append(("json", {"a": "b", "c": [1, 2]}, {})) |
---|
394 | |
---|
395 | for iv in insert_vals: |
---|
396 | db.define_table("tt", Field("aa", iv[0], default=iv[2])) |
---|
397 | # empty string stored to blob returns None |
---|
398 | default_return = None if iv[0] == "blob" and iv[2] == "" else iv[2] |
---|
399 | self.assertTrue(isinstance(db.tt.insert(), long)) |
---|
400 | self.assertTrue(isinstance(db.tt.insert(aa=iv[1]), long)) |
---|
401 | self.assertTrue(isinstance(db.tt.insert(aa=None), long)) |
---|
402 | cv = iv[1] |
---|
403 | if IS_MONGODB and not PY2 and iv[0] == "blob": |
---|
404 | cv = to_bytes(iv[1]) |
---|
405 | self.assertEqual(db().select(db.tt.aa)[0].aa, default_return) |
---|
406 | self.assertEqual(db().select(db.tt.aa)[1].aa, cv) |
---|
407 | self.assertEqual(db().select(db.tt.aa)[2].aa, None) |
---|
408 | |
---|
409 | if not IS_GAE: |
---|
410 | ## field aliases |
---|
411 | row = db().select(db.tt.aa.with_alias("zz"))[1] |
---|
412 | self.assertEqual(row["zz"], cv) |
---|
413 | |
---|
414 | drop(db.tt) |
---|
415 | |
---|
416 | ## Row APIs |
---|
417 | db.define_table( |
---|
418 | "tt", Field("aa", "datetime", default=datetime.datetime.today()) |
---|
419 | ) |
---|
420 | t0 = datetime.datetime(1971, 12, 21, 10, 30, 55, 0) |
---|
421 | id = db.tt.insert(aa=t0) |
---|
422 | self.assertEqual(isinstance(id, long), True) |
---|
423 | |
---|
424 | row = db().select(db.tt.aa)[0] |
---|
425 | self.assertEqual(db.tt[id].aa, t0) |
---|
426 | self.assertEqual(db.tt["aa"], db.tt.aa) |
---|
427 | self.assertEqual(db.tt(id).aa, t0) |
---|
428 | self.assertTrue(db.tt(id, aa=None) == None) |
---|
429 | self.assertFalse(db.tt(id, aa=t0) == None) |
---|
430 | self.assertEqual(row.aa, t0) |
---|
431 | self.assertEqual(row["aa"], t0) |
---|
432 | self.assertEqual(row["tt.aa"], t0) |
---|
433 | self.assertEqual(row("tt.aa"), t0) |
---|
434 | |
---|
435 | ## Lazy and Virtual fields |
---|
436 | db.tt.b = Field.Virtual(lambda row: row.tt.aa) |
---|
437 | # test for FieldVirtual.bind |
---|
438 | self.assertEqual(db.tt.b.tablename, "tt") |
---|
439 | self.assertEqual(db.tt.b.name, "b") |
---|
440 | db.tt.c = Field.Lazy(lambda row: row.tt.aa) |
---|
441 | # test for FieldMethod.bind |
---|
442 | self.assertEqual(db.tt.c.name, "c") |
---|
443 | rows = db().select(db.tt.aa) |
---|
444 | row = rows[0] |
---|
445 | self.assertEqual(row.b, t0) |
---|
446 | self.assertEqual(row.c(), t0) |
---|
447 | # test for BasicRows.colnames_fields |
---|
448 | rows.colnames.insert(0, "tt.b") |
---|
449 | rows.colnames.insert(1, "tt.c") |
---|
450 | colnames_fields = rows.colnames_fields |
---|
451 | self.assertIs(colnames_fields[0], db.tt.b) |
---|
452 | self.assertIs(colnames_fields[1], db.tt.c) |
---|
453 | drop(db.tt) |
---|
454 | |
---|
455 | db.define_table("tt", Field("aa", "time", default="11:30")) |
---|
456 | t0 = datetime.time(10, 30, 55) |
---|
457 | self.assertEqual(isinstance(db.tt.insert(aa=t0), long), True) |
---|
458 | self.assertEqual(db().select(db.tt.aa)[0].aa, t0) |
---|
459 | drop(db.tt) |
---|
460 | db.close() |
---|
461 | |
---|
462 | |
---|
463 | @unittest.skipIf(IS_IMAP, "Skip IMAP") |
---|
464 | class TestTables(unittest.TestCase): |
---|
465 | def testTableNames(self): |
---|
466 | """ |
---|
467 | - a "str" something |
---|
468 | - not a method or property of DAL |
---|
469 | - "dotted-notation" friendly: |
---|
470 | - a valid python identifier |
---|
471 | - not a python keyword |
---|
472 | - not starting with underscore or an integer |
---|
473 | - not containing dots |
---|
474 | |
---|
475 | Basically, anything alphanumeric, no symbols, only underscore as |
---|
476 | punctuation |
---|
477 | """ |
---|
478 | |
---|
479 | # Check that Tables cannot start with underscores |
---|
480 | self.assertRaises(SyntaxError, Table, None, "_abc") |
---|
481 | |
---|
482 | # Check that Tables cannot contain punctuation other than underscores |
---|
483 | self.assertRaises(SyntaxError, Table, None, "a.bc") |
---|
484 | |
---|
485 | # Check that Tables cannot be a name of a method or property of DAL |
---|
486 | for x in ["define_table", "tables", "as_dict"]: |
---|
487 | self.assertRaises(SyntaxError, Table, None, x) |
---|
488 | |
---|
489 | # Check that Table allows underscores in the body of a field name. |
---|
490 | self.assertTrue( |
---|
491 | Table(None, "a_bc"), |
---|
492 | "Table isn't allowing underscores in tablename. It should.", |
---|
493 | ) |
---|
494 | |
---|
495 | # Check that Table names don't allow a python keyword |
---|
496 | self.assertRaises(SyntaxError, Table, None, "True") |
---|
497 | self.assertRaises(SyntaxError, Table, None, "elif") |
---|
498 | self.assertRaises(SyntaxError, Table, None, "while") |
---|
499 | |
---|
500 | # Check that Table names don't allow a non-valid python identifier |
---|
501 | non_valid_examples = ["1x", "xx$%@%", "xx yy", "yy\na", "yy\n"] |
---|
502 | for a in non_valid_examples: |
---|
503 | self.assertRaises(SyntaxError, Table, None, a) |
---|
504 | |
---|
505 | # Check that Table names don't allow a unicode string |
---|
506 | non_valid_examples = [ |
---|
507 | "ℙƴ☂ℌøἤ", |
---|
508 | u"ℙƴ☂ℌøἤ", |
---|
509 | u"àè", |
---|
510 | u"ṧøмℯ", |
---|
511 | u"тεṧт", |
---|
512 | u"♥αłüℯṧ", |
---|
513 | u"ℊεᾔ℮яαт℮∂", |
---|
514 | u"♭ƴ", |
---|
515 | u"ᾔ☤ρℌℓ☺ḓ", |
---|
516 | ] |
---|
517 | for a in non_valid_examples: |
---|
518 | self.assertRaises(SyntaxError, Table, None, a) |
---|
519 | |
---|
520 | |
---|
521 | @unittest.skipIf(IS_IMAP, "Skip IMAP") |
---|
522 | class TestAll(unittest.TestCase): |
---|
523 | def setUp(self): |
---|
524 | self.pt = Table(None, "PseudoTable", Field("name"), Field("birthdate")) |
---|
525 | |
---|
526 | def testSQLALL(self): |
---|
527 | ans = "PseudoTable.id, PseudoTable.name, PseudoTable.birthdate" |
---|
528 | self.assertEqual(str(SQLALL(self.pt)), ans) |
---|
529 | |
---|
530 | |
---|
531 | @unittest.skipIf(IS_IMAP, "Skip IMAP") |
---|
532 | class TestTable(unittest.TestCase): |
---|
533 | def testTableCreation(self): |
---|
534 | |
---|
535 | # Check for error when not passing type other than Field or Table |
---|
536 | |
---|
537 | self.assertRaises(SyntaxError, Table, None, "test", None) |
---|
538 | |
---|
539 | persons = Table( |
---|
540 | None, "persons", Field("firstname", "string"), Field("lastname", "string") |
---|
541 | ) |
---|
542 | |
---|
543 | # Does it have the correct fields? |
---|
544 | |
---|
545 | self.assertTrue(set(persons.fields).issuperset(set(["firstname", "lastname"]))) |
---|
546 | |
---|
547 | # ALL is set correctly |
---|
548 | |
---|
549 | self.assertTrue("persons.firstname, persons.lastname" in str(persons.ALL)) |
---|
550 | |
---|
551 | def testTableAlias(self): |
---|
552 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
553 | persons = Table( |
---|
554 | db, "persons", Field("firstname", "string"), Field("lastname", "string") |
---|
555 | ) |
---|
556 | aliens = persons.with_alias("aliens") |
---|
557 | |
---|
558 | # Are the different table instances with the same fields |
---|
559 | |
---|
560 | self.assertTrue(persons is not aliens) |
---|
561 | self.assertTrue(set(persons.fields) == set(aliens.fields)) |
---|
562 | db.close() |
---|
563 | |
---|
564 | def testTableInheritance(self): |
---|
565 | persons = Table( |
---|
566 | None, "persons", Field("firstname", "string"), Field("lastname", "string") |
---|
567 | ) |
---|
568 | customers = Table( |
---|
569 | None, "customers", Field("items_purchased", "integer"), persons |
---|
570 | ) |
---|
571 | self.assertTrue( |
---|
572 | set(customers.fields).issuperset( |
---|
573 | set(["items_purchased", "firstname", "lastname"]) |
---|
574 | ) |
---|
575 | ) |
---|
576 | |
---|
577 | |
---|
578 | class TestInsert(unittest.TestCase): |
---|
579 | def testRun(self): |
---|
580 | if IS_IMAP: |
---|
581 | imap = DAL(DEFAULT_URI) |
---|
582 | imap.define_tables() |
---|
583 | self.assertEqual( |
---|
584 | imap.Draft.insert( |
---|
585 | to="nurse@example.com", |
---|
586 | subject="Nurse!", |
---|
587 | sender="gumby@example.com", |
---|
588 | content="Nurse!\r\nNurse!", |
---|
589 | ), |
---|
590 | 2, |
---|
591 | ) |
---|
592 | self.assertEqual(imap.Draft[2].subject, "Nurse!") |
---|
593 | self.assertEqual(imap.Draft[2].sender, "gumby@example.com") |
---|
594 | self.assertEqual(isinstance(imap.Draft[2].uid, long), True) |
---|
595 | self.assertEqual(imap.Draft[2].content[0]["text"], "Nurse!\r\nNurse!") |
---|
596 | imap.close() |
---|
597 | else: |
---|
598 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
599 | db.define_table("tt", Field("aa")) |
---|
600 | self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True) |
---|
601 | self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True) |
---|
602 | self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True) |
---|
603 | self.assertEqual(db(db.tt.aa == "1").count(), 3) |
---|
604 | self.assertEqual(db(db.tt.aa == "2").isempty(), True) |
---|
605 | self.assertEqual(db(db.tt.aa == "1").update(aa="2"), 3) |
---|
606 | self.assertEqual(db(db.tt.aa == "2").count(), 3) |
---|
607 | self.assertEqual(db(db.tt.aa == "2").isempty(), False) |
---|
608 | self.assertEqual(db(db.tt.aa == "2").delete(), 3) |
---|
609 | self.assertEqual(db(db.tt.aa == "2").isempty(), True) |
---|
610 | |
---|
611 | def callable(): |
---|
612 | return "aa" |
---|
613 | |
---|
614 | self.assertTrue(isinstance(db.tt.insert(aa=callable), long)) |
---|
615 | self.assertEqual(db(db.tt.aa == "aa").count(), 1) |
---|
616 | |
---|
617 | drop(db.tt) |
---|
618 | db.close() |
---|
619 | |
---|
620 | |
---|
621 | @unittest.skipIf(IS_IMAP, "Skip IMAP") |
---|
622 | class TestSelect(unittest.TestCase): |
---|
623 | def testRun(self): |
---|
624 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
625 | db.define_table("tt", Field("aa")) |
---|
626 | self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True) |
---|
627 | self.assertEqual(isinstance(db.tt.insert(aa="2"), long), True) |
---|
628 | self.assertEqual(isinstance(db.tt.insert(aa="3"), long), True) |
---|
629 | self.assertEqual(db(db.tt.id > 0).count(), 3) |
---|
630 | self.assertEqual(db(db.tt.aa).count(), 3) |
---|
631 | self.assertEqual(db(db.tt.id).count(), 3) |
---|
632 | self.assertEqual(db(db.tt.id != None).count(), 3) |
---|
633 | |
---|
634 | self.assertEqual(db(db.tt.id > 0).select(orderby=~db.tt.aa)[0].aa, "3") |
---|
635 | self.assertEqual( |
---|
636 | db(db.tt.id > 0).select(orderby=~db.tt.aa | db.tt.id)[0].aa, "3" |
---|
637 | ) |
---|
638 | self.assertEqual(len(db(db.tt.id > 0).select(limitby=(1, 2))), 1) |
---|
639 | self.assertEqual(db(db.tt.id > 0).select(limitby=(1, 2))[0].aa, "2") |
---|
640 | self.assertEqual(len(db().select(db.tt.ALL)), 3) |
---|
641 | self.assertEqual(db(db.tt.aa == None).count(), 0) |
---|
642 | self.assertEqual(db(db.tt.aa != None).count(), 3) |
---|
643 | self.assertEqual(db(db.tt.aa > "1").count(), 2) |
---|
644 | self.assertEqual(db(db.tt.aa >= "1").count(), 3) |
---|
645 | self.assertEqual(db(db.tt.aa == "1").count(), 1) |
---|
646 | self.assertEqual(db(db.tt.aa != "1").count(), 2) |
---|
647 | self.assertEqual(db(db.tt.aa < "3").count(), 2) |
---|
648 | self.assertEqual(db(db.tt.aa <= "3").count(), 3) |
---|
649 | self.assertEqual(db(db.tt.aa > "1")(db.tt.aa < "3").count(), 1) |
---|
650 | self.assertEqual(db((db.tt.aa > "1") & (db.tt.aa < "3")).count(), 1) |
---|
651 | self.assertEqual(db((db.tt.aa > "1") | (db.tt.aa < "3")).count(), 3) |
---|
652 | # Test not operator |
---|
653 | self.assertEqual(db(~(db.tt.aa != "1")).count(), 1) |
---|
654 | self.assertEqual(db(~(db.tt.aa == "1")).count(), 2) |
---|
655 | self.assertEqual(db((db.tt.aa > "1") & ~(db.tt.aa > "2")).count(), 1) |
---|
656 | self.assertEqual(db(~(db.tt.aa > "1") & (db.tt.aa > "2")).count(), 0) |
---|
657 | self.assertEqual(db(~((db.tt.aa < "1") | (db.tt.aa > "2"))).count(), 2) |
---|
658 | self.assertEqual(db(~((db.tt.aa >= "1") & (db.tt.aa <= "2"))).count(), 1) |
---|
659 | drop(db.tt) |
---|
660 | db.close() |
---|
661 | |
---|
662 | def testListInteger(self): |
---|
663 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
664 | db.define_table("tt", Field("aa", "list:integer")) |
---|
665 | l = [0, 1, 2, 3, 4, 5] |
---|
666 | db.tt.insert(aa=l) |
---|
667 | self.assertEqual(db(db.tt).select("tt.aa").first()[db.tt.aa], l) |
---|
668 | drop(db.tt) |
---|
669 | db.close() |
---|
670 | |
---|
671 | def testListString(self): |
---|
672 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
673 | db.define_table("tt", Field("aa", "list:string")) |
---|
674 | l = ["a", "b", "c"] |
---|
675 | db.tt.insert(aa=l) |
---|
676 | self.assertEqual(db(db.tt).select("tt.aa").first()[db.tt.aa], l) |
---|
677 | drop(db.tt) |
---|
678 | db.close() |
---|
679 | |
---|
680 | def testListReference(self): |
---|
681 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
682 | on_deletes = ( |
---|
683 | "CASCADE", |
---|
684 | "SET NULL", |
---|
685 | ) |
---|
686 | for ondelete in on_deletes: |
---|
687 | db.define_table("t0", Field("aa", "string")) |
---|
688 | db.define_table( |
---|
689 | "tt", Field("t0_id", "list:reference t0", ondelete=ondelete) |
---|
690 | ) |
---|
691 | id_a1 = db.t0.insert(aa="test1") |
---|
692 | id_a2 = db.t0.insert(aa="test2") |
---|
693 | ref1 = [id_a1] |
---|
694 | ref2 = [id_a2] |
---|
695 | ref3 = [id_a1, id_a2] |
---|
696 | db.tt.insert(t0_id=ref1) |
---|
697 | self.assertEqual(db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref1) |
---|
698 | db.tt.insert(t0_id=ref2) |
---|
699 | self.assertEqual(db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref2) |
---|
700 | db.tt.insert(t0_id=ref3) |
---|
701 | self.assertEqual(db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref3) |
---|
702 | |
---|
703 | if IS_MONGODB: |
---|
704 | self.assertEqual(db(db.tt.t0_id == ref3).count(), 1) |
---|
705 | |
---|
706 | self.assertEqual(db(db.tt.t0_id.contains(id_a1)).count(), 2) |
---|
707 | self.assertEqual(db(db.tt.t0_id.contains(id_a2)).count(), 2) |
---|
708 | db(db.t0.aa == "test1").delete() |
---|
709 | if ondelete == "SET NULL": |
---|
710 | self.assertEqual(db(db.tt).count(), 3) |
---|
711 | self.assertEqual(db(db.tt).select()[0].t0_id, []) |
---|
712 | if ondelete == "CASCADE": |
---|
713 | self.assertEqual(db(db.tt).count(), 2) |
---|
714 | self.assertEqual(db(db.tt).select()[0].t0_id, ref2) |
---|
715 | |
---|
716 | drop(db.tt) |
---|
717 | drop(db.t0) |
---|
718 | db.close() |
---|
719 | |
---|
720 | @unittest.skipIf(IS_GAE, "no groupby in appengine") |
---|
721 | def testGroupByAndDistinct(self): |
---|
722 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
723 | db.define_table( |
---|
724 | "tt", Field("aa"), Field("bb", "integer"), Field("cc", "integer") |
---|
725 | ) |
---|
726 | db.tt.insert(aa="4", bb=1, cc=1) |
---|
727 | db.tt.insert(aa="3", bb=2, cc=1) |
---|
728 | db.tt.insert(aa="3", bb=1, cc=1) |
---|
729 | db.tt.insert(aa="1", bb=1, cc=1) |
---|
730 | db.tt.insert(aa="1", bb=2, cc=1) |
---|
731 | db.tt.insert(aa="1", bb=3, cc=1) |
---|
732 | db.tt.insert(aa="1", bb=4, cc=1) |
---|
733 | db.tt.insert(aa="2", bb=1, cc=1) |
---|
734 | db.tt.insert(aa="2", bb=2, cc=1) |
---|
735 | db.tt.insert(aa="2", bb=3, cc=1) |
---|
736 | self.assertEqual(db(db.tt).count(), 10) |
---|
737 | |
---|
738 | # test groupby |
---|
739 | result = db().select(db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa) |
---|
740 | self.assertEqual(len(result), 4) |
---|
741 | result = db().select( |
---|
742 | db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa, orderby=db.tt.aa |
---|
743 | ) |
---|
744 | self.assertEqual(tuple(result.response[2]), ("3", 3)) |
---|
745 | result = db().select( |
---|
746 | db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa, orderby=~db.tt.aa |
---|
747 | ) |
---|
748 | self.assertEqual(tuple(result.response[1]), ("3", 3)) |
---|
749 | result = db().select( |
---|
750 | db.tt.aa, |
---|
751 | db.tt.bb, |
---|
752 | db.tt.cc.sum(), |
---|
753 | groupby=db.tt.aa | db.tt.bb, |
---|
754 | orderby=(db.tt.aa | ~db.tt.bb), |
---|
755 | ) |
---|
756 | self.assertEqual(tuple(result.response[4]), ("2", 3, 1)) |
---|
757 | result = db().select( |
---|
758 | db.tt.aa, |
---|
759 | db.tt.bb.sum(), |
---|
760 | groupby=db.tt.aa, |
---|
761 | orderby=~db.tt.aa, |
---|
762 | limitby=(1, 2), |
---|
763 | ) |
---|
764 | self.assertEqual(len(result), 1) |
---|
765 | self.assertEqual(tuple(result.response[0]), ("3", 3)) |
---|
766 | result = db().select(db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa, limitby=(0, 3)) |
---|
767 | self.assertEqual(len(result), 3) |
---|
768 | self.assertEqual(tuple(result.response[2]), ("3", 3)) |
---|
769 | |
---|
770 | # test having |
---|
771 | self.assertEqual( |
---|
772 | len( |
---|
773 | db().select( |
---|
774 | db.tt.aa, |
---|
775 | db.tt.bb.sum(), |
---|
776 | groupby=db.tt.aa, |
---|
777 | having=db.tt.bb.sum() > 2, |
---|
778 | ) |
---|
779 | ), |
---|
780 | 3, |
---|
781 | ) |
---|
782 | |
---|
783 | # test distinct |
---|
784 | result = db().select(db.tt.aa, db.tt.cc, distinct=True) |
---|
785 | self.assertEqual(len(result), 4) |
---|
786 | result = db().select(db.tt.cc, distinct=True, groupby=db.tt.cc) |
---|
787 | self.assertEqual(len(result), 1) |
---|
788 | self.assertEqual(result[0].cc, 1) |
---|
789 | result = db().select(db.tt.aa, distinct=True, orderby=~db.tt.aa) |
---|
790 | self.assertEqual(result[2].aa, "2") |
---|
791 | self.assertEqual(result[1].aa, "3") |
---|
792 | result = db().select( |
---|
793 | db.tt.aa, db.tt.bb, distinct=True, orderby=(db.tt.aa | ~db.tt.bb) |
---|
794 | ) |
---|
795 | self.assertEqual(tuple(result.response[4]), ("2", 3)) |
---|
796 | result = db().select( |
---|
797 | db.tt.aa, distinct=db.tt.aa, orderby=~db.tt.aa, limitby=(1, 2) |
---|
798 | ) |
---|
799 | self.assertEqual(len(result), 1) |
---|
800 | self.assertEqual(result[0].aa, "3") |
---|
801 | |
---|
802 | # test count distinct |
---|
803 | db.tt.insert(aa="2", bb=3, cc=1) |
---|
804 | self.assertEqual(db(db.tt).count(distinct=db.tt.aa), 4) |
---|
805 | self.assertEqual(db(db.tt).count(distinct=db.tt.aa | db.tt.bb), 10) |
---|
806 | self.assertEqual(db(db.tt).count(distinct=db.tt.aa | db.tt.bb | db.tt.cc), 10) |
---|
807 | self.assertEqual(db(db.tt).count(distinct=True), 10) |
---|
808 | self.assertEqual(db(db.tt.aa).count(db.tt.aa), 4) |
---|
809 | self.assertEqual(db(db.tt.aa).count(), 11) |
---|
810 | count = db.tt.aa.count() |
---|
811 | self.assertEqual(db(db.tt).select(count).first()[count], 11) |
---|
812 | |
---|
813 | count = db.tt.aa.count(distinct=True) |
---|
814 | sum = db.tt.bb.sum() |
---|
815 | result = db(db.tt).select(count, sum) |
---|
816 | self.assertEqual(tuple(result.response[0]), (4, 23)) |
---|
817 | self.assertEqual(result.first()[count], 4) |
---|
818 | self.assertEqual(result.first()[sum], 23) |
---|
819 | |
---|
820 | if not IS_MONGODB or db._adapter.server_version_major >= 2.6: |
---|
821 | # mongo < 2.6 does not support $size |
---|
822 | count = db.tt.aa.count(distinct=True) + db.tt.bb.count(distinct=True) |
---|
823 | self.assertEqual(db(db.tt).select(count).first()[count], 8) |
---|
824 | |
---|
825 | drop(db.tt) |
---|
826 | db.close() |
---|
827 | |
---|
828 | @unittest.skipIf(IS_GAE, "no coalesce in appengine") |
---|
829 | def testCoalesce(self): |
---|
830 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
831 | db.define_table("tt", Field("aa"), Field("bb"), Field("cc"), Field("dd")) |
---|
832 | db.tt.insert(aa="xx") |
---|
833 | db.tt.insert(aa="xx", bb="yy") |
---|
834 | db.tt.insert(aa="xx", bb="yy", cc="zz") |
---|
835 | db.tt.insert(aa="xx", bb="yy", cc="zz", dd="") |
---|
836 | result = db(db.tt).select(db.tt.dd.coalesce(db.tt.cc, db.tt.bb, db.tt.aa)) |
---|
837 | self.assertEqual(result.response[0][0], "xx") |
---|
838 | self.assertEqual(result.response[1][0], "yy") |
---|
839 | self.assertEqual(result.response[2][0], "zz") |
---|
840 | self.assertEqual(result.response[3][0], "") |
---|
841 | db.tt.drop() |
---|
842 | |
---|
843 | db.define_table("tt", Field("aa", "integer"), Field("bb")) |
---|
844 | db.tt.insert(bb="") |
---|
845 | db.tt.insert(aa=1) |
---|
846 | result = db(db.tt).select(db.tt.aa.coalesce_zero()) |
---|
847 | self.assertEqual(result.response[0][0], 0) |
---|
848 | self.assertEqual(result.response[1][0], 1) |
---|
849 | |
---|
850 | db.tt.drop() |
---|
851 | db.close() |
---|
852 | |
---|
853 | |
---|
854 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
855 | class TestAddMethod(unittest.TestCase): |
---|
856 | def testRun(self): |
---|
857 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
858 | db.define_table("tt", Field("aa")) |
---|
859 | |
---|
860 | @db.tt.add_method.all |
---|
861 | def select_all(table, orderby=None): |
---|
862 | return table._db(table).select(orderby=orderby) |
---|
863 | |
---|
864 | self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True) |
---|
865 | self.assertEqual(isinstance(db.tt.insert(aa="2"), long), True) |
---|
866 | self.assertEqual(isinstance(db.tt.insert(aa="3"), long), True) |
---|
867 | self.assertEqual(len(db.tt.all()), 3) |
---|
868 | drop(db.tt) |
---|
869 | db.close() |
---|
870 | |
---|
871 | |
---|
872 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
873 | class TestBelongs(unittest.TestCase): |
---|
874 | def __init__(self, *args, **vars): |
---|
875 | unittest.TestCase.__init__(self, *args, **vars) |
---|
876 | self.db = None |
---|
877 | |
---|
878 | def setUp(self): |
---|
879 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
880 | db.define_table("tt", Field("aa")) |
---|
881 | self.i_id = db.tt.insert(aa="1") |
---|
882 | self.assertEqual(isinstance(self.i_id, long), True) |
---|
883 | self.assertEqual(isinstance(db.tt.insert(aa="2"), long), True) |
---|
884 | self.assertEqual(isinstance(db.tt.insert(aa="3"), long), True) |
---|
885 | self.db = db |
---|
886 | |
---|
887 | def testRun(self): |
---|
888 | db = self.db |
---|
889 | self.assertEqual(db(db.tt.aa.belongs(("1", "3"))).count(), 2) |
---|
890 | self.assertEqual(db(db.tt.aa.belongs(["1", "3"])).count(), 2) |
---|
891 | self.assertEqual(db(db.tt.aa.belongs(["1", "3"])).count(), 2) |
---|
892 | self.assertEqual(db(db.tt.id.belongs([self.i_id])).count(), 1) |
---|
893 | self.assertEqual(db(db.tt.id.belongs([])).count(), 0) |
---|
894 | |
---|
895 | @unittest.skipIf( |
---|
896 | IS_GAE or IS_MONGODB, |
---|
897 | "Datastore/Mongodb belongs() does not accept nested queries", |
---|
898 | ) |
---|
899 | def testNested(self): |
---|
900 | db = self.db |
---|
901 | self.assertEqual( |
---|
902 | db(db.tt.aa.belongs(db(db.tt.id == self.i_id)._select(db.tt.aa))).count(), 1 |
---|
903 | ) |
---|
904 | |
---|
905 | self.assertEqual( |
---|
906 | db( |
---|
907 | db.tt.aa.belongs(db(db.tt.aa.belongs(("1", "3")))._select(db.tt.aa)) |
---|
908 | ).count(), |
---|
909 | 2, |
---|
910 | ) |
---|
911 | self.assertEqual( |
---|
912 | db( |
---|
913 | db.tt.aa.belongs( |
---|
914 | db( |
---|
915 | db.tt.aa.belongs( |
---|
916 | db(db.tt.aa.belongs(("1", "3")))._select(db.tt.aa) |
---|
917 | ) |
---|
918 | )._select(db.tt.aa) |
---|
919 | ) |
---|
920 | ).count(), |
---|
921 | 2, |
---|
922 | ) |
---|
923 | |
---|
924 | def tearDown(self): |
---|
925 | db = self.db |
---|
926 | drop(db.tt) |
---|
927 | db.close() |
---|
928 | self.db = None |
---|
929 | |
---|
930 | |
---|
931 | @unittest.skipIf( |
---|
932 | IS_GAE or IS_IMAP, "Contains not supported on GAE Datastore. TODO: IMAP tests" |
---|
933 | ) |
---|
934 | class TestContains(unittest.TestCase): |
---|
935 | def testRun(self): |
---|
936 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
937 | db.define_table("tt", Field("aa", "list:string"), Field("bb", "string")) |
---|
938 | self.assertEqual( |
---|
939 | isinstance(db.tt.insert(aa=["aaa", "bbb"], bb="aaa"), long), True |
---|
940 | ) |
---|
941 | self.assertEqual( |
---|
942 | isinstance(db.tt.insert(aa=["bbb", "ddd"], bb="abb"), long), True |
---|
943 | ) |
---|
944 | self.assertEqual( |
---|
945 | isinstance(db.tt.insert(aa=["eee", "aaa"], bb="acc"), long), True |
---|
946 | ) |
---|
947 | self.assertEqual(db(db.tt.aa.contains("aaa")).count(), 2) |
---|
948 | self.assertEqual(db(db.tt.aa.contains("bbb")).count(), 2) |
---|
949 | self.assertEqual(db(db.tt.aa.contains("aa")).count(), 0) |
---|
950 | self.assertEqual(db(db.tt.bb.contains("a")).count(), 3) |
---|
951 | self.assertEqual(db(db.tt.bb.contains("b")).count(), 1) |
---|
952 | self.assertEqual(db(db.tt.bb.contains("d")).count(), 0) |
---|
953 | self.assertEqual(db(db.tt.aa.contains(db.tt.bb)).count(), 1) |
---|
954 | |
---|
955 | # case-sensitivity tests, if 1 it isn't |
---|
956 | is_case_insensitive = db(db.tt.bb.contains("AAA", case_sensitive=True)).count() |
---|
957 | if is_case_insensitive: |
---|
958 | self.assertEqual(db(db.tt.aa.contains("AAA")).count(), 2) |
---|
959 | self.assertEqual(db(db.tt.bb.contains("A")).count(), 3) |
---|
960 | else: |
---|
961 | self.assertEqual( |
---|
962 | db(db.tt.aa.contains("AAA", case_sensitive=True)).count(), 0 |
---|
963 | ) |
---|
964 | self.assertEqual(db(db.tt.bb.contains("A", case_sensitive=True)).count(), 0) |
---|
965 | self.assertEqual( |
---|
966 | db(db.tt.aa.contains("AAA", case_sensitive=False)).count(), 2 |
---|
967 | ) |
---|
968 | self.assertEqual( |
---|
969 | db(db.tt.bb.contains("A", case_sensitive=False)).count(), 3 |
---|
970 | ) |
---|
971 | db.tt.drop() |
---|
972 | |
---|
973 | # integers in string fields |
---|
974 | db.define_table( |
---|
975 | "tt", |
---|
976 | Field("aa", "list:string"), |
---|
977 | Field("bb", "string"), |
---|
978 | Field("cc", "integer"), |
---|
979 | ) |
---|
980 | self.assertEqual( |
---|
981 | isinstance(db.tt.insert(aa=["123", "456"], bb="123", cc=12), long), True |
---|
982 | ) |
---|
983 | self.assertEqual( |
---|
984 | isinstance(db.tt.insert(aa=["124", "456"], bb="123", cc=123), long), True |
---|
985 | ) |
---|
986 | self.assertEqual( |
---|
987 | isinstance(db.tt.insert(aa=["125", "457"], bb="23", cc=125), long), True |
---|
988 | ) |
---|
989 | self.assertEqual(db(db.tt.aa.contains(123)).count(), 1) |
---|
990 | self.assertEqual(db(db.tt.aa.contains(23)).count(), 0) |
---|
991 | self.assertEqual(db(db.tt.aa.contains(db.tt.cc)).count(), 1) |
---|
992 | self.assertEqual(db(db.tt.bb.contains(123)).count(), 2) |
---|
993 | self.assertEqual(db(db.tt.bb.contains(23)).count(), 3) |
---|
994 | self.assertEqual(db(db.tt.bb.contains(db.tt.cc)).count(), 2) |
---|
995 | db.tt.drop() |
---|
996 | |
---|
997 | # string field contains string field |
---|
998 | db.define_table("tt", Field("aa"), Field("bb")) |
---|
999 | db.tt.insert(aa="aaa", bb="%aaa") |
---|
1000 | db.tt.insert(aa="aaa", bb="aaa") |
---|
1001 | self.assertEqual(db(db.tt.aa.contains(db.tt.bb)).count(), 1) |
---|
1002 | drop(db.tt) |
---|
1003 | |
---|
1004 | # escaping |
---|
1005 | db.define_table("tt", Field("aa")) |
---|
1006 | db.tt.insert(aa="perc%ent") |
---|
1007 | db.tt.insert(aa="percent") |
---|
1008 | db.tt.insert(aa="percxyzent") |
---|
1009 | db.tt.insert(aa="under_score") |
---|
1010 | db.tt.insert(aa="underxscore") |
---|
1011 | db.tt.insert(aa="underyscore") |
---|
1012 | self.assertEqual(db(db.tt.aa.contains("perc%ent")).count(), 1) |
---|
1013 | self.assertEqual(db(db.tt.aa.contains("under_score")).count(), 1) |
---|
1014 | drop(db.tt) |
---|
1015 | db.close() |
---|
1016 | |
---|
1017 | |
---|
1018 | @unittest.skipIf(IS_GAE, "Like not supported on GAE Datastore.") |
---|
1019 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
1020 | class TestLike(unittest.TestCase): |
---|
1021 | def setUp(self): |
---|
1022 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1023 | db.define_table("tt", Field("aa")) |
---|
1024 | self.assertEqual(isinstance(db.tt.insert(aa="abc"), long), True) |
---|
1025 | self.db = db |
---|
1026 | |
---|
1027 | def tearDown(self): |
---|
1028 | db = self.db |
---|
1029 | drop(db.tt) |
---|
1030 | db.close() |
---|
1031 | self.db = None |
---|
1032 | |
---|
1033 | def testRun(self): |
---|
1034 | db = self.db |
---|
1035 | self.assertEqual(db(db.tt.aa.like("a%")).count(), 1) |
---|
1036 | self.assertEqual(db(db.tt.aa.like("%b%")).count(), 1) |
---|
1037 | self.assertEqual(db(db.tt.aa.like("%c")).count(), 1) |
---|
1038 | self.assertEqual(db(db.tt.aa.like("%d%")).count(), 0) |
---|
1039 | self.assertEqual(db(db.tt.aa.like("ab_")).count(), 1) |
---|
1040 | self.assertEqual(db(db.tt.aa.like("a_c")).count(), 1) |
---|
1041 | self.assertEqual(db(db.tt.aa.like("_bc")).count(), 1) |
---|
1042 | |
---|
1043 | self.assertEqual(db(db.tt.aa.like("A%", case_sensitive=False)).count(), 1) |
---|
1044 | self.assertEqual(db(db.tt.aa.like("%B%", case_sensitive=False)).count(), 1) |
---|
1045 | self.assertEqual(db(db.tt.aa.like("%C", case_sensitive=False)).count(), 1) |
---|
1046 | self.assertEqual(db(db.tt.aa.ilike("A%")).count(), 1) |
---|
1047 | self.assertEqual(db(db.tt.aa.ilike("%B%")).count(), 1) |
---|
1048 | self.assertEqual(db(db.tt.aa.ilike("%C")).count(), 1) |
---|
1049 | |
---|
1050 | # DAL maps like() (and contains(), startswith(), endswith()) |
---|
1051 | # to the LIKE operator, that in ANSI-SQL is case-sensitive |
---|
1052 | # There are backends supporting case-sensitivity by default |
---|
1053 | # and backends that needs additional care to turn |
---|
1054 | # case-sensitivity on. To discern among those, let's run |
---|
1055 | # this query comparing previously inserted 'abc' with 'ABC': |
---|
1056 | # if the result is 0, then the backend recognizes |
---|
1057 | # case-sensitivity, if 1 it isn't |
---|
1058 | is_case_insensitive = db(db.tt.aa.like("%ABC%")).count() |
---|
1059 | self.assertEqual(db(db.tt.aa.like("A%")).count(), is_case_insensitive) |
---|
1060 | self.assertEqual(db(db.tt.aa.like("%B%")).count(), is_case_insensitive) |
---|
1061 | self.assertEqual(db(db.tt.aa.like("%C")).count(), is_case_insensitive) |
---|
1062 | |
---|
1063 | def testUpperLower(self): |
---|
1064 | db = self.db |
---|
1065 | self.assertEqual(db(db.tt.aa.upper().like("A%")).count(), 1) |
---|
1066 | self.assertEqual(db(db.tt.aa.upper().like("%B%")).count(), 1) |
---|
1067 | self.assertEqual(db(db.tt.aa.upper().like("%C")).count(), 1) |
---|
1068 | self.assertEqual(db(db.tt.aa.lower().like("%c")).count(), 1) |
---|
1069 | |
---|
1070 | def testStartsEndsWith(self): |
---|
1071 | db = self.db |
---|
1072 | self.assertEqual(db(db.tt.aa.startswith("a")).count(), 1) |
---|
1073 | self.assertEqual(db(db.tt.aa.endswith("c")).count(), 1) |
---|
1074 | self.assertEqual(db(db.tt.aa.startswith("c")).count(), 0) |
---|
1075 | self.assertEqual(db(db.tt.aa.endswith("a")).count(), 0) |
---|
1076 | |
---|
1077 | def testEscaping(self): |
---|
1078 | db = self.db |
---|
1079 | term = "ahbc".replace("h", "\\") # funny but to avoid any doubts... |
---|
1080 | db.tt.insert(aa="a%bc") |
---|
1081 | db.tt.insert(aa="a_bc") |
---|
1082 | db.tt.insert(aa=term) |
---|
1083 | self.assertEqual(db(db.tt.aa.like("%ax%bc%", escape="x")).count(), 1) |
---|
1084 | self.assertEqual(db(db.tt.aa.like("%ax_bc%", escape="x")).count(), 1) |
---|
1085 | self.assertEqual(db(db.tt.aa.like("%" + term + "%")).count(), 1) |
---|
1086 | db(db.tt.id > 0).delete() |
---|
1087 | # test "literal" like, i.e. exactly as LIKE in the backend |
---|
1088 | db.tt.insert(aa="perc%ent") |
---|
1089 | db.tt.insert(aa="percent") |
---|
1090 | db.tt.insert(aa="percxyzent") |
---|
1091 | db.tt.insert(aa="under_score") |
---|
1092 | db.tt.insert(aa="underxscore") |
---|
1093 | db.tt.insert(aa="underyscore") |
---|
1094 | self.assertEqual(db(db.tt.aa.like("%perc%ent%")).count(), 3) |
---|
1095 | self.assertEqual(db(db.tt.aa.like("%under_score%")).count(), 3) |
---|
1096 | db(db.tt.id > 0).delete() |
---|
1097 | # escaping with startswith and endswith |
---|
1098 | db.tt.insert(aa="%percent") |
---|
1099 | db.tt.insert(aa="xpercent") |
---|
1100 | db.tt.insert(aa="discount%") |
---|
1101 | db.tt.insert(aa="discountx") |
---|
1102 | self.assertEqual(db(db.tt.aa.endswith("discount%")).count(), 1) |
---|
1103 | self.assertEqual(db(db.tt.aa.like("discount%%")).count(), 2) |
---|
1104 | self.assertEqual(db(db.tt.aa.startswith("%percent")).count(), 1) |
---|
1105 | self.assertEqual(db(db.tt.aa.like("%%percent")).count(), 2) |
---|
1106 | |
---|
1107 | def testRegexp(self): |
---|
1108 | db = self.db |
---|
1109 | db(db.tt.id > 0).delete() |
---|
1110 | db.tt.insert(aa="%percent") |
---|
1111 | db.tt.insert(aa="xpercent") |
---|
1112 | db.tt.insert(aa="discount%") |
---|
1113 | db.tt.insert(aa="discountx") |
---|
1114 | try: |
---|
1115 | self.assertEqual(db(db.tt.aa.regexp("count")).count(), 2) |
---|
1116 | except NotImplementedError: |
---|
1117 | pass |
---|
1118 | else: |
---|
1119 | self.assertEqual(db(db.tt.aa.lower().regexp("count")).count(), 2) |
---|
1120 | self.assertEqual( |
---|
1121 | db( |
---|
1122 | db.tt.aa.upper().regexp("COUNT") & db.tt.aa.lower().regexp("count") |
---|
1123 | ).count(), |
---|
1124 | 2, |
---|
1125 | ) |
---|
1126 | self.assertEqual( |
---|
1127 | db( |
---|
1128 | db.tt.aa.upper().regexp("COUNT") | (db.tt.aa.lower() == "xpercent") |
---|
1129 | ).count(), |
---|
1130 | 3, |
---|
1131 | ) |
---|
1132 | |
---|
1133 | def testLikeInteger(self): |
---|
1134 | db = self.db |
---|
1135 | db.tt.drop() |
---|
1136 | db.define_table("tt", Field("aa", "integer")) |
---|
1137 | self.assertEqual(isinstance(db.tt.insert(aa=1111111111), long), True) |
---|
1138 | self.assertEqual(isinstance(db.tt.insert(aa=1234567), long), True) |
---|
1139 | self.assertEqual(db(db.tt.aa.like("1%")).count(), 2) |
---|
1140 | self.assertEqual(db(db.tt.aa.like("1_3%")).count(), 1) |
---|
1141 | self.assertEqual(db(db.tt.aa.like("2%")).count(), 0) |
---|
1142 | self.assertEqual(db(db.tt.aa.like("_2%")).count(), 1) |
---|
1143 | self.assertEqual(db(db.tt.aa.like("12%")).count(), 1) |
---|
1144 | self.assertEqual(db(db.tt.aa.like("012%")).count(), 0) |
---|
1145 | self.assertEqual(db(db.tt.aa.like("%45%")).count(), 1) |
---|
1146 | self.assertEqual(db(db.tt.aa.like("%54%")).count(), 0) |
---|
1147 | |
---|
1148 | |
---|
1149 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
1150 | class TestDatetime(unittest.TestCase): |
---|
1151 | def testRun(self): |
---|
1152 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1153 | db.define_table("tt", Field("aa", "datetime")) |
---|
1154 | self.assertEqual( |
---|
1155 | isinstance(db.tt.insert(aa=datetime.datetime(1971, 12, 21, 11, 30)), long), |
---|
1156 | True, |
---|
1157 | ) |
---|
1158 | self.assertEqual( |
---|
1159 | isinstance(db.tt.insert(aa=datetime.datetime(1971, 11, 21, 10, 30)), long), |
---|
1160 | True, |
---|
1161 | ) |
---|
1162 | self.assertEqual( |
---|
1163 | isinstance(db.tt.insert(aa=datetime.datetime(1970, 12, 21, 9, 31)), long), |
---|
1164 | True, |
---|
1165 | ) |
---|
1166 | self.assertEqual( |
---|
1167 | db(db.tt.aa == datetime.datetime(1971, 12, 21, 11, 30)).count(), 1 |
---|
1168 | ) |
---|
1169 | self.assertEqual(db(db.tt.aa >= datetime.datetime(1971, 1, 1)).count(), 2) |
---|
1170 | |
---|
1171 | if IS_MONGODB: |
---|
1172 | self.assertEqual(db(db.tt.aa.year() == 1971).count(), 2) |
---|
1173 | self.assertEqual(db(db.tt.aa.month() > 11).count(), 2) |
---|
1174 | self.assertEqual(db(db.tt.aa.day() >= 21).count(), 3) |
---|
1175 | self.assertEqual(db(db.tt.aa.hour() < 10).count(), 1) |
---|
1176 | self.assertEqual(db(db.tt.aa.minutes() <= 30).count(), 2) |
---|
1177 | self.assertEqual(db(db.tt.aa.seconds() != 31).count(), 3) |
---|
1178 | self.assertEqual(db(db.tt.aa.epoch() < 365 * 24 * 3600).delete(), 1) |
---|
1179 | drop(db.tt) |
---|
1180 | |
---|
1181 | db.define_table("tt", Field("aa", "time")) |
---|
1182 | t0 = datetime.time(10, 30, 55) |
---|
1183 | db.tt.insert(aa=t0) |
---|
1184 | self.assertEqual(db().select(db.tt.aa)[0].aa, t0) |
---|
1185 | drop(db.tt) |
---|
1186 | |
---|
1187 | db.define_table("tt", Field("aa", "date")) |
---|
1188 | t0 = datetime.date.today() |
---|
1189 | db.tt.insert(aa=t0) |
---|
1190 | self.assertEqual(db().select(db.tt.aa)[0].aa, t0) |
---|
1191 | drop(db.tt) |
---|
1192 | db.close() |
---|
1193 | |
---|
1194 | |
---|
1195 | @unittest.skipIf(IS_GAE or IS_IMAP, "Expressions are not supported") |
---|
1196 | class TestExpressions(unittest.TestCase): |
---|
1197 | def testRun(self): |
---|
1198 | if IS_MONGODB: |
---|
1199 | DAL_OPTS = ( |
---|
1200 | (True, {"adapter_args": {"safe": True}}), |
---|
1201 | (False, {"adapter_args": {"safe": False}}), |
---|
1202 | ) |
---|
1203 | for dal_opt in DAL_OPTS: |
---|
1204 | db = DAL(DEFAULT_URI, check_reserved=["all"], **dal_opt[1]) |
---|
1205 | db.define_table( |
---|
1206 | "tt", |
---|
1207 | Field("aa", "integer"), |
---|
1208 | Field("bb", "integer", default=0), |
---|
1209 | Field("cc"), |
---|
1210 | ) |
---|
1211 | self.assertEqual(isinstance(db.tt.insert(aa=1), long), dal_opt[0]) |
---|
1212 | self.assertEqual(isinstance(db.tt.insert(aa=2), long), dal_opt[0]) |
---|
1213 | self.assertEqual(isinstance(db.tt.insert(aa=3), long), dal_opt[0]) |
---|
1214 | |
---|
1215 | # test update |
---|
1216 | self.assertEqual( |
---|
1217 | db(db.tt.aa == 3).update(aa=db.tt.aa + 1, bb=db.tt.bb + 2), 1 |
---|
1218 | ) |
---|
1219 | self.assertEqual(db(db.tt.aa == 4).count(), 1) |
---|
1220 | self.assertEqual(db(db.tt.bb == 2).count(), 1) |
---|
1221 | self.assertEqual(db(db.tt.aa == -2).count(), 0) |
---|
1222 | self.assertEqual(db(db.tt.aa == 4).update(aa=db.tt.aa * 2, bb=5), 1) |
---|
1223 | self.assertEqual(db(db.tt.bb == 5).count(), 1) |
---|
1224 | self.assertEqual(db(db.tt.aa + 1 == 9).count(), 1) |
---|
1225 | self.assertEqual(db(db.tt.aa + 1 == 9).update(aa=db.tt.aa - 2, cc="cc"), 1) |
---|
1226 | self.assertEqual(db(db.tt.cc == "cc").count(), 1) |
---|
1227 | self.assertEqual(db(db.tt.aa == 6).count(), 1) |
---|
1228 | self.assertEqual(db(db.tt.aa == 6).update(bb=db.tt.aa * (db.tt.bb - 3)), 1) |
---|
1229 | self.assertEqual(db(db.tt.bb == 12).count(), 1) |
---|
1230 | self.assertEqual(db(db.tt.aa == 6).count(), 1) |
---|
1231 | self.assertEqual( |
---|
1232 | db(db.tt.aa == 6).update(aa=db.tt.aa % 4 + 1, cc=db.tt.cc + "1" + "1"), |
---|
1233 | 1, |
---|
1234 | ) |
---|
1235 | self.assertEqual(db(db.tt.cc == "cc11").count(), 1) |
---|
1236 | self.assertEqual(db(db.tt.aa == 3).count(), 1) |
---|
1237 | |
---|
1238 | # test comparsion expression based count |
---|
1239 | self.assertEqual(db(db.tt.aa != db.tt.aa).count(), 0) |
---|
1240 | self.assertEqual(db(db.tt.aa == db.tt.aa).count(), 3) |
---|
1241 | |
---|
1242 | # test select aggregations |
---|
1243 | sum = (db.tt.aa + 1).sum() |
---|
1244 | self.assertEqual(db(db.tt.aa + 1 >= 3).select(sum).first()[sum], 7) |
---|
1245 | self.assertEqual(db((1 == 0) & (db.tt.aa >= db.tt.aa)).count(), 0) |
---|
1246 | self.assertEqual(db(db.tt.aa * 2 == -2).select(sum).first()[sum], None) |
---|
1247 | |
---|
1248 | count = db.tt.aa.count() |
---|
1249 | avg = db.tt.aa.avg() |
---|
1250 | min = db.tt.aa.min() |
---|
1251 | max = db.tt.aa.max() |
---|
1252 | result = db(db.tt).select(sum, count, avg, min, max).first() |
---|
1253 | self.assertEqual(result[sum], 9) |
---|
1254 | self.assertEqual(result[count], 3) |
---|
1255 | self.assertEqual(result[avg], 2) |
---|
1256 | self.assertEqual(result[min], 1) |
---|
1257 | self.assertEqual(result[max], 3) |
---|
1258 | |
---|
1259 | # Test basic expressions evaluated at python level |
---|
1260 | self.assertEqual(db((1 == 1) & (db.tt.aa >= 2)).count(), 2) |
---|
1261 | self.assertEqual(db((1 == 1) | (db.tt.aa >= 2)).count(), 3) |
---|
1262 | self.assertEqual(db((1 == 0) & (db.tt.aa >= 2)).count(), 0) |
---|
1263 | self.assertEqual(db((1 == 0) | (db.tt.aa >= 2)).count(), 2) |
---|
1264 | |
---|
1265 | # test abs() |
---|
1266 | self.assertEqual(db(db.tt.aa == 2).update(aa=db.tt.aa * -10), 1) |
---|
1267 | abs = db.tt.aa.abs().with_alias("abs") |
---|
1268 | result = db(db.tt.aa == -20).select(abs).first() |
---|
1269 | self.assertEqual(result[abs], 20) |
---|
1270 | self.assertEqual(result["abs"], 20) |
---|
1271 | abs = db.tt.aa.abs() / 10 + 5 |
---|
1272 | exp = abs.min() * 2 + 1 |
---|
1273 | result = db(db.tt.aa == -20).select(exp).first() |
---|
1274 | self.assertEqual(result[exp], 15) |
---|
1275 | |
---|
1276 | # test case() |
---|
1277 | condition = db.tt.aa > 2 |
---|
1278 | case = condition.case(db.tt.aa + 2, db.tt.aa - 2) |
---|
1279 | my_case = case.with_alias("my_case") |
---|
1280 | result = db().select(my_case) |
---|
1281 | self.assertEqual(len(result), 3) |
---|
1282 | self.assertEqual(result[0][my_case], -1) |
---|
1283 | self.assertEqual(result[0]["my_case"], -1) |
---|
1284 | self.assertEqual(result[1]["my_case"], -22) |
---|
1285 | self.assertEqual(result[2]["my_case"], 5) |
---|
1286 | |
---|
1287 | # test expression based delete |
---|
1288 | self.assertEqual(db(db.tt.aa + 1 >= 4).count(), 1) |
---|
1289 | self.assertEqual(db(db.tt.aa + 1 >= 4).delete(), 1) |
---|
1290 | self.assertEqual(db(db.tt.aa).count(), 2) |
---|
1291 | |
---|
1292 | # cleanup |
---|
1293 | drop(db.tt) |
---|
1294 | db.close() |
---|
1295 | |
---|
1296 | def testUpdate(self): |
---|
1297 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1298 | |
---|
1299 | # some db's only support seconds |
---|
1300 | datetime_datetime_today = datetime.datetime.today() |
---|
1301 | datetime_datetime_today = datetime_datetime_today.replace(microsecond=0) |
---|
1302 | one_day = datetime.timedelta(1) |
---|
1303 | one_sec = datetime.timedelta(0, 1) |
---|
1304 | |
---|
1305 | update_vals = ( |
---|
1306 | ("string", "x", "y"), |
---|
1307 | ("text", "x", "y"), |
---|
1308 | ("password", "x", "y"), |
---|
1309 | ("integer", 1, 2), |
---|
1310 | ("bigint", 1, 2), |
---|
1311 | ("float", 1.0, 2.0), |
---|
1312 | ("double", 1.0, 2.0), |
---|
1313 | ("boolean", True, False), |
---|
1314 | ("date", datetime.date.today(), datetime.date.today() + one_day), |
---|
1315 | ( |
---|
1316 | "datetime", |
---|
1317 | datetime.datetime(1971, 12, 21, 10, 30, 55, 0), |
---|
1318 | datetime_datetime_today, |
---|
1319 | ), |
---|
1320 | ( |
---|
1321 | "time", |
---|
1322 | datetime_datetime_today.time(), |
---|
1323 | (datetime_datetime_today + one_sec).time(), |
---|
1324 | ), |
---|
1325 | ) |
---|
1326 | |
---|
1327 | for uv in update_vals: |
---|
1328 | db.define_table("tt", Field("aa", "integer", default=0), Field("bb", uv[0])) |
---|
1329 | self.assertTrue(isinstance(db.tt.insert(bb=uv[1]), long)) |
---|
1330 | self.assertEqual(db(db.tt.aa + 1 == 1).select(db.tt.bb)[0].bb, uv[1]) |
---|
1331 | self.assertEqual(db(db.tt.aa + 1 == 1).update(bb=uv[2]), 1) |
---|
1332 | self.assertEqual(db(db.tt.aa / 3 == 0).select(db.tt.bb)[0].bb, uv[2]) |
---|
1333 | db.tt.drop() |
---|
1334 | db.close() |
---|
1335 | |
---|
1336 | def testSubstring(self): |
---|
1337 | if IS_MONGODB: |
---|
1338 | # MongoDB does not support string length |
---|
1339 | end = 3 |
---|
1340 | else: |
---|
1341 | end = -2 |
---|
1342 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1343 | t0 = db.define_table("t0", Field("name")) |
---|
1344 | input_name = "web2py" |
---|
1345 | t0.insert(name=input_name) |
---|
1346 | exp_slice = t0.name.lower()[4:6] |
---|
1347 | exp_slice_no_max = t0.name.lower()[4:] |
---|
1348 | exp_slice_neg_max = t0.name.lower()[2:end] |
---|
1349 | exp_slice_neg_start = t0.name.lower()[end:] |
---|
1350 | exp_item = t0.name.lower()[3] |
---|
1351 | out = ( |
---|
1352 | db(t0) |
---|
1353 | .select( |
---|
1354 | exp_slice, |
---|
1355 | exp_item, |
---|
1356 | exp_slice_no_max, |
---|
1357 | exp_slice_neg_max, |
---|
1358 | exp_slice_neg_start, |
---|
1359 | ) |
---|
1360 | .first() |
---|
1361 | ) |
---|
1362 | self.assertEqual(out[exp_slice], input_name[4:6]) |
---|
1363 | self.assertEqual(out[exp_item], input_name[3]) |
---|
1364 | self.assertEqual(out[exp_slice_no_max], input_name[4:]) |
---|
1365 | self.assertEqual(out[exp_slice_neg_max], input_name[2:end]) |
---|
1366 | self.assertEqual(out[exp_slice_neg_start], input_name[end:]) |
---|
1367 | t0.drop() |
---|
1368 | db.close() |
---|
1369 | |
---|
1370 | def testOps(self): |
---|
1371 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1372 | t0 = db.define_table("t0", Field("vv", "integer")) |
---|
1373 | self.assertTrue(isinstance(db.t0.insert(vv=1), long)) |
---|
1374 | self.assertTrue(isinstance(db.t0.insert(vv=2), long)) |
---|
1375 | self.assertTrue(isinstance(db.t0.insert(vv=3), long)) |
---|
1376 | sum = db.t0.vv.sum() |
---|
1377 | count = db.t0.vv.count() |
---|
1378 | avg = db.t0.vv.avg() |
---|
1379 | op = sum / count |
---|
1380 | op1 = (sum / count).with_alias("tot") |
---|
1381 | self.assertEqual(db(t0).select(op).first()[op], 2) |
---|
1382 | self.assertEqual(db(t0).select(op1).first()[op1], 2) |
---|
1383 | self.assertEqual(db(t0).select(op1).first()["tot"], 2) |
---|
1384 | op2 = avg * count |
---|
1385 | self.assertEqual(db(t0).select(op2).first()[op2], 6) |
---|
1386 | # the following is not possible at least on sqlite |
---|
1387 | sum = db.t0.vv.sum().with_alias("s") |
---|
1388 | count = db.t0.vv.count().with_alias("c") |
---|
1389 | op = sum / count |
---|
1390 | with self.assertRaises(SyntaxError): |
---|
1391 | self.assertEqual(db(t0).select(op).first()[op], 2) |
---|
1392 | t0.drop() |
---|
1393 | db.close() |
---|
1394 | |
---|
1395 | |
---|
1396 | @unittest.skip("JOIN queries are not supported") |
---|
1397 | class TestJoin(unittest.TestCase): |
---|
1398 | def testRun(self): |
---|
1399 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1400 | db.define_table("t1", Field("aa")) |
---|
1401 | db.define_table("t2", Field("aa"), Field("b", db.t1)) |
---|
1402 | i1 = db.t1.insert(aa="1") |
---|
1403 | i2 = db.t1.insert(aa="2") |
---|
1404 | i3 = db.t1.insert(aa="3") |
---|
1405 | db.t2.insert(aa="4", b=i1) |
---|
1406 | db.t2.insert(aa="5", b=i2) |
---|
1407 | db.t2.insert(aa="6", b=i2) |
---|
1408 | self.assertEqual( |
---|
1409 | len(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)), 3 |
---|
1410 | ) |
---|
1411 | self.assertEqual( |
---|
1412 | db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t1.aa, "2" |
---|
1413 | ) |
---|
1414 | self.assertEqual( |
---|
1415 | db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t2.aa, "6" |
---|
1416 | ) |
---|
1417 | self.assertEqual( |
---|
1418 | len( |
---|
1419 | db().select( |
---|
1420 | db.t1.ALL, |
---|
1421 | db.t2.ALL, |
---|
1422 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1423 | orderby=db.t1.aa | db.t2.aa, |
---|
1424 | ) |
---|
1425 | ), |
---|
1426 | 4, |
---|
1427 | ) |
---|
1428 | self.assertEqual( |
---|
1429 | db() |
---|
1430 | .select( |
---|
1431 | db.t1.ALL, |
---|
1432 | db.t2.ALL, |
---|
1433 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1434 | orderby=db.t1.aa | db.t2.aa, |
---|
1435 | )[2] |
---|
1436 | .t1.aa, |
---|
1437 | "2", |
---|
1438 | ) |
---|
1439 | self.assertEqual( |
---|
1440 | db() |
---|
1441 | .select( |
---|
1442 | db.t1.ALL, |
---|
1443 | db.t2.ALL, |
---|
1444 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1445 | orderby=db.t1.aa | db.t2.aa, |
---|
1446 | )[2] |
---|
1447 | .t2.aa, |
---|
1448 | "6", |
---|
1449 | ) |
---|
1450 | self.assertEqual( |
---|
1451 | db() |
---|
1452 | .select( |
---|
1453 | db.t1.ALL, |
---|
1454 | db.t2.ALL, |
---|
1455 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1456 | orderby=db.t1.aa | db.t2.aa, |
---|
1457 | )[3] |
---|
1458 | .t1.aa, |
---|
1459 | "3", |
---|
1460 | ) |
---|
1461 | self.assertEqual( |
---|
1462 | db() |
---|
1463 | .select( |
---|
1464 | db.t1.ALL, |
---|
1465 | db.t2.ALL, |
---|
1466 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1467 | orderby=db.t1.aa | db.t2.aa, |
---|
1468 | )[3] |
---|
1469 | .t2.aa, |
---|
1470 | None, |
---|
1471 | ) |
---|
1472 | self.assertEqual( |
---|
1473 | len( |
---|
1474 | db().select( |
---|
1475 | db.t1.aa, |
---|
1476 | db.t2.id.count(), |
---|
1477 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1478 | orderby=db.t1.aa, |
---|
1479 | groupby=db.t1.aa, |
---|
1480 | ) |
---|
1481 | ), |
---|
1482 | 3, |
---|
1483 | ) |
---|
1484 | self.assertEqual( |
---|
1485 | db() |
---|
1486 | .select( |
---|
1487 | db.t1.aa, |
---|
1488 | db.t2.id.count(), |
---|
1489 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1490 | orderby=db.t1.aa, |
---|
1491 | groupby=db.t1.aa, |
---|
1492 | )[0] |
---|
1493 | ._extra[db.t2.id.count()], |
---|
1494 | 1, |
---|
1495 | ) |
---|
1496 | self.assertEqual( |
---|
1497 | db() |
---|
1498 | .select( |
---|
1499 | db.t1.aa, |
---|
1500 | db.t2.id.count(), |
---|
1501 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1502 | orderby=db.t1.aa, |
---|
1503 | groupby=db.t1.aa, |
---|
1504 | )[1] |
---|
1505 | ._extra[db.t2.id.count()], |
---|
1506 | 2, |
---|
1507 | ) |
---|
1508 | self.assertEqual( |
---|
1509 | db() |
---|
1510 | .select( |
---|
1511 | db.t1.aa, |
---|
1512 | db.t2.id.count(), |
---|
1513 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1514 | orderby=db.t1.aa, |
---|
1515 | groupby=db.t1.aa, |
---|
1516 | )[2] |
---|
1517 | ._extra[db.t2.id.count()], |
---|
1518 | 0, |
---|
1519 | ) |
---|
1520 | drop(db.t2) |
---|
1521 | drop(db.t1) |
---|
1522 | |
---|
1523 | db.define_table("person", Field("name")) |
---|
1524 | id = db.person.insert(name="max") |
---|
1525 | self.assertEqual(id.name, "max") |
---|
1526 | db.define_table("dog", Field("name"), Field("ownerperson", "reference person")) |
---|
1527 | db.dog.insert(name="skipper", ownerperson=1) |
---|
1528 | row = db(db.person.id == db.dog.ownerperson).select().first() |
---|
1529 | self.assertEqual(row[db.person.name], "max") |
---|
1530 | self.assertEqual(row["person.name"], "max") |
---|
1531 | drop(db.dog) |
---|
1532 | self.assertEqual(len(db.person._referenced_by), 0) |
---|
1533 | drop(db.person) |
---|
1534 | db.close() |
---|
1535 | |
---|
1536 | |
---|
1537 | @unittest.skipIf( |
---|
1538 | IS_GAE or IS_IMAP, |
---|
1539 | 'TODO: Datastore throws "AttributeError: Row object has no attribute _extra"', |
---|
1540 | ) |
---|
1541 | class TestMinMaxSumAvg(unittest.TestCase): |
---|
1542 | def testRun(self): |
---|
1543 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1544 | db.define_table("tt", Field("aa", "integer")) |
---|
1545 | self.assertEqual(isinstance(db.tt.insert(aa=1), long), True) |
---|
1546 | self.assertEqual(isinstance(db.tt.insert(aa=2), long), True) |
---|
1547 | self.assertEqual(isinstance(db.tt.insert(aa=3), long), True) |
---|
1548 | s = db.tt.aa.min() |
---|
1549 | self.assertEqual(db(db.tt.id > 0).select(s)[0]._extra[s], 1) |
---|
1550 | self.assertEqual(db(db.tt.id > 0).select(s).first()[s], 1) |
---|
1551 | self.assertEqual(db().select(s).first()[s], 1) |
---|
1552 | s = db.tt.aa.max() |
---|
1553 | self.assertEqual(db().select(s).first()[s], 3) |
---|
1554 | s = db.tt.aa.sum() |
---|
1555 | self.assertEqual(db().select(s).first()[s], 6) |
---|
1556 | s = db.tt.aa.count() |
---|
1557 | self.assertEqual(db().select(s).first()[s], 3) |
---|
1558 | s = db.tt.aa.avg() |
---|
1559 | self.assertEqual(db().select(s).first()[s], 2) |
---|
1560 | drop(db.tt) |
---|
1561 | db.close() |
---|
1562 | |
---|
1563 | |
---|
1564 | @unittest.skipIf(IS_IMAP, "Skip IMAP") |
---|
1565 | class TestMigrations(unittest.TestCase): |
---|
1566 | def testRun(self): |
---|
1567 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1568 | db.define_table("tt", Field("aa"), migrate=".storage.table") |
---|
1569 | db.commit() |
---|
1570 | db.close() |
---|
1571 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1572 | db.define_table("tt", Field("aa"), Field("b"), migrate=".storage.table") |
---|
1573 | db.commit() |
---|
1574 | db.close() |
---|
1575 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1576 | db.define_table("tt", Field("aa"), Field("b", "text"), migrate=".storage.table") |
---|
1577 | db.commit() |
---|
1578 | db.close() |
---|
1579 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1580 | db.define_table("tt", Field("aa"), migrate=".storage.table") |
---|
1581 | drop(db.tt) |
---|
1582 | db.commit() |
---|
1583 | db.close() |
---|
1584 | |
---|
1585 | def tearDown(self): |
---|
1586 | if os.path.exists(".storage.db"): |
---|
1587 | os.unlink(".storage.db") |
---|
1588 | if os.path.exists(".storage.table"): |
---|
1589 | os.unlink(".storage.table") |
---|
1590 | |
---|
1591 | |
---|
1592 | @unittest.skipIf(IS_IMAP, "Skip IMAP") |
---|
1593 | class TestReference(unittest.TestCase): |
---|
1594 | def testRun(self): |
---|
1595 | scenarios = ( |
---|
1596 | (True, "CASCADE"), |
---|
1597 | (False, "CASCADE"), |
---|
1598 | (False, "SET NULL"), |
---|
1599 | ) |
---|
1600 | for (b, ondelete) in scenarios: |
---|
1601 | db = DAL(DEFAULT_URI, check_reserved=["all"], bigint_id=b) |
---|
1602 | db.define_table( |
---|
1603 | "tt", Field("name"), Field("aa", "reference tt", ondelete=ondelete) |
---|
1604 | ) |
---|
1605 | db.commit() |
---|
1606 | x = db.tt.insert(name="xxx") |
---|
1607 | self.assertTrue(isinstance(x, long)) |
---|
1608 | self.assertEqual(x.id, x) |
---|
1609 | self.assertEqual(x["id"], x) |
---|
1610 | x.aa = x |
---|
1611 | x.update_record() |
---|
1612 | x1 = db.tt[x] |
---|
1613 | self.assertEqual(x1.aa, x) |
---|
1614 | self.assertEqual(x1.aa.aa.aa.aa.aa.aa.name, "xxx") |
---|
1615 | y = db.tt.insert(name="yyy", aa=x1) |
---|
1616 | self.assertEqual(y.aa, x1.id) |
---|
1617 | self.assertTrue(isinstance(db.tt.insert(name="zzz"), long)) |
---|
1618 | self.assertEqual(db(db.tt.name).count(), 3) |
---|
1619 | if IS_MONGODB: |
---|
1620 | db(db.tt.id == x).delete() |
---|
1621 | expected_count = { |
---|
1622 | "SET NULL": 2, |
---|
1623 | "CASCADE": 1, |
---|
1624 | } |
---|
1625 | self.assertEqual(db(db.tt.name).count(), expected_count[ondelete]) |
---|
1626 | if ondelete == "SET NULL": |
---|
1627 | self.assertEqual(db(db.tt.name == "yyy").select()[0].aa, None) |
---|
1628 | drop(db.tt) |
---|
1629 | db.commit() |
---|
1630 | db.close() |
---|
1631 | |
---|
1632 | |
---|
1633 | @unittest.skipIf(IS_IMAP, "Skip IMAP") |
---|
1634 | class TestClientLevelOps(unittest.TestCase): |
---|
1635 | def testRun(self): |
---|
1636 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1637 | db.define_table("tt", Field("aa")) |
---|
1638 | db.commit() |
---|
1639 | db.tt.insert(aa="test") |
---|
1640 | rows1 = db(db.tt.aa == "test").select() |
---|
1641 | rows2 = db(db.tt.aa == "test").select() |
---|
1642 | rows3 = rows1 + rows2 |
---|
1643 | assert len(rows3) == 2 |
---|
1644 | rows4 = rows1 & rows2 |
---|
1645 | assert len(rows4) == 1 |
---|
1646 | rows5 = rows1 | rows2 |
---|
1647 | assert len(rows5) == 1 |
---|
1648 | rows6 = rows1.find(lambda row: row.aa == "test") |
---|
1649 | assert len(rows6) == 1 |
---|
1650 | rows7 = rows2.exclude(lambda row: row.aa == "test") |
---|
1651 | assert len(rows7) == 1 |
---|
1652 | rows8 = rows5.sort(lambda row: row.aa) |
---|
1653 | assert len(rows8) == 1 |
---|
1654 | drop(db.tt) |
---|
1655 | db.commit() |
---|
1656 | db.close() |
---|
1657 | |
---|
1658 | |
---|
1659 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
1660 | class TestVirtualFields(unittest.TestCase): |
---|
1661 | def testRun(self): |
---|
1662 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1663 | db.define_table("tt", Field("aa")) |
---|
1664 | db.commit() |
---|
1665 | db.tt.insert(aa="test") |
---|
1666 | |
---|
1667 | class Compute: |
---|
1668 | def a_upper(row): |
---|
1669 | return row.tt.aa.upper() |
---|
1670 | |
---|
1671 | db.tt.virtualfields.append(Compute()) |
---|
1672 | assert db(db.tt.id > 0).select().first().a_upper == "TEST" |
---|
1673 | drop(db.tt) |
---|
1674 | db.commit() |
---|
1675 | db.close() |
---|
1676 | |
---|
1677 | |
---|
1678 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
1679 | class TestComputedFields(unittest.TestCase): |
---|
1680 | def testRun(self): |
---|
1681 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1682 | db.define_table( |
---|
1683 | "tt", |
---|
1684 | Field("aa"), |
---|
1685 | Field("bb", default="x"), |
---|
1686 | Field("cc", compute=lambda r: r.aa + r.bb), |
---|
1687 | ) |
---|
1688 | db.commit() |
---|
1689 | id = db.tt.insert(aa="z") |
---|
1690 | self.assertEqual(db.tt[id].cc, "zx") |
---|
1691 | drop(db.tt) |
---|
1692 | db.commit() |
---|
1693 | |
---|
1694 | # test checking that a compute field can refer to earlier-defined computed fields |
---|
1695 | db.define_table( |
---|
1696 | "tt", |
---|
1697 | Field("aa"), |
---|
1698 | Field("bb", default="x"), |
---|
1699 | Field("cc", compute=lambda r: r.aa + r.bb), |
---|
1700 | Field("dd", compute=lambda r: r.bb + r.cc), |
---|
1701 | ) |
---|
1702 | db.commit() |
---|
1703 | id = db.tt.insert(aa="z") |
---|
1704 | self.assertEqual(db.tt[id].dd, "xzx") |
---|
1705 | drop(db.tt) |
---|
1706 | db.commit() |
---|
1707 | db.close() |
---|
1708 | |
---|
1709 | |
---|
1710 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
1711 | class TestCommonFilters(unittest.TestCase): |
---|
1712 | def testRun(self): |
---|
1713 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1714 | db.define_table("t1", Field("aa", "integer")) |
---|
1715 | db.define_table("t2", Field("aa", "integer"), Field("b", db.t1)) |
---|
1716 | i1 = db.t1.insert(aa=1) |
---|
1717 | i2 = db.t1.insert(aa=2) |
---|
1718 | i3 = db.t1.insert(aa=3) |
---|
1719 | db.t2.insert(aa=4, b=i1) |
---|
1720 | db.t2.insert(aa=5, b=i2) |
---|
1721 | db.t2.insert(aa=6, b=i2) |
---|
1722 | db.t1._common_filter = lambda q: db.t1.aa > 1 |
---|
1723 | self.assertEqual(db(db.t1).count(), 2) |
---|
1724 | self.assertEqual(db(db.t1).count(), 2) |
---|
1725 | db.t2._common_filter = lambda q: db.t2.aa < 6 |
---|
1726 | # test delete |
---|
1727 | self.assertEqual(db(db.t2).count(), 2) |
---|
1728 | db(db.t2).delete() |
---|
1729 | self.assertEqual(db(db.t2).count(), 0) |
---|
1730 | db.t2._common_filter = None |
---|
1731 | self.assertEqual(db(db.t2).count(), 1) |
---|
1732 | # test update |
---|
1733 | db.t2.insert(aa=4, b=i1) |
---|
1734 | db.t2.insert(aa=5, b=i2) |
---|
1735 | db.t2._common_filter = lambda q: db.t2.aa < 6 |
---|
1736 | self.assertEqual(db(db.t2).count(), 2) |
---|
1737 | db(db.t2).update(aa=6) |
---|
1738 | self.assertEqual(db(db.t2).count(), 0) |
---|
1739 | db.t2._common_filter = None |
---|
1740 | self.assertEqual(db(db.t2).count(), 3) |
---|
1741 | drop(db.t2) |
---|
1742 | drop(db.t1) |
---|
1743 | db.close() |
---|
1744 | |
---|
1745 | |
---|
1746 | @unittest.skipIf(IS_IMAP, "Skip IMAP test") |
---|
1747 | class TestImportExportFields(unittest.TestCase): |
---|
1748 | def testRun(self): |
---|
1749 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1750 | db.define_table("person", Field("name")) |
---|
1751 | db.define_table("pet", Field("friend", db.person), Field("name")) |
---|
1752 | for n in range(2): |
---|
1753 | db(db.pet).delete() |
---|
1754 | db(db.person).delete() |
---|
1755 | for k in range(10): |
---|
1756 | id = db.person.insert(name=str(k)) |
---|
1757 | db.pet.insert(friend=id, name=str(k)) |
---|
1758 | db.commit() |
---|
1759 | stream = StringIO() |
---|
1760 | db.export_to_csv_file(stream) |
---|
1761 | db(db.pet).delete() |
---|
1762 | db(db.person).delete() |
---|
1763 | stream = StringIO(stream.getvalue()) |
---|
1764 | db.import_from_csv_file(stream) |
---|
1765 | assert db(db.person).count() == 10 |
---|
1766 | assert db(db.pet.name).count() == 10 |
---|
1767 | drop(db.pet) |
---|
1768 | drop(db.person) |
---|
1769 | db.commit() |
---|
1770 | db.close() |
---|
1771 | |
---|
1772 | |
---|
1773 | @unittest.skipIf(IS_IMAP, "Skip IMAP test") |
---|
1774 | class TestImportExportUuidFields(unittest.TestCase): |
---|
1775 | def testRun(self): |
---|
1776 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1777 | db.define_table("person", Field("name"), Field("uuid")) |
---|
1778 | db.define_table("pet", Field("friend", db.person), Field("name")) |
---|
1779 | for n in range(2): |
---|
1780 | db(db.pet).delete() |
---|
1781 | db(db.person).delete() |
---|
1782 | for k in range(10): |
---|
1783 | id = db.person.insert(name=str(k), uuid=str(k)) |
---|
1784 | db.pet.insert(friend=id, name=str(k)) |
---|
1785 | db.commit() |
---|
1786 | stream = StringIO() |
---|
1787 | db.export_to_csv_file(stream) |
---|
1788 | db(db.person).delete() |
---|
1789 | db(db.pet).delete() |
---|
1790 | stream = StringIO(stream.getvalue()) |
---|
1791 | db.import_from_csv_file(stream) |
---|
1792 | assert db(db.person).count() == 10 |
---|
1793 | assert db(db.pet).count() == 10 |
---|
1794 | drop(db.pet) |
---|
1795 | drop(db.person) |
---|
1796 | db.commit() |
---|
1797 | db.close() |
---|
1798 | |
---|
1799 | |
---|
1800 | @unittest.skipIf(IS_IMAP, "Skip IMAP test") |
---|
1801 | class TestDALDictImportExport(unittest.TestCase): |
---|
1802 | def testRun(self): |
---|
1803 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1804 | db.define_table("person", Field("name", default="Michael"), Field("uuid")) |
---|
1805 | db.define_table("pet", Field("friend", db.person), Field("name")) |
---|
1806 | dbdict = db.as_dict(flat=True, sanitize=False) |
---|
1807 | assert isinstance(dbdict, dict) |
---|
1808 | uri = dbdict["uri"] |
---|
1809 | assert isinstance(uri, basestring) and uri |
---|
1810 | assert len(dbdict["tables"]) == 2 |
---|
1811 | assert len(dbdict["tables"][0]["fields"]) == 3 |
---|
1812 | assert dbdict["tables"][0]["fields"][1]["type"] == db.person.name.type |
---|
1813 | assert dbdict["tables"][0]["fields"][1]["default"] == db.person.name.default |
---|
1814 | |
---|
1815 | db2 = DAL(**dbdict) |
---|
1816 | assert len(db.tables) == len(db2.tables) |
---|
1817 | assert hasattr(db2, "pet") and isinstance(db2.pet, Table) |
---|
1818 | assert hasattr(db2.pet, "friend") and isinstance(db2.pet.friend, Field) |
---|
1819 | drop(db.pet) |
---|
1820 | db.commit() |
---|
1821 | |
---|
1822 | db2.commit() |
---|
1823 | |
---|
1824 | have_serializers = True |
---|
1825 | try: |
---|
1826 | import serializers |
---|
1827 | |
---|
1828 | dbjson = db.as_json(sanitize=False) |
---|
1829 | assert isinstance(dbjson, basestring) and len(dbjson) > 0 |
---|
1830 | |
---|
1831 | unicode_keys = True |
---|
1832 | if sys.version < "2.6.5": |
---|
1833 | unicode_keys = False |
---|
1834 | db3 = DAL(**serializers.loads_json(dbjson, unicode_keys=unicode_keys)) |
---|
1835 | assert ( |
---|
1836 | hasattr(db3, "person") |
---|
1837 | and hasattr(db3.person, "uuid") |
---|
1838 | and db3.person.uuid.type == db.person.uuid.type |
---|
1839 | ) |
---|
1840 | drop(db3.person) |
---|
1841 | db3.commit() |
---|
1842 | db3.close() |
---|
1843 | except ImportError: |
---|
1844 | pass |
---|
1845 | |
---|
1846 | mpfc = "Monty Python's Flying Circus" |
---|
1847 | dbdict4 = { |
---|
1848 | "uri": DEFAULT_URI, |
---|
1849 | "tables": [ |
---|
1850 | { |
---|
1851 | "tablename": "tvshow", |
---|
1852 | "fields": [ |
---|
1853 | {"fieldname": "name", "default": mpfc}, |
---|
1854 | {"fieldname": "rating", "type": "double"}, |
---|
1855 | ], |
---|
1856 | }, |
---|
1857 | { |
---|
1858 | "tablename": "staff", |
---|
1859 | "fields": [ |
---|
1860 | {"fieldname": "name", "default": "Michael"}, |
---|
1861 | {"fieldname": "food", "default": "Spam"}, |
---|
1862 | {"fieldname": "tvshow", "type": "reference tvshow"}, |
---|
1863 | ], |
---|
1864 | }, |
---|
1865 | ], |
---|
1866 | } |
---|
1867 | db4 = DAL(**dbdict4) |
---|
1868 | assert "staff" in db4.tables |
---|
1869 | assert "name" in db4.staff |
---|
1870 | assert db4.tvshow.rating.type == "double" |
---|
1871 | assert ( |
---|
1872 | isinstance(db4.tvshow.insert(), long), |
---|
1873 | isinstance(db4.tvshow.insert(name="Loriot"), long), |
---|
1874 | isinstance(db4.tvshow.insert(name="Il Mattatore"), long), |
---|
1875 | ) == (True, True, True) |
---|
1876 | assert isinstance(db4(db4.tvshow).select().first().id, long) == True |
---|
1877 | assert db4(db4.tvshow).select().first().name == mpfc |
---|
1878 | |
---|
1879 | drop(db4.staff) |
---|
1880 | drop(db4.tvshow) |
---|
1881 | db4.commit() |
---|
1882 | |
---|
1883 | dbdict5 = {"uri": DEFAULT_URI} |
---|
1884 | db5 = DAL(**dbdict5) |
---|
1885 | assert db5.tables in ([], None) |
---|
1886 | assert not (str(db5) in ("", None)) |
---|
1887 | |
---|
1888 | dbdict6 = { |
---|
1889 | "uri": DEFAULT_URI, |
---|
1890 | "tables": [ |
---|
1891 | {"tablename": "staff"}, |
---|
1892 | { |
---|
1893 | "tablename": "tvshow", |
---|
1894 | "fields": [ |
---|
1895 | {"fieldname": "name"}, |
---|
1896 | {"fieldname": "rating", "type": "double"}, |
---|
1897 | ], |
---|
1898 | }, |
---|
1899 | ], |
---|
1900 | } |
---|
1901 | db6 = DAL(**dbdict6) |
---|
1902 | |
---|
1903 | assert len(db6["staff"].fields) == 1 |
---|
1904 | assert "name" in db6["tvshow"].fields |
---|
1905 | |
---|
1906 | assert db6.staff.insert() is not None |
---|
1907 | assert isinstance(db6(db6.staff).select().first().id, long) == True |
---|
1908 | |
---|
1909 | drop(db6.staff) |
---|
1910 | drop(db6.tvshow) |
---|
1911 | db6.commit() |
---|
1912 | db.close() |
---|
1913 | db2.close() |
---|
1914 | db4.close() |
---|
1915 | db5.close() |
---|
1916 | db6.close() |
---|
1917 | |
---|
1918 | |
---|
1919 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
1920 | class TestSelectAsDict(unittest.TestCase): |
---|
1921 | def testSelect(self): |
---|
1922 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1923 | db.define_table( |
---|
1924 | "a_table", Field("b_field"), Field("a_field"), |
---|
1925 | ) |
---|
1926 | db.a_table.insert(a_field="aa1", b_field="bb1") |
---|
1927 | rtn = ( |
---|
1928 | db(db.a_table) |
---|
1929 | .select(db.a_table.id, db.a_table.b_field, db.a_table.a_field) |
---|
1930 | .as_list() |
---|
1931 | ) |
---|
1932 | self.assertEqual(rtn[0]["b_field"], "bb1") |
---|
1933 | keys = rtn[0].keys() |
---|
1934 | self.assertEqual(len(keys), 3) |
---|
1935 | self.assertEqual( |
---|
1936 | ("id" in keys, "b_field" in keys, "a_field" in keys), (True, True, True) |
---|
1937 | ) |
---|
1938 | drop(db.a_table) |
---|
1939 | db.close() |
---|
1940 | |
---|
1941 | |
---|
1942 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
1943 | class TestRNameTable(unittest.TestCase): |
---|
1944 | # tests for highly experimental rname attribute |
---|
1945 | def testSelect(self): |
---|
1946 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1947 | rname = _quote(db, "a very complicated tablename") |
---|
1948 | db.define_table("easy_name", Field("a_field"), rname=rname) |
---|
1949 | rtn = db.easy_name.insert(a_field="a") |
---|
1950 | self.assertEqual(isinstance(rtn.id, long), True) |
---|
1951 | rtn = db(db.easy_name.a_field == "a").select() |
---|
1952 | self.assertEqual(len(rtn), 1) |
---|
1953 | self.assertEqual(isinstance(rtn[0].id, long), True) |
---|
1954 | self.assertEqual(rtn[0].a_field, "a") |
---|
1955 | db.easy_name.insert(a_field="b") |
---|
1956 | self.assertEqual(db(db.easy_name).count(), 2) |
---|
1957 | rtn = db(db.easy_name.a_field == "a").update(a_field="c") |
---|
1958 | self.assertEqual(rtn, 1) |
---|
1959 | |
---|
1960 | # clean up |
---|
1961 | drop(db.easy_name) |
---|
1962 | db.close() |
---|
1963 | |
---|
1964 | @unittest.skip("JOIN queries are not supported") |
---|
1965 | def testJoin(self): |
---|
1966 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
1967 | rname = _quote(db, "this is table t1") |
---|
1968 | rname2 = _quote(db, "this is table t2") |
---|
1969 | db.define_table("t1", Field("aa"), rname=rname) |
---|
1970 | db.define_table("t2", Field("aa"), Field("b", db.t1), rname=rname2) |
---|
1971 | i1 = db.t1.insert(aa="1") |
---|
1972 | i2 = db.t1.insert(aa="2") |
---|
1973 | i3 = db.t1.insert(aa="3") |
---|
1974 | db.t2.insert(aa="4", b=i1) |
---|
1975 | db.t2.insert(aa="5", b=i2) |
---|
1976 | db.t2.insert(aa="6", b=i2) |
---|
1977 | self.assertEqual( |
---|
1978 | len(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)), 3 |
---|
1979 | ) |
---|
1980 | self.assertEqual( |
---|
1981 | db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t1.aa, "2" |
---|
1982 | ) |
---|
1983 | self.assertEqual( |
---|
1984 | db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t2.aa, "6" |
---|
1985 | ) |
---|
1986 | self.assertEqual( |
---|
1987 | len( |
---|
1988 | db().select( |
---|
1989 | db.t1.ALL, |
---|
1990 | db.t2.ALL, |
---|
1991 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
1992 | orderby=db.t1.aa | db.t2.aa, |
---|
1993 | ) |
---|
1994 | ), |
---|
1995 | 4, |
---|
1996 | ) |
---|
1997 | self.assertEqual( |
---|
1998 | db() |
---|
1999 | .select( |
---|
2000 | db.t1.ALL, |
---|
2001 | db.t2.ALL, |
---|
2002 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2003 | orderby=db.t1.aa | db.t2.aa, |
---|
2004 | )[2] |
---|
2005 | .t1.aa, |
---|
2006 | "2", |
---|
2007 | ) |
---|
2008 | self.assertEqual( |
---|
2009 | db() |
---|
2010 | .select( |
---|
2011 | db.t1.ALL, |
---|
2012 | db.t2.ALL, |
---|
2013 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2014 | orderby=db.t1.aa | db.t2.aa, |
---|
2015 | )[2] |
---|
2016 | .t2.aa, |
---|
2017 | "6", |
---|
2018 | ) |
---|
2019 | self.assertEqual( |
---|
2020 | db() |
---|
2021 | .select( |
---|
2022 | db.t1.ALL, |
---|
2023 | db.t2.ALL, |
---|
2024 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2025 | orderby=db.t1.aa | db.t2.aa, |
---|
2026 | )[3] |
---|
2027 | .t1.aa, |
---|
2028 | "3", |
---|
2029 | ) |
---|
2030 | self.assertEqual( |
---|
2031 | db() |
---|
2032 | .select( |
---|
2033 | db.t1.ALL, |
---|
2034 | db.t2.ALL, |
---|
2035 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2036 | orderby=db.t1.aa | db.t2.aa, |
---|
2037 | )[3] |
---|
2038 | .t2.aa, |
---|
2039 | None, |
---|
2040 | ) |
---|
2041 | self.assertEqual( |
---|
2042 | len( |
---|
2043 | db().select( |
---|
2044 | db.t1.aa, |
---|
2045 | db.t2.id.count(), |
---|
2046 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2047 | orderby=db.t1.aa, |
---|
2048 | groupby=db.t1.aa, |
---|
2049 | ) |
---|
2050 | ), |
---|
2051 | 3, |
---|
2052 | ) |
---|
2053 | self.assertEqual( |
---|
2054 | db() |
---|
2055 | .select( |
---|
2056 | db.t1.aa, |
---|
2057 | db.t2.id.count(), |
---|
2058 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2059 | orderby=db.t1.aa, |
---|
2060 | groupby=db.t1.aa, |
---|
2061 | )[0] |
---|
2062 | ._extra[db.t2.id.count()], |
---|
2063 | 1, |
---|
2064 | ) |
---|
2065 | self.assertEqual( |
---|
2066 | db() |
---|
2067 | .select( |
---|
2068 | db.t1.aa, |
---|
2069 | db.t2.id.count(), |
---|
2070 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2071 | orderby=db.t1.aa, |
---|
2072 | groupby=db.t1.aa, |
---|
2073 | )[1] |
---|
2074 | ._extra[db.t2.id.count()], |
---|
2075 | 2, |
---|
2076 | ) |
---|
2077 | self.assertEqual( |
---|
2078 | db() |
---|
2079 | .select( |
---|
2080 | db.t1.aa, |
---|
2081 | db.t2.id.count(), |
---|
2082 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2083 | orderby=db.t1.aa, |
---|
2084 | groupby=db.t1.aa, |
---|
2085 | )[2] |
---|
2086 | ._extra[db.t2.id.count()], |
---|
2087 | 0, |
---|
2088 | ) |
---|
2089 | drop(db.t2) |
---|
2090 | drop(db.t1) |
---|
2091 | |
---|
2092 | db.define_table("person", Field("name"), rname=rname) |
---|
2093 | id = db.person.insert(name="max") |
---|
2094 | self.assertEqual(id.name, "max") |
---|
2095 | db.define_table( |
---|
2096 | "dog", Field("name"), Field("ownerperson", "reference person"), rname=rname2 |
---|
2097 | ) |
---|
2098 | db.dog.insert(name="skipper", ownerperson=1) |
---|
2099 | row = db(db.person.id == db.dog.ownerperson).select().first() |
---|
2100 | self.assertEqual(row[db.person.name], "max") |
---|
2101 | self.assertEqual(row["person.name"], "max") |
---|
2102 | drop(db.dog) |
---|
2103 | self.assertEqual(len(db.person._referenced_by), 0) |
---|
2104 | drop(db.person) |
---|
2105 | db.close() |
---|
2106 | |
---|
2107 | |
---|
2108 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
2109 | @unittest.skipIf(IS_GAE, "TODO: Datastore AGGREGATE Not Supported") |
---|
2110 | class TestRNameFields(unittest.TestCase): |
---|
2111 | # tests for highly experimental rname attribute |
---|
2112 | def testSelect(self): |
---|
2113 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
2114 | rname = _quote(db, "a very complicated fieldname") |
---|
2115 | rname2 = _quote(db, "rating from 1 to 10") |
---|
2116 | db.define_table( |
---|
2117 | "easy_name", |
---|
2118 | Field("a_field", rname=rname), |
---|
2119 | Field("rating", "integer", rname=rname2, default=2), |
---|
2120 | ) |
---|
2121 | rtn = db.easy_name.insert(a_field="a") |
---|
2122 | self.assertEqual(isinstance(rtn.id, long), True) |
---|
2123 | rtn = db(db.easy_name.a_field == "a").select() |
---|
2124 | self.assertEqual(len(rtn), 1) |
---|
2125 | self.assertEqual(isinstance(rtn[0].id, long), True) |
---|
2126 | self.assertEqual(rtn[0].a_field, "a") |
---|
2127 | db.easy_name.insert(a_field="b") |
---|
2128 | rtn = db(db.easy_name.id > 0).delete() |
---|
2129 | self.assertEqual(rtn, 2) |
---|
2130 | rtn = db(db.easy_name.id > 0).count() |
---|
2131 | self.assertEqual(rtn, 0) |
---|
2132 | db.easy_name.insert(a_field="a") |
---|
2133 | db.easy_name.insert(a_field="b") |
---|
2134 | rtn = db(db.easy_name.id > 0).count() |
---|
2135 | self.assertEqual(rtn, 2) |
---|
2136 | rtn = db(db.easy_name.a_field == "a").update(a_field="c") |
---|
2137 | rtn = db(db.easy_name.a_field == "c").count() |
---|
2138 | self.assertEqual(rtn, 1) |
---|
2139 | rtn = db(db.easy_name.a_field != "c").count() |
---|
2140 | self.assertEqual(rtn, 1) |
---|
2141 | avg = db.easy_name.rating.avg() |
---|
2142 | rtn = db(db.easy_name.id > 0).select(avg) |
---|
2143 | self.assertEqual(rtn[0][avg], 2) |
---|
2144 | |
---|
2145 | rname = _quote(db, "this is the person name") |
---|
2146 | db.define_table( |
---|
2147 | "person", Field("name", default="Michael", rname=rname), Field("uuid") |
---|
2148 | ) |
---|
2149 | michael = db.person.insert() # default insert |
---|
2150 | john = db.person.insert(name="John") |
---|
2151 | luke = db.person.insert(name="Luke") |
---|
2152 | |
---|
2153 | rtn = db(db.person.id > 0).select() |
---|
2154 | self.assertEqual(len(rtn), 3) |
---|
2155 | self.assertEqual(rtn[0].id, michael) |
---|
2156 | self.assertEqual(rtn[0].name, "Michael") |
---|
2157 | self.assertEqual(rtn[1].id, john) |
---|
2158 | self.assertEqual(rtn[1].name, "John") |
---|
2159 | # fetch owners, eventually with pet |
---|
2160 | # main point is retrieving Luke with no pets |
---|
2161 | rtn = db(db.person.id > 0).select() |
---|
2162 | self.assertEqual(rtn[0].id, michael) |
---|
2163 | self.assertEqual(rtn[0].name, "Michael") |
---|
2164 | self.assertEqual(rtn[2].name, "Luke") |
---|
2165 | self.assertEqual(rtn[2].id, luke) |
---|
2166 | # as dict |
---|
2167 | rtn = db(db.person.id > 0).select().as_dict() |
---|
2168 | self.assertEqual(rtn[michael]["name"], "Michael") |
---|
2169 | # as list |
---|
2170 | rtn = db(db.person.id > 0).select().as_list() |
---|
2171 | self.assertEqual(rtn[0]["name"], "Michael") |
---|
2172 | # isempty |
---|
2173 | rtn = db(db.person.id > 0).isempty() |
---|
2174 | self.assertEqual(rtn, False) |
---|
2175 | |
---|
2176 | # clean up |
---|
2177 | drop(db.person) |
---|
2178 | drop(db.easy_name) |
---|
2179 | db.close() |
---|
2180 | |
---|
2181 | @unittest.skipIf( |
---|
2182 | IS_GAE, "TODO: Datastore does not accept dict objects as json field input." |
---|
2183 | ) |
---|
2184 | def testRun(self): |
---|
2185 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
2186 | rname = _quote(db, "a very complicated fieldname") |
---|
2187 | for ft in ["string", "text", "password", "upload", "blob"]: |
---|
2188 | db.define_table("tt", Field("aa", ft, default="", rname=rname)) |
---|
2189 | cv = "x" |
---|
2190 | self.assertEqual(isinstance(db.tt.insert(aa="x"), long), True) |
---|
2191 | if IS_MONGODB and not PY2 and ft == "blob": |
---|
2192 | cv = to_bytes(cv) |
---|
2193 | self.assertEqual(db().select(db.tt.aa)[0].aa, cv) |
---|
2194 | drop(db.tt) |
---|
2195 | db.define_table("tt", Field("aa", "integer", default=1, rname=rname)) |
---|
2196 | self.assertEqual(isinstance(db.tt.insert(aa=3), long), True) |
---|
2197 | self.assertEqual(db().select(db.tt.aa)[0].aa, 3) |
---|
2198 | drop(db.tt) |
---|
2199 | db.define_table("tt", Field("aa", "double", default=1, rname=rname)) |
---|
2200 | self.assertEqual(isinstance(db.tt.insert(aa=3.1), long), True) |
---|
2201 | self.assertEqual(db().select(db.tt.aa)[0].aa, 3.1) |
---|
2202 | drop(db.tt) |
---|
2203 | db.define_table("tt", Field("aa", "boolean", default=True, rname=rname)) |
---|
2204 | self.assertEqual(isinstance(db.tt.insert(aa=True), long), True) |
---|
2205 | self.assertEqual(db().select(db.tt.aa)[0].aa, True) |
---|
2206 | drop(db.tt) |
---|
2207 | db.define_table("tt", Field("aa", "json", default={}, rname=rname)) |
---|
2208 | self.assertEqual(isinstance(db.tt.insert(aa={}), long), True) |
---|
2209 | self.assertEqual(db().select(db.tt.aa)[0].aa, {}) |
---|
2210 | drop(db.tt) |
---|
2211 | db.define_table( |
---|
2212 | "tt", Field("aa", "date", default=datetime.date.today(), rname=rname) |
---|
2213 | ) |
---|
2214 | t0 = datetime.date.today() |
---|
2215 | self.assertEqual(isinstance(db.tt.insert(aa=t0), long), True) |
---|
2216 | self.assertEqual(db().select(db.tt.aa)[0].aa, t0) |
---|
2217 | drop(db.tt) |
---|
2218 | db.define_table( |
---|
2219 | "tt", |
---|
2220 | Field("aa", "datetime", default=datetime.datetime.today(), rname=rname), |
---|
2221 | ) |
---|
2222 | t0 = datetime.datetime(1971, 12, 21, 10, 30, 55, 0,) |
---|
2223 | id = db.tt.insert(aa=t0) |
---|
2224 | self.assertEqual(db().select(db.tt.aa)[0].aa, t0) |
---|
2225 | |
---|
2226 | ## Row APIs |
---|
2227 | row = db().select(db.tt.aa)[0] |
---|
2228 | self.assertEqual(db.tt[id].aa, t0) |
---|
2229 | self.assertEqual(db.tt["aa"], db.tt.aa) |
---|
2230 | self.assertEqual(db.tt(id).aa, t0) |
---|
2231 | self.assertTrue(db.tt(id, aa=None) == None) |
---|
2232 | self.assertFalse(db.tt(id, aa=t0) == None) |
---|
2233 | self.assertEqual(row.aa, t0) |
---|
2234 | self.assertEqual(row["aa"], t0) |
---|
2235 | self.assertEqual(row["tt.aa"], t0) |
---|
2236 | self.assertEqual(row("tt.aa"), t0) |
---|
2237 | |
---|
2238 | ## Lazy and Virtual fields |
---|
2239 | db.tt.b = Field.Virtual(lambda row: row.tt.aa) |
---|
2240 | db.tt.c = Field.Lazy(lambda row: row.tt.aa) |
---|
2241 | row = db().select(db.tt.aa)[0] |
---|
2242 | self.assertEqual(row.b, t0) |
---|
2243 | self.assertEqual(row.c(), t0) |
---|
2244 | |
---|
2245 | drop(db.tt) |
---|
2246 | db.define_table("tt", Field("aa", "time", default="11:30", rname=rname)) |
---|
2247 | t0 = datetime.time(10, 30, 55) |
---|
2248 | self.assertEqual(isinstance(db.tt.insert(aa=t0), long), True) |
---|
2249 | self.assertEqual(db().select(db.tt.aa)[0].aa, t0) |
---|
2250 | drop(db.tt) |
---|
2251 | db.close() |
---|
2252 | |
---|
2253 | def testInsert(self): |
---|
2254 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
2255 | rname = _quote(db, "a very complicated fieldname") |
---|
2256 | db.define_table("tt", Field("aa", rname=rname)) |
---|
2257 | self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True) |
---|
2258 | self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True) |
---|
2259 | self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True) |
---|
2260 | self.assertEqual(db(db.tt.aa == "1").count(), 3) |
---|
2261 | self.assertEqual(db(db.tt.aa == "2").isempty(), True) |
---|
2262 | self.assertEqual(db(db.tt.aa == "1").update(aa="2"), 3) |
---|
2263 | self.assertEqual(db(db.tt.aa == "2").count(), 3) |
---|
2264 | self.assertEqual(db(db.tt.aa == "2").isempty(), False) |
---|
2265 | self.assertEqual(db(db.tt.aa == "2").delete(), 3) |
---|
2266 | self.assertEqual(db(db.tt.aa == "2").isempty(), True) |
---|
2267 | drop(db.tt) |
---|
2268 | db.close() |
---|
2269 | |
---|
2270 | @unittest.skip("JOIN queries are not supported") |
---|
2271 | def testJoin(self): |
---|
2272 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
2273 | rname = _quote(db, "this is field aa") |
---|
2274 | rname2 = _quote(db, "this is field b") |
---|
2275 | db.define_table("t1", Field("aa", rname=rname)) |
---|
2276 | db.define_table("t2", Field("aa", rname=rname), Field("b", db.t1, rname=rname2)) |
---|
2277 | i1 = db.t1.insert(aa="1") |
---|
2278 | i2 = db.t1.insert(aa="2") |
---|
2279 | i3 = db.t1.insert(aa="3") |
---|
2280 | db.t2.insert(aa="4", b=i1) |
---|
2281 | db.t2.insert(aa="5", b=i2) |
---|
2282 | db.t2.insert(aa="6", b=i2) |
---|
2283 | self.assertEqual( |
---|
2284 | len(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)), 3 |
---|
2285 | ) |
---|
2286 | self.assertEqual( |
---|
2287 | db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t1.aa, "2" |
---|
2288 | ) |
---|
2289 | self.assertEqual( |
---|
2290 | db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t2.aa, "6" |
---|
2291 | ) |
---|
2292 | self.assertEqual( |
---|
2293 | len( |
---|
2294 | db().select( |
---|
2295 | db.t1.ALL, |
---|
2296 | db.t2.ALL, |
---|
2297 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2298 | orderby=db.t1.aa | db.t2.aa, |
---|
2299 | ) |
---|
2300 | ), |
---|
2301 | 4, |
---|
2302 | ) |
---|
2303 | self.assertEqual( |
---|
2304 | db() |
---|
2305 | .select( |
---|
2306 | db.t1.ALL, |
---|
2307 | db.t2.ALL, |
---|
2308 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2309 | orderby=db.t1.aa | db.t2.aa, |
---|
2310 | )[2] |
---|
2311 | .t1.aa, |
---|
2312 | "2", |
---|
2313 | ) |
---|
2314 | self.assertEqual( |
---|
2315 | db() |
---|
2316 | .select( |
---|
2317 | db.t1.ALL, |
---|
2318 | db.t2.ALL, |
---|
2319 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2320 | orderby=db.t1.aa | db.t2.aa, |
---|
2321 | )[2] |
---|
2322 | .t2.aa, |
---|
2323 | "6", |
---|
2324 | ) |
---|
2325 | self.assertEqual( |
---|
2326 | db() |
---|
2327 | .select( |
---|
2328 | db.t1.ALL, |
---|
2329 | db.t2.ALL, |
---|
2330 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2331 | orderby=db.t1.aa | db.t2.aa, |
---|
2332 | )[3] |
---|
2333 | .t1.aa, |
---|
2334 | "3", |
---|
2335 | ) |
---|
2336 | self.assertEqual( |
---|
2337 | db() |
---|
2338 | .select( |
---|
2339 | db.t1.ALL, |
---|
2340 | db.t2.ALL, |
---|
2341 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2342 | orderby=db.t1.aa | db.t2.aa, |
---|
2343 | )[3] |
---|
2344 | .t2.aa, |
---|
2345 | None, |
---|
2346 | ) |
---|
2347 | self.assertEqual( |
---|
2348 | len( |
---|
2349 | db().select( |
---|
2350 | db.t1.aa, |
---|
2351 | db.t2.id.count(), |
---|
2352 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2353 | orderby=db.t1.aa, |
---|
2354 | groupby=db.t1.aa, |
---|
2355 | ) |
---|
2356 | ), |
---|
2357 | 3, |
---|
2358 | ) |
---|
2359 | self.assertEqual( |
---|
2360 | db() |
---|
2361 | .select( |
---|
2362 | db.t1.aa, |
---|
2363 | db.t2.id.count(), |
---|
2364 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2365 | orderby=db.t1.aa, |
---|
2366 | groupby=db.t1.aa, |
---|
2367 | )[0] |
---|
2368 | ._extra[db.t2.id.count()], |
---|
2369 | 1, |
---|
2370 | ) |
---|
2371 | self.assertEqual( |
---|
2372 | db() |
---|
2373 | .select( |
---|
2374 | db.t1.aa, |
---|
2375 | db.t2.id.count(), |
---|
2376 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2377 | orderby=db.t1.aa, |
---|
2378 | groupby=db.t1.aa, |
---|
2379 | )[1] |
---|
2380 | ._extra[db.t2.id.count()], |
---|
2381 | 2, |
---|
2382 | ) |
---|
2383 | self.assertEqual( |
---|
2384 | db() |
---|
2385 | .select( |
---|
2386 | db.t1.aa, |
---|
2387 | db.t2.id.count(), |
---|
2388 | left=db.t2.on(db.t1.id == db.t2.b), |
---|
2389 | orderby=db.t1.aa, |
---|
2390 | groupby=db.t1.aa, |
---|
2391 | )[2] |
---|
2392 | ._extra[db.t2.id.count()], |
---|
2393 | 0, |
---|
2394 | ) |
---|
2395 | drop(db.t2) |
---|
2396 | drop(db.t1) |
---|
2397 | |
---|
2398 | db.define_table("person", Field("name", rname=rname)) |
---|
2399 | id = db.person.insert(name="max") |
---|
2400 | self.assertEqual(id.name, "max") |
---|
2401 | db.define_table( |
---|
2402 | "dog", |
---|
2403 | Field("name", rname=rname), |
---|
2404 | Field("ownerperson", "reference person", rname=rname2), |
---|
2405 | ) |
---|
2406 | db.dog.insert(name="skipper", ownerperson=1) |
---|
2407 | row = db(db.person.id == db.dog.ownerperson).select().first() |
---|
2408 | self.assertEqual(row[db.person.name], "max") |
---|
2409 | self.assertEqual(row["person.name"], "max") |
---|
2410 | drop(db.dog) |
---|
2411 | self.assertEqual(len(db.person._referenced_by), 0) |
---|
2412 | drop(db.person) |
---|
2413 | db.close() |
---|
2414 | |
---|
2415 | |
---|
2416 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
2417 | class TestQuoting(unittest.TestCase): |
---|
2418 | |
---|
2419 | # tests for case sensitivity |
---|
2420 | def testCase(self): |
---|
2421 | return |
---|
2422 | db = DAL(DEFAULT_URI, check_reserved=["all"], ignore_field_case=False) |
---|
2423 | |
---|
2424 | # test table case |
---|
2425 | t0 = db.define_table("B", Field("f", "string")) |
---|
2426 | |
---|
2427 | t1 = db.define_table("b", Field("B", t0), Field("words", "text")) |
---|
2428 | |
---|
2429 | blather = "blah blah and so" |
---|
2430 | t0[0] = {"f": "content"} |
---|
2431 | t1[0] = {"B": int(t0[1]["id"]), "words": blather} |
---|
2432 | |
---|
2433 | r = db(db.B.id == db.b.B).select() |
---|
2434 | |
---|
2435 | self.assertEqual(r[0].b.words, blather) |
---|
2436 | |
---|
2437 | drop(t1) |
---|
2438 | drop(t0) |
---|
2439 | |
---|
2440 | # test field case |
---|
2441 | t0 = db.define_table("table is a test", Field("a_a"), Field("a_A")) |
---|
2442 | |
---|
2443 | t0[0] = dict(a_a="a_a", a_A="a_A") |
---|
2444 | |
---|
2445 | self.assertEqual(t0[1].a_a, "a_a") |
---|
2446 | self.assertEqual(t0[1].a_A, "a_A") |
---|
2447 | |
---|
2448 | drop(t0) |
---|
2449 | db.close() |
---|
2450 | |
---|
2451 | def testPKFK(self): |
---|
2452 | |
---|
2453 | # test primary keys |
---|
2454 | |
---|
2455 | db = DAL(DEFAULT_URI, check_reserved=["all"], ignore_field_case=False) |
---|
2456 | # test table without surrogate key. Length must is limited to |
---|
2457 | # 100 because of MySQL limitations: it cannot handle more than |
---|
2458 | # 767 bytes in unique keys. |
---|
2459 | |
---|
2460 | t0 = db.define_table("t0", Field("Code", length=100), primarykey=["Code"]) |
---|
2461 | t2 = db.define_table("t2", Field("f"), Field("t0_Code", "reference t0")) |
---|
2462 | t3 = db.define_table( |
---|
2463 | "t3", Field("f", length=100), Field("t0_Code", t0.Code), primarykey=["f"] |
---|
2464 | ) |
---|
2465 | t4 = db.define_table( |
---|
2466 | "t4", Field("f", length=100), Field("t0", t0), primarykey=["f"] |
---|
2467 | ) |
---|
2468 | |
---|
2469 | try: |
---|
2470 | t5 = db.define_table( |
---|
2471 | "t5", |
---|
2472 | Field("f", length=100), |
---|
2473 | Field("t0", "reference no_table_wrong_reference"), |
---|
2474 | primarykey=["f"], |
---|
2475 | ) |
---|
2476 | except Exception as e: |
---|
2477 | self.assertTrue(isinstance(e, KeyError)) |
---|
2478 | |
---|
2479 | drop(t0, "cascade") |
---|
2480 | drop(t2) |
---|
2481 | drop(t3) |
---|
2482 | drop(t4) |
---|
2483 | db.close() |
---|
2484 | |
---|
2485 | |
---|
2486 | class TestTableAndFieldCase(unittest.TestCase): |
---|
2487 | """ |
---|
2488 | at the Python level we should not allow db.C and db.c because of .table conflicts on windows |
---|
2489 | but it should be possible to map two different names into distinct tables "c" and "C" at the Python level |
---|
2490 | By default Python models names should be mapped into lower case table names and assume case insensitivity. |
---|
2491 | """ |
---|
2492 | |
---|
2493 | def testme(self): |
---|
2494 | return |
---|
2495 | |
---|
2496 | |
---|
2497 | class TestQuotesByDefault(unittest.TestCase): |
---|
2498 | """ |
---|
2499 | all default tables names should be quoted unless an explicit mapping has been given for a table. |
---|
2500 | """ |
---|
2501 | |
---|
2502 | def testme(self): |
---|
2503 | return |
---|
2504 | |
---|
2505 | |
---|
2506 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
2507 | class TestRecordVersioning(unittest.TestCase): |
---|
2508 | def testRun(self): |
---|
2509 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
2510 | tt = db.define_table( |
---|
2511 | "tt", Field("name"), Field("is_active", "boolean", default=True) |
---|
2512 | ) |
---|
2513 | db.tt._enable_record_versioning(archive_name="tt_archive") |
---|
2514 | self.assertTrue("tt_archive" in db) |
---|
2515 | i_id = db.tt.insert(name="web2py1") |
---|
2516 | db.tt.insert(name="web2py2") |
---|
2517 | db(db.tt.name == "web2py2").delete() |
---|
2518 | self.assertEqual(len(db(db.tt).select()), 1) |
---|
2519 | self.assertEqual(db(db.tt).count(), 1) |
---|
2520 | db(db.tt.id == i_id).update(name="web2py3") |
---|
2521 | self.assertEqual(len(db(db.tt).select()), 1) |
---|
2522 | self.assertEqual(db(db.tt).count(), 1) |
---|
2523 | self.assertEqual(len(db(db.tt_archive).select()), 2) |
---|
2524 | self.assertEqual(db(db.tt_archive).count(), 2) |
---|
2525 | drop(db.tt_archive) |
---|
2526 | # it allows tt to be dropped |
---|
2527 | db.tt._before_delete = [] |
---|
2528 | drop(tt) |
---|
2529 | db.close() |
---|
2530 | |
---|
2531 | |
---|
2532 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
2533 | class TestConnection(unittest.TestCase): |
---|
2534 | def testRun(self): |
---|
2535 | # check for adapter reconnect without parameters |
---|
2536 | db1 = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
2537 | db1.define_table("tt", Field("aa", "integer")) |
---|
2538 | self.assertEqual(isinstance(db1.tt.insert(aa=1), long), True) |
---|
2539 | self.assertEqual(db1(db1.tt.aa == 1).count(), 1) |
---|
2540 | drop(db1.tt) |
---|
2541 | db1._adapter.close() |
---|
2542 | db1._adapter.reconnect() |
---|
2543 | db1.define_table("tt", Field("aa", "integer")) |
---|
2544 | self.assertEqual(isinstance(db1.tt.insert(aa=1), long), True) |
---|
2545 | self.assertEqual(db1(db1.tt.aa == 1).count(), 1) |
---|
2546 | drop(db1.tt) |
---|
2547 | db1.close() |
---|
2548 | |
---|
2549 | # check connection are reused with pool_size |
---|
2550 | connections = {} |
---|
2551 | for a in range(10): |
---|
2552 | db2 = DAL(DEFAULT_URI, check_reserved=["all"], pool_size=5) |
---|
2553 | c = db2._adapter.connection |
---|
2554 | connections[id(c)] = c |
---|
2555 | db2.close() |
---|
2556 | self.assertEqual(len(connections), 1) |
---|
2557 | c = [connections[x] for x in connections][0] |
---|
2558 | c.commit() |
---|
2559 | c.close() |
---|
2560 | |
---|
2561 | # check correct use of pool_size |
---|
2562 | dbs = [] |
---|
2563 | for a in range(10): |
---|
2564 | db3 = DAL(DEFAULT_URI, check_reserved=["all"], pool_size=5) |
---|
2565 | db3._adapter.get_connection() |
---|
2566 | dbs.append(db3) |
---|
2567 | for db in dbs: |
---|
2568 | db.close() |
---|
2569 | self.assertEqual(len(db3._adapter.POOLS[DEFAULT_URI]), 5) |
---|
2570 | for c in db3._adapter.POOLS[DEFAULT_URI]: |
---|
2571 | c.close() |
---|
2572 | db3._adapter.POOLS[DEFAULT_URI] = [] |
---|
2573 | |
---|
2574 | |
---|
2575 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
2576 | class TestBasicOps(unittest.TestCase): |
---|
2577 | def testRun(self): |
---|
2578 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
2579 | tt = db.define_table( |
---|
2580 | "tt", Field("name"), Field("is_active", "boolean", default=True) |
---|
2581 | ) |
---|
2582 | i_id = db.tt.insert(name="web2py1") |
---|
2583 | db.tt.insert(name="web2py2") |
---|
2584 | db(db.tt.name == "web2py2").delete() |
---|
2585 | self.assertEqual(len(db(db.tt).select()), 1) |
---|
2586 | self.assertEqual(db(db.tt).count(), 1) |
---|
2587 | db(db.tt.id == i_id).update(name="web2py3") |
---|
2588 | self.assertEqual(len(db(db.tt).select()), 1) |
---|
2589 | self.assertEqual(db(db.tt).count(), 1) |
---|
2590 | drop(tt) |
---|
2591 | db.close() |
---|
2592 | |
---|
2593 | |
---|
2594 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
2595 | @unittest.skipIf(IS_GAE, 'TODO: Datastore "unsupported operand type"') |
---|
2596 | class TestSQLCustomType(unittest.TestCase): |
---|
2597 | def testRun(self): |
---|
2598 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
2599 | from pydal.helpers.classes import SQLCustomType |
---|
2600 | |
---|
2601 | native_double = "double" |
---|
2602 | native_string = "string" |
---|
2603 | if hasattr(db._adapter, "types"): |
---|
2604 | native_double = db._adapter.types["double"] |
---|
2605 | try: |
---|
2606 | native_string = db._adapter.types["string"] % {"length": 256} |
---|
2607 | except: |
---|
2608 | native_string = db._adapter.types["string"] |
---|
2609 | basic_t = SQLCustomType(type="double", native=native_double) |
---|
2610 | basic_t_str = SQLCustomType(type="string", native=native_string) |
---|
2611 | t0 = db.define_table( |
---|
2612 | "t0", Field("price", basic_t), Field("product", basic_t_str) |
---|
2613 | ) |
---|
2614 | r_id = t0.insert(price=None, product=None) |
---|
2615 | row = db(t0.id == r_id).select(t0.ALL).first() |
---|
2616 | self.assertEqual(row["price"], None) |
---|
2617 | self.assertEqual(row["product"], None) |
---|
2618 | r_id = t0.insert(price=1.2, product="car") |
---|
2619 | row = db(t0.id == r_id).select(t0.ALL).first() |
---|
2620 | self.assertEqual(row["price"], 1.2) |
---|
2621 | self.assertEqual(row["product"], "car") |
---|
2622 | t0.drop() |
---|
2623 | db.close() |
---|
2624 | |
---|
2625 | |
---|
2626 | @unittest.skipIf(IS_GAE or IS_IMAP, "Skip test lazy") |
---|
2627 | class TestLazy(unittest.TestCase): |
---|
2628 | def testRun(self): |
---|
2629 | db = DAL(DEFAULT_URI, check_reserved=["all"], lazy_tables=True) |
---|
2630 | t0 = db.define_table("t0", Field("name")) |
---|
2631 | self.assertTrue(("t0" in db._LAZY_TABLES.keys())) |
---|
2632 | db.t0.insert(name="1") |
---|
2633 | self.assertFalse(("t0" in db._LAZY_TABLES.keys())) |
---|
2634 | db.t0.drop() |
---|
2635 | db.close() |
---|
2636 | |
---|
2637 | def testLazyGetter(self): |
---|
2638 | db = DAL(DEFAULT_URI, lazy_tables=True) |
---|
2639 | db.define_table("tt", Field("value", "integer")) |
---|
2640 | db.define_table( |
---|
2641 | "ttt", Field("value", "integer"), Field("tt_id", "reference tt"), |
---|
2642 | ) |
---|
2643 | # Force table definition |
---|
2644 | db.ttt.value.writable = False |
---|
2645 | idd = db.tt.insert(value=0) |
---|
2646 | db.ttt.insert(tt_id=idd) |
---|
2647 | db.ttt.drop() |
---|
2648 | db.tt.drop() |
---|
2649 | db.close() |
---|
2650 | |
---|
2651 | def testRowNone(self): |
---|
2652 | db = DAL(DEFAULT_URI, lazy_tables=True) |
---|
2653 | tt = db.define_table("tt", Field("value", "integer")) |
---|
2654 | db.tt.insert(value=None) |
---|
2655 | row = db(db.tt).select(db.tt.ALL).first() |
---|
2656 | self.assertEqual(row.value, None) |
---|
2657 | self.assertEqual(row[db.tt.value], None) |
---|
2658 | self.assertEqual(row["tt.value"], None) |
---|
2659 | self.assertEqual(row.get("tt.value"), None) |
---|
2660 | self.assertEqual(row["value"], None) |
---|
2661 | self.assertEqual(row.get("value"), None) |
---|
2662 | db.tt.drop() |
---|
2663 | db.close() |
---|
2664 | |
---|
2665 | |
---|
2666 | class TestRedefine(unittest.TestCase): |
---|
2667 | def testRun(self): |
---|
2668 | db = DAL(DEFAULT_URI, check_reserved=["all"], lazy_tables=True, migrate=False) |
---|
2669 | db.define_table("t_a", Field("code")) |
---|
2670 | self.assertTrue("code" in db.t_a) |
---|
2671 | self.assertTrue("code" in db["t_a"]) |
---|
2672 | db.define_table("t_a", Field("code_a"), redefine=True) |
---|
2673 | self.assertFalse("code" in db.t_a) |
---|
2674 | self.assertFalse("code" in db["t_a"]) |
---|
2675 | self.assertTrue("code_a" in db.t_a) |
---|
2676 | self.assertTrue("code_a" in db["t_a"]) |
---|
2677 | db.close() |
---|
2678 | |
---|
2679 | |
---|
2680 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
2681 | class TestUpdateInsert(unittest.TestCase): |
---|
2682 | def testRun(self): |
---|
2683 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
2684 | t0 = db.define_table("t0", Field("name")) |
---|
2685 | i_id = db.t0.update_or_insert((db.t0.id == 1), name="web2py") |
---|
2686 | u_id = db.t0.update_or_insert((db.t0.id == i_id), name="web2py2") |
---|
2687 | self.assertTrue(i_id != None) |
---|
2688 | self.assertTrue(u_id == None) |
---|
2689 | self.assertEqual(len(db(db.t0).select()), 1) |
---|
2690 | self.assertEqual(db(db.t0).count(), 1) |
---|
2691 | self.assertEqual(db(db.t0.name == "web2py").count(), 0) |
---|
2692 | self.assertEqual(db(db.t0.name == "web2py2").count(), 1) |
---|
2693 | drop(t0) |
---|
2694 | db.close() |
---|
2695 | |
---|
2696 | |
---|
2697 | @unittest.skipIf(IS_IMAP, "TODO: IMAP test") |
---|
2698 | class TestBulkInsert(unittest.TestCase): |
---|
2699 | def testRun(self): |
---|
2700 | db = DAL(DEFAULT_URI, check_reserved=["all"]) |
---|
2701 | t0 = db.define_table("t0", Field("name")) |
---|
2702 | global ctr |
---|
2703 | ctr = 0 |
---|
2704 | |
---|
2705 | def test_after_insert(i, r): |
---|
2706 | self.assertIsInstance(i, OpRow) |
---|
2707 | global ctr |
---|
2708 | ctr += 1 |
---|
2709 | return True |
---|
2710 | |
---|
2711 | t0._after_insert.append(test_after_insert) |
---|
2712 | items = [{"name": "web2py_%s" % pos} for pos in range(0, 10, 1)] |
---|
2713 | t0.bulk_insert(items) |
---|
2714 | self.assertTrue(db(t0).count() == len(items)) |
---|
2715 | for pos in range(0, 10, 1): |
---|
2716 | self.assertEqual(len(db(t0.name == "web2py_%s" % pos).select()), 1) |
---|
2717 | self.assertEqual(db(t0.name == "web2py_%s" % pos).count(), 1) |
---|
2718 | self.assertTrue(ctr == len(items)) |
---|
2719 | drop(t0) |
---|
2720 | db.close() |
---|
2721 | |
---|
2722 | |
---|
2723 | if __name__ == "__main__": |
---|
2724 | unittest.main() |
---|
2725 | tearDownModule() |
---|