source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/packages/dal/tests/nosql.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 97.4 KB
Line 
1# -*- coding: utf-8 -*-
2"""
3    Unit tests for NoSQL adapters
4"""
5
6from __future__ import print_function
7import sys
8import os
9import glob
10import datetime
11from ._compat import unittest
12
13from pydal._compat import PY2, basestring, StringIO, to_bytes, long
14from pydal import DAL, Field
15from pydal.objects import Table, Query, Expression
16from pydal.helpers.classes import SQLALL, OpRow
17from pydal.exceptions import NotOnNOSQLError
18from ._adapt import DEFAULT_URI, IS_IMAP, drop, IS_GAE, IS_MONGODB, _quote
19
20if IS_IMAP:
21    from pydal.adapters import IMAPAdapter
22    from pydal.contrib import mockimaplib
23
24    IMAPAdapter.driver = mockimaplib
25elif IS_MONGODB:
26    from pydal.adapters.mongo import Expansion
27elif IS_GAE:
28    # setup GAE dummy database
29    from google.appengine.ext import testbed
30
31    gaetestbed = testbed.Testbed()
32    gaetestbed.activate()
33    gaetestbed.init_datastore_v3_stub()
34    gaetestbed.init_memcache_stub()
35
36print("Testing against %s engine (%s)" % (DEFAULT_URI.partition(":")[0], DEFAULT_URI))
37
38ALLOWED_DATATYPES = [
39    "string",
40    "text",
41    "integer",
42    "boolean",
43    "double",
44    "blob",
45    "date",
46    "time",
47    "datetime",
48    "upload",
49    "password",
50    "json",
51]
52
53
54def setUpModule():
55    if not IS_IMAP:
56        db = DAL(DEFAULT_URI, check_reserved=["all"])
57
58        def clean_table(db, tablename):
59            try:
60                db.define_table(tablename)
61            except Exception as e:
62                pass
63            try:
64                drop(db[tablename])
65            except Exception as e:
66                pass
67
68        for tablename in [
69            "tt",
70            "t0",
71            "t1",
72            "t2",
73            "t3",
74            "t4",
75            "easy_name",
76            "tt_archive",
77            "pet_farm",
78            "person",
79        ]:
80            clean_table(db, tablename)
81        db.close()
82
83
84def tearDownModule():
85    if os.path.isfile("sql.log"):
86        os.unlink("sql.log")
87    for a in glob.glob("*.table"):
88        os.unlink(a)
89
90
91@unittest.skipIf(not IS_MONGODB, "Skipping MongoDB Tests")
92class TestMongo(unittest.TestCase):
93    """ Tests specific to MongoDB,  error and side path exercisers, etc
94    """
95
96    def testVersionCheck(self):
97        driver_args = {"fake_version": "2.9 Phony"}
98        with self.assertRaises(Exception):
99            db = DAL(
100                DEFAULT_URI, attempts=1, check_reserved=["all"], driver_args=driver_args
101            )
102
103    def testRun(self):
104        db = DAL(DEFAULT_URI, check_reserved=["all"])
105        db.define_table("tt", Field("aa", "reference"))
106        with self.assertRaises(ValueError):
107            db.tt.insert(aa="x")
108        with self.assertRaises(ValueError):
109            db.tt.insert(aa="_")
110        with self.assertRaises(TypeError):
111            db.tt.insert(aa=3.1)
112        self.assertEqual(isinstance(db.tt.insert(aa="<random>"), long), True)
113        self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True)
114        self.assertEqual(isinstance(db.tt.insert(aa="0x1"), long), True)
115        with self.assertRaises(RuntimeError):
116            db(db.tt.aa + 1 == 1).update(aa=0)
117        drop(db.tt)
118
119        db.define_table("tt", Field("aa", "date"))
120        self.assertEqual(isinstance(db.tt.insert(aa=None), long), True)
121        self.assertEqual(db().select(db.tt.aa)[0].aa, None)
122        drop(db.tt)
123
124        db.define_table("tt", Field("aa", "time"))
125        self.assertEqual(isinstance(db.tt.insert(aa=None), long), True)
126        self.assertEqual(db().select(db.tt.aa)[0].aa, None)
127        with self.assertRaises(RuntimeError):
128            db(db.tt.aa <= None).count()
129        with self.assertRaises(NotImplementedError):
130            db._adapter.select(
131                Query(db, db._adapter.dialect.aggregate, db.tt.aa, "UNKNOWN"),
132                [db.tt.aa],
133                {},
134            )
135        with self.assertRaises(NotImplementedError):
136            db._adapter.select(
137                Expression(
138                    db, db._adapter.dialect.extract, db.tt.aa, "UNKNOWN", "integer"
139                ),
140                [db.tt.aa],
141                {},
142            )
143        drop(db.tt)
144
145        db.define_table("tt", Field("aa", "integer"))
146        case = (db.tt.aa == 0).case(db.tt.aa + 2)
147        with self.assertRaises(SyntaxError):
148            db(case).count()
149        drop(db.tt)
150
151        db.define_table(
152            "tt", Field("aa"), Field("bb", "integer"), Field("cc", "list:integer")
153        )
154        db.tt.insert(aa="aa")
155
156        with self.assertRaises(NotImplementedError):
157            db((db.tt.aa + 1).contains(db.tt.aa)).count()
158        with self.assertRaises(NotImplementedError):
159            db(db.tt.cc.contains(db.tt.aa)).count()
160        with self.assertRaises(NotImplementedError):
161            db(db.tt.aa.contains(db.tt.cc)).count()
162        with self.assertRaises(NotImplementedError):
163            db(db.tt.aa.contains(1.0)).count()
164        with self.assertRaises(NotImplementedError):
165            db().select(db.tt.aa.lower()[4:-1]).first()
166        with self.assertRaises(NotOnNOSQLError):
167            db(db.tt.aa.belongs(db()._select(db.tt.aa))).count()
168        with self.assertRaises(RuntimeError):
169            db(db.tt.aa.lower()).update(aa="bb")
170        with self.assertRaises(NotImplementedError):
171            db(db.tt).select(orderby="<random>")
172        with self.assertRaises(RuntimeError):
173            db().select()
174        with self.assertRaises(RuntimeError):
175            Expansion(
176                db._adapter,
177                "delete",
178                Query(db, db._adapter.dialect.eq, db.tt.aa, "x"),
179                [True],
180            )
181        with self.assertRaises(RuntimeError):
182            Expansion(
183                db._adapter,
184                "delete",
185                Query(db, db._adapter.dialect.eq, db.tt.aa, "x"),
186                [True],
187            )
188        with self.assertRaises(RuntimeError):
189            expanded = Expansion(
190                db._adapter,
191                "count",
192                Query(db, db._adapter.dialect.eq, db.tt.aa, "x"),
193                [True],
194            )
195        expanded = Expansion(
196            db._adapter, "count", Query(db, db._adapter.dialect.eq, db.tt.aa, "x"), []
197        )
198        self.assertEqual(db._adapter.expand(expanded).query_dict, {"aa": "x"})
199
200        if db._adapter.server_version_major >= 2.6:
201            with self.assertRaises(RuntimeError):
202                db(db.tt).update(id=1)
203        else:
204            db(db.tt).update(id=1)
205        self.assertNotEqual(db(db.tt.aa == "aa").select(db.tt.id).response[0][0], 1)
206        drop(db.tt)
207
208        db.close()
209
210        for safe in [False, True, False]:
211            db = DAL(DEFAULT_URI, check_reserved=["all"])
212            db.define_table("tt", Field("aa"))
213            self.assertEqual(isinstance(db.tt.insert(aa="x"), long), True)
214            with self.assertRaises(RuntimeError):
215                db._adapter.delete(db["tt"], "x", safe=safe)
216            self.assertEqual(
217                db._adapter.delete(
218                    db["tt"],
219                    Query(db, db._adapter.dialect.eq, db.tt.aa, "x"),
220                    safe=safe,
221                ),
222                1,
223            )
224            self.assertEqual(db(db.tt.aa == "x").count(), 0)
225            self.assertEqual(
226                db._adapter.update(
227                    db["tt"],
228                    Query(db, db._adapter.dialect.eq, db.tt.aa, "x"),
229                    db["tt"]._fields_and_values_for_update({"aa": "x"}).op_values(),
230                    safe=safe,
231                ),
232                0,
233            )
234            drop(db.tt)
235            db.close()
236
237    def testJoin(self):
238        db = DAL(DEFAULT_URI, check_reserved=["all"])
239        db.define_table("tt", Field("aa", "integer"), Field("b", "reference tt"))
240        i1 = db.tt.insert(aa=1)
241        db.tt.insert(aa=4, b=i1)
242        q = db.tt.b == db.tt.id
243        with self.assertRaises(NotOnNOSQLError):
244            db(db.tt).select(left=db.tt.on(q))
245        with self.assertRaises(NotOnNOSQLError):
246            db(db.tt).select(join=db.tt.on(q))
247        with self.assertRaises(NotOnNOSQLError):
248            db(db.tt).select(db.tt.on(q))
249        with self.assertRaises(TypeError):
250            db(db.tt).select(UNKNOWN=True)
251        db(db.tt).select(for_update=True)
252        self.assertEqual(db(db.tt).count(), 2)
253        db.tt.truncate()
254        self.assertEqual(db(db.tt).count(), 0)
255        drop(db.tt)
256        db.close()
257
258
259@unittest.skipIf(IS_IMAP, "Skip IMAP")
260class TestFields(unittest.TestCase):
261    def testFieldName(self):
262        """
263        - a "str" something
264        - not a method or property of Table
265        - "dotted-notation" friendly:
266            - a valid python identifier
267            - not a python keyword
268            - not starting with underscore or an integer
269            - not containing dots
270
271        Basically, anything alphanumeric, no symbols, only underscore as
272        punctuation
273        """
274
275        # Check that Fields cannot start with underscores
276        self.assertRaises(SyntaxError, Field, "_abc", "string")
277
278        # Check that Fields cannot contain punctuation other than underscores
279        self.assertRaises(SyntaxError, Field, "a.bc", "string")
280
281        # Check that Fields cannot be a name of a method or property of Table
282        for x in ["drop", "on", "truncate"]:
283            self.assertRaises(SyntaxError, Field, x, "string")
284
285        # Check that Fields allows underscores in the body of a field name.
286        self.assertTrue(
287            Field("a_bc", "string"),
288            "Field isn't allowing underscores in fieldnames.  It should.",
289        )
290
291        # Check that Field names don't allow a python keyword
292        self.assertRaises(SyntaxError, Field, "True", "string")
293        self.assertRaises(SyntaxError, Field, "elif", "string")
294        self.assertRaises(SyntaxError, Field, "while", "string")
295
296        # Check that Field names don't allow a non-valid python identifier
297        non_valid_examples = ["1x", "xx$%@%", "xx yy", "yy\na", "yy\n"]
298        for a in non_valid_examples:
299            self.assertRaises(SyntaxError, Field, a, "string")
300
301        # Check that Field names don't allow a unicode string
302        non_valid_examples = non_valid_examples = [
303            "ℙƴ☂ℌøἤ",
304            u"ℙƴ☂ℌøἤ",
305            u"àè",
306            u"ṧøмℯ",
307            u"тεṧт",
308            u"♥αłüℯṧ",
309            u"ℊεᾔ℮яαт℮∂",
310            u"♭ƴ",
311            u"ᾔ☤ρℌℓ☺ḓ",
312        ]
313        for a in non_valid_examples:
314            self.assertRaises(SyntaxError, Field, a, "string")
315
316    def testFieldTypes(self):
317
318        # Check that string, and password default length is 512
319        for typ in ["string", "password"]:
320            self.assertTrue(
321                Field("abc", typ).length == 512,
322                "Default length for type '%s' is not 512 or 255" % typ,
323            )
324
325        # Check that upload default length is 512
326        self.assertTrue(
327            Field("abc", "upload").length == 512,
328            "Default length for type 'upload' is not 512",
329        )
330
331        # Check that Tables passed in the type creates a reference
332        self.assertTrue(
333            Field("abc", Table(None, "temp")).type == "reference temp",
334            "Passing a Table does not result in a reference type.",
335        )
336
337    def testFieldLabels(self):
338
339        # Check that a label is successfully built from the supplied fieldname
340        self.assertTrue(
341            Field("abc", "string").label == "Abc", "Label built is incorrect"
342        )
343        self.assertTrue(
344            Field("abc_def", "string").label == "Abc Def", "Label built is incorrect"
345        )
346
347    def testFieldFormatters(self):  # Formatter should be called Validator
348
349        # Test the default formatters
350        for typ in ALLOWED_DATATYPES:
351            f = Field("abc", typ)
352            if typ not in ["date", "time", "datetime"]:
353                isinstance(f.formatter("test"), str)
354            else:
355                isinstance(f.formatter(datetime.datetime.now()), str)
356
357    def testRun(self):
358        db = DAL(DEFAULT_URI, check_reserved=["all"])
359        import pickle
360
361        # some db's only support milliseconds
362        datetime_datetime_today = datetime.datetime.today()
363        datetime_datetime_today = datetime_datetime_today.replace(
364            microsecond=datetime_datetime_today.microsecond
365            - datetime_datetime_today.microsecond % 1000
366        )
367
368        insert_vals = [
369            ("string", "x", ""),
370            ("string", "A\xc3\xa9 A", ""),
371            ("text", "x", ""),
372            ("password", "x", ""),
373            ("upload", "x", ""),
374            ("double", 3.1, 1),
375            ("integer", 3, 1),
376            ("boolean", True, True),
377            ("date", datetime.date.today(), datetime.date.today()),
378            (
379                "datetime",
380                datetime.datetime(1971, 12, 21, 10, 30, 55, 0),
381                datetime_datetime_today,
382            ),
383            ("time", datetime_datetime_today.time(), datetime_datetime_today.time()),
384            ("blob", "x", ""),
385            ("blob", b"xyzzy", ""),
386            # pickling a tuple will create a string which is not UTF-8 able.
387            ("blob", pickle.dumps((0,), pickle.HIGHEST_PROTOCOL), ""),
388        ]
389
390        if not IS_GAE:
391            # these are unsupported by GAE
392            insert_vals.append(("blob", bytearray("a", "utf-8"), ""))
393            insert_vals.append(("json", {"a": "b", "c": [1, 2]}, {}))
394
395        for iv in insert_vals:
396            db.define_table("tt", Field("aa", iv[0], default=iv[2]))
397            # empty string stored to blob returns None
398            default_return = None if iv[0] == "blob" and iv[2] == "" else iv[2]
399            self.assertTrue(isinstance(db.tt.insert(), long))
400            self.assertTrue(isinstance(db.tt.insert(aa=iv[1]), long))
401            self.assertTrue(isinstance(db.tt.insert(aa=None), long))
402            cv = iv[1]
403            if IS_MONGODB and not PY2 and iv[0] == "blob":
404                cv = to_bytes(iv[1])
405            self.assertEqual(db().select(db.tt.aa)[0].aa, default_return)
406            self.assertEqual(db().select(db.tt.aa)[1].aa, cv)
407            self.assertEqual(db().select(db.tt.aa)[2].aa, None)
408
409            if not IS_GAE:
410                ## field aliases
411                row = db().select(db.tt.aa.with_alias("zz"))[1]
412                self.assertEqual(row["zz"], cv)
413
414            drop(db.tt)
415
416        ## Row APIs
417        db.define_table(
418            "tt", Field("aa", "datetime", default=datetime.datetime.today())
419        )
420        t0 = datetime.datetime(1971, 12, 21, 10, 30, 55, 0)
421        id = db.tt.insert(aa=t0)
422        self.assertEqual(isinstance(id, long), True)
423
424        row = db().select(db.tt.aa)[0]
425        self.assertEqual(db.tt[id].aa, t0)
426        self.assertEqual(db.tt["aa"], db.tt.aa)
427        self.assertEqual(db.tt(id).aa, t0)
428        self.assertTrue(db.tt(id, aa=None) == None)
429        self.assertFalse(db.tt(id, aa=t0) == None)
430        self.assertEqual(row.aa, t0)
431        self.assertEqual(row["aa"], t0)
432        self.assertEqual(row["tt.aa"], t0)
433        self.assertEqual(row("tt.aa"), t0)
434
435        ## Lazy and Virtual fields
436        db.tt.b = Field.Virtual(lambda row: row.tt.aa)
437        # test for FieldVirtual.bind
438        self.assertEqual(db.tt.b.tablename, "tt")
439        self.assertEqual(db.tt.b.name, "b")
440        db.tt.c = Field.Lazy(lambda row: row.tt.aa)
441        # test for FieldMethod.bind
442        self.assertEqual(db.tt.c.name, "c")
443        rows = db().select(db.tt.aa)
444        row = rows[0]
445        self.assertEqual(row.b, t0)
446        self.assertEqual(row.c(), t0)
447        # test for BasicRows.colnames_fields
448        rows.colnames.insert(0, "tt.b")
449        rows.colnames.insert(1, "tt.c")
450        colnames_fields = rows.colnames_fields
451        self.assertIs(colnames_fields[0], db.tt.b)
452        self.assertIs(colnames_fields[1], db.tt.c)
453        drop(db.tt)
454
455        db.define_table("tt", Field("aa", "time", default="11:30"))
456        t0 = datetime.time(10, 30, 55)
457        self.assertEqual(isinstance(db.tt.insert(aa=t0), long), True)
458        self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
459        drop(db.tt)
460        db.close()
461
462
463@unittest.skipIf(IS_IMAP, "Skip IMAP")
464class TestTables(unittest.TestCase):
465    def testTableNames(self):
466        """
467        - a "str" something
468        - not a method or property of DAL
469        - "dotted-notation" friendly:
470            - a valid python identifier
471            - not a python keyword
472            - not starting with underscore or an integer
473            - not containing dots
474
475        Basically, anything alphanumeric, no symbols, only underscore as
476        punctuation
477        """
478
479        # Check that Tables cannot start with underscores
480        self.assertRaises(SyntaxError, Table, None, "_abc")
481
482        # Check that Tables cannot contain punctuation other than underscores
483        self.assertRaises(SyntaxError, Table, None, "a.bc")
484
485        # Check that Tables cannot be a name of a method or property of DAL
486        for x in ["define_table", "tables", "as_dict"]:
487            self.assertRaises(SyntaxError, Table, None, x)
488
489        # Check that Table allows underscores in the body of a field name.
490        self.assertTrue(
491            Table(None, "a_bc"),
492            "Table isn't allowing underscores in tablename.  It should.",
493        )
494
495        # Check that Table names don't allow a python keyword
496        self.assertRaises(SyntaxError, Table, None, "True")
497        self.assertRaises(SyntaxError, Table, None, "elif")
498        self.assertRaises(SyntaxError, Table, None, "while")
499
500        # Check that Table names don't allow a non-valid python identifier
501        non_valid_examples = ["1x", "xx$%@%", "xx yy", "yy\na", "yy\n"]
502        for a in non_valid_examples:
503            self.assertRaises(SyntaxError, Table, None, a)
504
505        # Check that Table names don't allow a unicode string
506        non_valid_examples = [
507            "ℙƴ☂ℌøἤ",
508            u"ℙƴ☂ℌøἤ",
509            u"àè",
510            u"ṧøмℯ",
511            u"тεṧт",
512            u"♥αłüℯṧ",
513            u"ℊεᾔ℮яαт℮∂",
514            u"♭ƴ",
515            u"ᾔ☤ρℌℓ☺ḓ",
516        ]
517        for a in non_valid_examples:
518            self.assertRaises(SyntaxError, Table, None, a)
519
520
521@unittest.skipIf(IS_IMAP, "Skip IMAP")
522class TestAll(unittest.TestCase):
523    def setUp(self):
524        self.pt = Table(None, "PseudoTable", Field("name"), Field("birthdate"))
525
526    def testSQLALL(self):
527        ans = "PseudoTable.id, PseudoTable.name, PseudoTable.birthdate"
528        self.assertEqual(str(SQLALL(self.pt)), ans)
529
530
531@unittest.skipIf(IS_IMAP, "Skip IMAP")
532class TestTable(unittest.TestCase):
533    def testTableCreation(self):
534
535        # Check for error when not passing type other than Field or Table
536
537        self.assertRaises(SyntaxError, Table, None, "test", None)
538
539        persons = Table(
540            None, "persons", Field("firstname", "string"), Field("lastname", "string")
541        )
542
543        # Does it have the correct fields?
544
545        self.assertTrue(set(persons.fields).issuperset(set(["firstname", "lastname"])))
546
547        # ALL is set correctly
548
549        self.assertTrue("persons.firstname, persons.lastname" in str(persons.ALL))
550
551    def testTableAlias(self):
552        db = DAL(DEFAULT_URI, check_reserved=["all"])
553        persons = Table(
554            db, "persons", Field("firstname", "string"), Field("lastname", "string")
555        )
556        aliens = persons.with_alias("aliens")
557
558        # Are the different table instances with the same fields
559
560        self.assertTrue(persons is not aliens)
561        self.assertTrue(set(persons.fields) == set(aliens.fields))
562        db.close()
563
564    def testTableInheritance(self):
565        persons = Table(
566            None, "persons", Field("firstname", "string"), Field("lastname", "string")
567        )
568        customers = Table(
569            None, "customers", Field("items_purchased", "integer"), persons
570        )
571        self.assertTrue(
572            set(customers.fields).issuperset(
573                set(["items_purchased", "firstname", "lastname"])
574            )
575        )
576
577
578class TestInsert(unittest.TestCase):
579    def testRun(self):
580        if IS_IMAP:
581            imap = DAL(DEFAULT_URI)
582            imap.define_tables()
583            self.assertEqual(
584                imap.Draft.insert(
585                    to="nurse@example.com",
586                    subject="Nurse!",
587                    sender="gumby@example.com",
588                    content="Nurse!\r\nNurse!",
589                ),
590                2,
591            )
592            self.assertEqual(imap.Draft[2].subject, "Nurse!")
593            self.assertEqual(imap.Draft[2].sender, "gumby@example.com")
594            self.assertEqual(isinstance(imap.Draft[2].uid, long), True)
595            self.assertEqual(imap.Draft[2].content[0]["text"], "Nurse!\r\nNurse!")
596            imap.close()
597        else:
598            db = DAL(DEFAULT_URI, check_reserved=["all"])
599            db.define_table("tt", Field("aa"))
600            self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True)
601            self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True)
602            self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True)
603            self.assertEqual(db(db.tt.aa == "1").count(), 3)
604            self.assertEqual(db(db.tt.aa == "2").isempty(), True)
605            self.assertEqual(db(db.tt.aa == "1").update(aa="2"), 3)
606            self.assertEqual(db(db.tt.aa == "2").count(), 3)
607            self.assertEqual(db(db.tt.aa == "2").isempty(), False)
608            self.assertEqual(db(db.tt.aa == "2").delete(), 3)
609            self.assertEqual(db(db.tt.aa == "2").isempty(), True)
610
611            def callable():
612                return "aa"
613
614            self.assertTrue(isinstance(db.tt.insert(aa=callable), long))
615            self.assertEqual(db(db.tt.aa == "aa").count(), 1)
616
617            drop(db.tt)
618            db.close()
619
620
621@unittest.skipIf(IS_IMAP, "Skip IMAP")
622class TestSelect(unittest.TestCase):
623    def testRun(self):
624        db = DAL(DEFAULT_URI, check_reserved=["all"])
625        db.define_table("tt", Field("aa"))
626        self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True)
627        self.assertEqual(isinstance(db.tt.insert(aa="2"), long), True)
628        self.assertEqual(isinstance(db.tt.insert(aa="3"), long), True)
629        self.assertEqual(db(db.tt.id > 0).count(), 3)
630        self.assertEqual(db(db.tt.aa).count(), 3)
631        self.assertEqual(db(db.tt.id).count(), 3)
632        self.assertEqual(db(db.tt.id != None).count(), 3)
633
634        self.assertEqual(db(db.tt.id > 0).select(orderby=~db.tt.aa)[0].aa, "3")
635        self.assertEqual(
636            db(db.tt.id > 0).select(orderby=~db.tt.aa | db.tt.id)[0].aa, "3"
637        )
638        self.assertEqual(len(db(db.tt.id > 0).select(limitby=(1, 2))), 1)
639        self.assertEqual(db(db.tt.id > 0).select(limitby=(1, 2))[0].aa, "2")
640        self.assertEqual(len(db().select(db.tt.ALL)), 3)
641        self.assertEqual(db(db.tt.aa == None).count(), 0)
642        self.assertEqual(db(db.tt.aa != None).count(), 3)
643        self.assertEqual(db(db.tt.aa > "1").count(), 2)
644        self.assertEqual(db(db.tt.aa >= "1").count(), 3)
645        self.assertEqual(db(db.tt.aa == "1").count(), 1)
646        self.assertEqual(db(db.tt.aa != "1").count(), 2)
647        self.assertEqual(db(db.tt.aa < "3").count(), 2)
648        self.assertEqual(db(db.tt.aa <= "3").count(), 3)
649        self.assertEqual(db(db.tt.aa > "1")(db.tt.aa < "3").count(), 1)
650        self.assertEqual(db((db.tt.aa > "1") & (db.tt.aa < "3")).count(), 1)
651        self.assertEqual(db((db.tt.aa > "1") | (db.tt.aa < "3")).count(), 3)
652        # Test not operator
653        self.assertEqual(db(~(db.tt.aa != "1")).count(), 1)
654        self.assertEqual(db(~(db.tt.aa == "1")).count(), 2)
655        self.assertEqual(db((db.tt.aa > "1") & ~(db.tt.aa > "2")).count(), 1)
656        self.assertEqual(db(~(db.tt.aa > "1") & (db.tt.aa > "2")).count(), 0)
657        self.assertEqual(db(~((db.tt.aa < "1") | (db.tt.aa > "2"))).count(), 2)
658        self.assertEqual(db(~((db.tt.aa >= "1") & (db.tt.aa <= "2"))).count(), 1)
659        drop(db.tt)
660        db.close()
661
662    def testListInteger(self):
663        db = DAL(DEFAULT_URI, check_reserved=["all"])
664        db.define_table("tt", Field("aa", "list:integer"))
665        l = [0, 1, 2, 3, 4, 5]
666        db.tt.insert(aa=l)
667        self.assertEqual(db(db.tt).select("tt.aa").first()[db.tt.aa], l)
668        drop(db.tt)
669        db.close()
670
671    def testListString(self):
672        db = DAL(DEFAULT_URI, check_reserved=["all"])
673        db.define_table("tt", Field("aa", "list:string"))
674        l = ["a", "b", "c"]
675        db.tt.insert(aa=l)
676        self.assertEqual(db(db.tt).select("tt.aa").first()[db.tt.aa], l)
677        drop(db.tt)
678        db.close()
679
680    def testListReference(self):
681        db = DAL(DEFAULT_URI, check_reserved=["all"])
682        on_deletes = (
683            "CASCADE",
684            "SET NULL",
685        )
686        for ondelete in on_deletes:
687            db.define_table("t0", Field("aa", "string"))
688            db.define_table(
689                "tt", Field("t0_id", "list:reference t0", ondelete=ondelete)
690            )
691            id_a1 = db.t0.insert(aa="test1")
692            id_a2 = db.t0.insert(aa="test2")
693            ref1 = [id_a1]
694            ref2 = [id_a2]
695            ref3 = [id_a1, id_a2]
696            db.tt.insert(t0_id=ref1)
697            self.assertEqual(db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref1)
698            db.tt.insert(t0_id=ref2)
699            self.assertEqual(db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref2)
700            db.tt.insert(t0_id=ref3)
701            self.assertEqual(db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref3)
702
703            if IS_MONGODB:
704                self.assertEqual(db(db.tt.t0_id == ref3).count(), 1)
705
706                self.assertEqual(db(db.tt.t0_id.contains(id_a1)).count(), 2)
707                self.assertEqual(db(db.tt.t0_id.contains(id_a2)).count(), 2)
708                db(db.t0.aa == "test1").delete()
709                if ondelete == "SET NULL":
710                    self.assertEqual(db(db.tt).count(), 3)
711                    self.assertEqual(db(db.tt).select()[0].t0_id, [])
712                if ondelete == "CASCADE":
713                    self.assertEqual(db(db.tt).count(), 2)
714                    self.assertEqual(db(db.tt).select()[0].t0_id, ref2)
715
716            drop(db.tt)
717            drop(db.t0)
718        db.close()
719
720    @unittest.skipIf(IS_GAE, "no groupby in appengine")
721    def testGroupByAndDistinct(self):
722        db = DAL(DEFAULT_URI, check_reserved=["all"])
723        db.define_table(
724            "tt", Field("aa"), Field("bb", "integer"), Field("cc", "integer")
725        )
726        db.tt.insert(aa="4", bb=1, cc=1)
727        db.tt.insert(aa="3", bb=2, cc=1)
728        db.tt.insert(aa="3", bb=1, cc=1)
729        db.tt.insert(aa="1", bb=1, cc=1)
730        db.tt.insert(aa="1", bb=2, cc=1)
731        db.tt.insert(aa="1", bb=3, cc=1)
732        db.tt.insert(aa="1", bb=4, cc=1)
733        db.tt.insert(aa="2", bb=1, cc=1)
734        db.tt.insert(aa="2", bb=2, cc=1)
735        db.tt.insert(aa="2", bb=3, cc=1)
736        self.assertEqual(db(db.tt).count(), 10)
737
738        # test groupby
739        result = db().select(db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa)
740        self.assertEqual(len(result), 4)
741        result = db().select(
742            db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa, orderby=db.tt.aa
743        )
744        self.assertEqual(tuple(result.response[2]), ("3", 3))
745        result = db().select(
746            db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa, orderby=~db.tt.aa
747        )
748        self.assertEqual(tuple(result.response[1]), ("3", 3))
749        result = db().select(
750            db.tt.aa,
751            db.tt.bb,
752            db.tt.cc.sum(),
753            groupby=db.tt.aa | db.tt.bb,
754            orderby=(db.tt.aa | ~db.tt.bb),
755        )
756        self.assertEqual(tuple(result.response[4]), ("2", 3, 1))
757        result = db().select(
758            db.tt.aa,
759            db.tt.bb.sum(),
760            groupby=db.tt.aa,
761            orderby=~db.tt.aa,
762            limitby=(1, 2),
763        )
764        self.assertEqual(len(result), 1)
765        self.assertEqual(tuple(result.response[0]), ("3", 3))
766        result = db().select(db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa, limitby=(0, 3))
767        self.assertEqual(len(result), 3)
768        self.assertEqual(tuple(result.response[2]), ("3", 3))
769
770        # test having
771        self.assertEqual(
772            len(
773                db().select(
774                    db.tt.aa,
775                    db.tt.bb.sum(),
776                    groupby=db.tt.aa,
777                    having=db.tt.bb.sum() > 2,
778                )
779            ),
780            3,
781        )
782
783        # test distinct
784        result = db().select(db.tt.aa, db.tt.cc, distinct=True)
785        self.assertEqual(len(result), 4)
786        result = db().select(db.tt.cc, distinct=True, groupby=db.tt.cc)
787        self.assertEqual(len(result), 1)
788        self.assertEqual(result[0].cc, 1)
789        result = db().select(db.tt.aa, distinct=True, orderby=~db.tt.aa)
790        self.assertEqual(result[2].aa, "2")
791        self.assertEqual(result[1].aa, "3")
792        result = db().select(
793            db.tt.aa, db.tt.bb, distinct=True, orderby=(db.tt.aa | ~db.tt.bb)
794        )
795        self.assertEqual(tuple(result.response[4]), ("2", 3))
796        result = db().select(
797            db.tt.aa, distinct=db.tt.aa, orderby=~db.tt.aa, limitby=(1, 2)
798        )
799        self.assertEqual(len(result), 1)
800        self.assertEqual(result[0].aa, "3")
801
802        # test count distinct
803        db.tt.insert(aa="2", bb=3, cc=1)
804        self.assertEqual(db(db.tt).count(distinct=db.tt.aa), 4)
805        self.assertEqual(db(db.tt).count(distinct=db.tt.aa | db.tt.bb), 10)
806        self.assertEqual(db(db.tt).count(distinct=db.tt.aa | db.tt.bb | db.tt.cc), 10)
807        self.assertEqual(db(db.tt).count(distinct=True), 10)
808        self.assertEqual(db(db.tt.aa).count(db.tt.aa), 4)
809        self.assertEqual(db(db.tt.aa).count(), 11)
810        count = db.tt.aa.count()
811        self.assertEqual(db(db.tt).select(count).first()[count], 11)
812
813        count = db.tt.aa.count(distinct=True)
814        sum = db.tt.bb.sum()
815        result = db(db.tt).select(count, sum)
816        self.assertEqual(tuple(result.response[0]), (4, 23))
817        self.assertEqual(result.first()[count], 4)
818        self.assertEqual(result.first()[sum], 23)
819
820        if not IS_MONGODB or db._adapter.server_version_major >= 2.6:
821            # mongo < 2.6 does not support $size
822            count = db.tt.aa.count(distinct=True) + db.tt.bb.count(distinct=True)
823            self.assertEqual(db(db.tt).select(count).first()[count], 8)
824
825        drop(db.tt)
826        db.close()
827
828    @unittest.skipIf(IS_GAE, "no coalesce in appengine")
829    def testCoalesce(self):
830        db = DAL(DEFAULT_URI, check_reserved=["all"])
831        db.define_table("tt", Field("aa"), Field("bb"), Field("cc"), Field("dd"))
832        db.tt.insert(aa="xx")
833        db.tt.insert(aa="xx", bb="yy")
834        db.tt.insert(aa="xx", bb="yy", cc="zz")
835        db.tt.insert(aa="xx", bb="yy", cc="zz", dd="")
836        result = db(db.tt).select(db.tt.dd.coalesce(db.tt.cc, db.tt.bb, db.tt.aa))
837        self.assertEqual(result.response[0][0], "xx")
838        self.assertEqual(result.response[1][0], "yy")
839        self.assertEqual(result.response[2][0], "zz")
840        self.assertEqual(result.response[3][0], "")
841        db.tt.drop()
842
843        db.define_table("tt", Field("aa", "integer"), Field("bb"))
844        db.tt.insert(bb="")
845        db.tt.insert(aa=1)
846        result = db(db.tt).select(db.tt.aa.coalesce_zero())
847        self.assertEqual(result.response[0][0], 0)
848        self.assertEqual(result.response[1][0], 1)
849
850        db.tt.drop()
851        db.close()
852
853
854@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
855class TestAddMethod(unittest.TestCase):
856    def testRun(self):
857        db = DAL(DEFAULT_URI, check_reserved=["all"])
858        db.define_table("tt", Field("aa"))
859
860        @db.tt.add_method.all
861        def select_all(table, orderby=None):
862            return table._db(table).select(orderby=orderby)
863
864        self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True)
865        self.assertEqual(isinstance(db.tt.insert(aa="2"), long), True)
866        self.assertEqual(isinstance(db.tt.insert(aa="3"), long), True)
867        self.assertEqual(len(db.tt.all()), 3)
868        drop(db.tt)
869        db.close()
870
871
872@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
873class TestBelongs(unittest.TestCase):
874    def __init__(self, *args, **vars):
875        unittest.TestCase.__init__(self, *args, **vars)
876        self.db = None
877
878    def setUp(self):
879        db = DAL(DEFAULT_URI, check_reserved=["all"])
880        db.define_table("tt", Field("aa"))
881        self.i_id = db.tt.insert(aa="1")
882        self.assertEqual(isinstance(self.i_id, long), True)
883        self.assertEqual(isinstance(db.tt.insert(aa="2"), long), True)
884        self.assertEqual(isinstance(db.tt.insert(aa="3"), long), True)
885        self.db = db
886
887    def testRun(self):
888        db = self.db
889        self.assertEqual(db(db.tt.aa.belongs(("1", "3"))).count(), 2)
890        self.assertEqual(db(db.tt.aa.belongs(["1", "3"])).count(), 2)
891        self.assertEqual(db(db.tt.aa.belongs(["1", "3"])).count(), 2)
892        self.assertEqual(db(db.tt.id.belongs([self.i_id])).count(), 1)
893        self.assertEqual(db(db.tt.id.belongs([])).count(), 0)
894
895    @unittest.skipIf(
896        IS_GAE or IS_MONGODB,
897        "Datastore/Mongodb belongs() does not accept nested queries",
898    )
899    def testNested(self):
900        db = self.db
901        self.assertEqual(
902            db(db.tt.aa.belongs(db(db.tt.id == self.i_id)._select(db.tt.aa))).count(), 1
903        )
904
905        self.assertEqual(
906            db(
907                db.tt.aa.belongs(db(db.tt.aa.belongs(("1", "3")))._select(db.tt.aa))
908            ).count(),
909            2,
910        )
911        self.assertEqual(
912            db(
913                db.tt.aa.belongs(
914                    db(
915                        db.tt.aa.belongs(
916                            db(db.tt.aa.belongs(("1", "3")))._select(db.tt.aa)
917                        )
918                    )._select(db.tt.aa)
919                )
920            ).count(),
921            2,
922        )
923
924    def tearDown(self):
925        db = self.db
926        drop(db.tt)
927        db.close()
928        self.db = None
929
930
931@unittest.skipIf(
932    IS_GAE or IS_IMAP, "Contains not supported on GAE Datastore. TODO: IMAP tests"
933)
934class TestContains(unittest.TestCase):
935    def testRun(self):
936        db = DAL(DEFAULT_URI, check_reserved=["all"])
937        db.define_table("tt", Field("aa", "list:string"), Field("bb", "string"))
938        self.assertEqual(
939            isinstance(db.tt.insert(aa=["aaa", "bbb"], bb="aaa"), long), True
940        )
941        self.assertEqual(
942            isinstance(db.tt.insert(aa=["bbb", "ddd"], bb="abb"), long), True
943        )
944        self.assertEqual(
945            isinstance(db.tt.insert(aa=["eee", "aaa"], bb="acc"), long), True
946        )
947        self.assertEqual(db(db.tt.aa.contains("aaa")).count(), 2)
948        self.assertEqual(db(db.tt.aa.contains("bbb")).count(), 2)
949        self.assertEqual(db(db.tt.aa.contains("aa")).count(), 0)
950        self.assertEqual(db(db.tt.bb.contains("a")).count(), 3)
951        self.assertEqual(db(db.tt.bb.contains("b")).count(), 1)
952        self.assertEqual(db(db.tt.bb.contains("d")).count(), 0)
953        self.assertEqual(db(db.tt.aa.contains(db.tt.bb)).count(), 1)
954
955        # case-sensitivity tests, if 1 it isn't
956        is_case_insensitive = db(db.tt.bb.contains("AAA", case_sensitive=True)).count()
957        if is_case_insensitive:
958            self.assertEqual(db(db.tt.aa.contains("AAA")).count(), 2)
959            self.assertEqual(db(db.tt.bb.contains("A")).count(), 3)
960        else:
961            self.assertEqual(
962                db(db.tt.aa.contains("AAA", case_sensitive=True)).count(), 0
963            )
964            self.assertEqual(db(db.tt.bb.contains("A", case_sensitive=True)).count(), 0)
965            self.assertEqual(
966                db(db.tt.aa.contains("AAA", case_sensitive=False)).count(), 2
967            )
968            self.assertEqual(
969                db(db.tt.bb.contains("A", case_sensitive=False)).count(), 3
970            )
971        db.tt.drop()
972
973        # integers in string fields
974        db.define_table(
975            "tt",
976            Field("aa", "list:string"),
977            Field("bb", "string"),
978            Field("cc", "integer"),
979        )
980        self.assertEqual(
981            isinstance(db.tt.insert(aa=["123", "456"], bb="123", cc=12), long), True
982        )
983        self.assertEqual(
984            isinstance(db.tt.insert(aa=["124", "456"], bb="123", cc=123), long), True
985        )
986        self.assertEqual(
987            isinstance(db.tt.insert(aa=["125", "457"], bb="23", cc=125), long), True
988        )
989        self.assertEqual(db(db.tt.aa.contains(123)).count(), 1)
990        self.assertEqual(db(db.tt.aa.contains(23)).count(), 0)
991        self.assertEqual(db(db.tt.aa.contains(db.tt.cc)).count(), 1)
992        self.assertEqual(db(db.tt.bb.contains(123)).count(), 2)
993        self.assertEqual(db(db.tt.bb.contains(23)).count(), 3)
994        self.assertEqual(db(db.tt.bb.contains(db.tt.cc)).count(), 2)
995        db.tt.drop()
996
997        # string field contains string field
998        db.define_table("tt", Field("aa"), Field("bb"))
999        db.tt.insert(aa="aaa", bb="%aaa")
1000        db.tt.insert(aa="aaa", bb="aaa")
1001        self.assertEqual(db(db.tt.aa.contains(db.tt.bb)).count(), 1)
1002        drop(db.tt)
1003
1004        # escaping
1005        db.define_table("tt", Field("aa"))
1006        db.tt.insert(aa="perc%ent")
1007        db.tt.insert(aa="percent")
1008        db.tt.insert(aa="percxyzent")
1009        db.tt.insert(aa="under_score")
1010        db.tt.insert(aa="underxscore")
1011        db.tt.insert(aa="underyscore")
1012        self.assertEqual(db(db.tt.aa.contains("perc%ent")).count(), 1)
1013        self.assertEqual(db(db.tt.aa.contains("under_score")).count(), 1)
1014        drop(db.tt)
1015        db.close()
1016
1017
1018@unittest.skipIf(IS_GAE, "Like not supported on GAE Datastore.")
1019@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
1020class TestLike(unittest.TestCase):
1021    def setUp(self):
1022        db = DAL(DEFAULT_URI, check_reserved=["all"])
1023        db.define_table("tt", Field("aa"))
1024        self.assertEqual(isinstance(db.tt.insert(aa="abc"), long), True)
1025        self.db = db
1026
1027    def tearDown(self):
1028        db = self.db
1029        drop(db.tt)
1030        db.close()
1031        self.db = None
1032
1033    def testRun(self):
1034        db = self.db
1035        self.assertEqual(db(db.tt.aa.like("a%")).count(), 1)
1036        self.assertEqual(db(db.tt.aa.like("%b%")).count(), 1)
1037        self.assertEqual(db(db.tt.aa.like("%c")).count(), 1)
1038        self.assertEqual(db(db.tt.aa.like("%d%")).count(), 0)
1039        self.assertEqual(db(db.tt.aa.like("ab_")).count(), 1)
1040        self.assertEqual(db(db.tt.aa.like("a_c")).count(), 1)
1041        self.assertEqual(db(db.tt.aa.like("_bc")).count(), 1)
1042
1043        self.assertEqual(db(db.tt.aa.like("A%", case_sensitive=False)).count(), 1)
1044        self.assertEqual(db(db.tt.aa.like("%B%", case_sensitive=False)).count(), 1)
1045        self.assertEqual(db(db.tt.aa.like("%C", case_sensitive=False)).count(), 1)
1046        self.assertEqual(db(db.tt.aa.ilike("A%")).count(), 1)
1047        self.assertEqual(db(db.tt.aa.ilike("%B%")).count(), 1)
1048        self.assertEqual(db(db.tt.aa.ilike("%C")).count(), 1)
1049
1050        # DAL maps like() (and contains(), startswith(), endswith())
1051        # to the LIKE operator, that in ANSI-SQL is case-sensitive
1052        # There are backends supporting case-sensitivity by default
1053        # and backends that needs additional care to turn
1054        # case-sensitivity on. To discern among those, let's run
1055        # this query comparing previously inserted 'abc' with 'ABC':
1056        # if the result is 0, then the backend recognizes
1057        # case-sensitivity, if 1 it isn't
1058        is_case_insensitive = db(db.tt.aa.like("%ABC%")).count()
1059        self.assertEqual(db(db.tt.aa.like("A%")).count(), is_case_insensitive)
1060        self.assertEqual(db(db.tt.aa.like("%B%")).count(), is_case_insensitive)
1061        self.assertEqual(db(db.tt.aa.like("%C")).count(), is_case_insensitive)
1062
1063    def testUpperLower(self):
1064        db = self.db
1065        self.assertEqual(db(db.tt.aa.upper().like("A%")).count(), 1)
1066        self.assertEqual(db(db.tt.aa.upper().like("%B%")).count(), 1)
1067        self.assertEqual(db(db.tt.aa.upper().like("%C")).count(), 1)
1068        self.assertEqual(db(db.tt.aa.lower().like("%c")).count(), 1)
1069
1070    def testStartsEndsWith(self):
1071        db = self.db
1072        self.assertEqual(db(db.tt.aa.startswith("a")).count(), 1)
1073        self.assertEqual(db(db.tt.aa.endswith("c")).count(), 1)
1074        self.assertEqual(db(db.tt.aa.startswith("c")).count(), 0)
1075        self.assertEqual(db(db.tt.aa.endswith("a")).count(), 0)
1076
1077    def testEscaping(self):
1078        db = self.db
1079        term = "ahbc".replace("h", "\\")  # funny but to avoid any doubts...
1080        db.tt.insert(aa="a%bc")
1081        db.tt.insert(aa="a_bc")
1082        db.tt.insert(aa=term)
1083        self.assertEqual(db(db.tt.aa.like("%ax%bc%", escape="x")).count(), 1)
1084        self.assertEqual(db(db.tt.aa.like("%ax_bc%", escape="x")).count(), 1)
1085        self.assertEqual(db(db.tt.aa.like("%" + term + "%")).count(), 1)
1086        db(db.tt.id > 0).delete()
1087        # test "literal" like, i.e. exactly as LIKE in the backend
1088        db.tt.insert(aa="perc%ent")
1089        db.tt.insert(aa="percent")
1090        db.tt.insert(aa="percxyzent")
1091        db.tt.insert(aa="under_score")
1092        db.tt.insert(aa="underxscore")
1093        db.tt.insert(aa="underyscore")
1094        self.assertEqual(db(db.tt.aa.like("%perc%ent%")).count(), 3)
1095        self.assertEqual(db(db.tt.aa.like("%under_score%")).count(), 3)
1096        db(db.tt.id > 0).delete()
1097        # escaping with startswith and endswith
1098        db.tt.insert(aa="%percent")
1099        db.tt.insert(aa="xpercent")
1100        db.tt.insert(aa="discount%")
1101        db.tt.insert(aa="discountx")
1102        self.assertEqual(db(db.tt.aa.endswith("discount%")).count(), 1)
1103        self.assertEqual(db(db.tt.aa.like("discount%%")).count(), 2)
1104        self.assertEqual(db(db.tt.aa.startswith("%percent")).count(), 1)
1105        self.assertEqual(db(db.tt.aa.like("%%percent")).count(), 2)
1106
1107    def testRegexp(self):
1108        db = self.db
1109        db(db.tt.id > 0).delete()
1110        db.tt.insert(aa="%percent")
1111        db.tt.insert(aa="xpercent")
1112        db.tt.insert(aa="discount%")
1113        db.tt.insert(aa="discountx")
1114        try:
1115            self.assertEqual(db(db.tt.aa.regexp("count")).count(), 2)
1116        except NotImplementedError:
1117            pass
1118        else:
1119            self.assertEqual(db(db.tt.aa.lower().regexp("count")).count(), 2)
1120            self.assertEqual(
1121                db(
1122                    db.tt.aa.upper().regexp("COUNT") & db.tt.aa.lower().regexp("count")
1123                ).count(),
1124                2,
1125            )
1126            self.assertEqual(
1127                db(
1128                    db.tt.aa.upper().regexp("COUNT") | (db.tt.aa.lower() == "xpercent")
1129                ).count(),
1130                3,
1131            )
1132
1133    def testLikeInteger(self):
1134        db = self.db
1135        db.tt.drop()
1136        db.define_table("tt", Field("aa", "integer"))
1137        self.assertEqual(isinstance(db.tt.insert(aa=1111111111), long), True)
1138        self.assertEqual(isinstance(db.tt.insert(aa=1234567), long), True)
1139        self.assertEqual(db(db.tt.aa.like("1%")).count(), 2)
1140        self.assertEqual(db(db.tt.aa.like("1_3%")).count(), 1)
1141        self.assertEqual(db(db.tt.aa.like("2%")).count(), 0)
1142        self.assertEqual(db(db.tt.aa.like("_2%")).count(), 1)
1143        self.assertEqual(db(db.tt.aa.like("12%")).count(), 1)
1144        self.assertEqual(db(db.tt.aa.like("012%")).count(), 0)
1145        self.assertEqual(db(db.tt.aa.like("%45%")).count(), 1)
1146        self.assertEqual(db(db.tt.aa.like("%54%")).count(), 0)
1147
1148
1149@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
1150class TestDatetime(unittest.TestCase):
1151    def testRun(self):
1152        db = DAL(DEFAULT_URI, check_reserved=["all"])
1153        db.define_table("tt", Field("aa", "datetime"))
1154        self.assertEqual(
1155            isinstance(db.tt.insert(aa=datetime.datetime(1971, 12, 21, 11, 30)), long),
1156            True,
1157        )
1158        self.assertEqual(
1159            isinstance(db.tt.insert(aa=datetime.datetime(1971, 11, 21, 10, 30)), long),
1160            True,
1161        )
1162        self.assertEqual(
1163            isinstance(db.tt.insert(aa=datetime.datetime(1970, 12, 21, 9, 31)), long),
1164            True,
1165        )
1166        self.assertEqual(
1167            db(db.tt.aa == datetime.datetime(1971, 12, 21, 11, 30)).count(), 1
1168        )
1169        self.assertEqual(db(db.tt.aa >= datetime.datetime(1971, 1, 1)).count(), 2)
1170
1171        if IS_MONGODB:
1172            self.assertEqual(db(db.tt.aa.year() == 1971).count(), 2)
1173            self.assertEqual(db(db.tt.aa.month() > 11).count(), 2)
1174            self.assertEqual(db(db.tt.aa.day() >= 21).count(), 3)
1175            self.assertEqual(db(db.tt.aa.hour() < 10).count(), 1)
1176            self.assertEqual(db(db.tt.aa.minutes() <= 30).count(), 2)
1177            self.assertEqual(db(db.tt.aa.seconds() != 31).count(), 3)
1178            self.assertEqual(db(db.tt.aa.epoch() < 365 * 24 * 3600).delete(), 1)
1179        drop(db.tt)
1180
1181        db.define_table("tt", Field("aa", "time"))
1182        t0 = datetime.time(10, 30, 55)
1183        db.tt.insert(aa=t0)
1184        self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
1185        drop(db.tt)
1186
1187        db.define_table("tt", Field("aa", "date"))
1188        t0 = datetime.date.today()
1189        db.tt.insert(aa=t0)
1190        self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
1191        drop(db.tt)
1192        db.close()
1193
1194
1195@unittest.skipIf(IS_GAE or IS_IMAP, "Expressions are not supported")
1196class TestExpressions(unittest.TestCase):
1197    def testRun(self):
1198        if IS_MONGODB:
1199            DAL_OPTS = (
1200                (True, {"adapter_args": {"safe": True}}),
1201                (False, {"adapter_args": {"safe": False}}),
1202            )
1203        for dal_opt in DAL_OPTS:
1204            db = DAL(DEFAULT_URI, check_reserved=["all"], **dal_opt[1])
1205            db.define_table(
1206                "tt",
1207                Field("aa", "integer"),
1208                Field("bb", "integer", default=0),
1209                Field("cc"),
1210            )
1211            self.assertEqual(isinstance(db.tt.insert(aa=1), long), dal_opt[0])
1212            self.assertEqual(isinstance(db.tt.insert(aa=2), long), dal_opt[0])
1213            self.assertEqual(isinstance(db.tt.insert(aa=3), long), dal_opt[0])
1214
1215            # test update
1216            self.assertEqual(
1217                db(db.tt.aa == 3).update(aa=db.tt.aa + 1, bb=db.tt.bb + 2), 1
1218            )
1219            self.assertEqual(db(db.tt.aa == 4).count(), 1)
1220            self.assertEqual(db(db.tt.bb == 2).count(), 1)
1221            self.assertEqual(db(db.tt.aa == -2).count(), 0)
1222            self.assertEqual(db(db.tt.aa == 4).update(aa=db.tt.aa * 2, bb=5), 1)
1223            self.assertEqual(db(db.tt.bb == 5).count(), 1)
1224            self.assertEqual(db(db.tt.aa + 1 == 9).count(), 1)
1225            self.assertEqual(db(db.tt.aa + 1 == 9).update(aa=db.tt.aa - 2, cc="cc"), 1)
1226            self.assertEqual(db(db.tt.cc == "cc").count(), 1)
1227            self.assertEqual(db(db.tt.aa == 6).count(), 1)
1228            self.assertEqual(db(db.tt.aa == 6).update(bb=db.tt.aa * (db.tt.bb - 3)), 1)
1229            self.assertEqual(db(db.tt.bb == 12).count(), 1)
1230            self.assertEqual(db(db.tt.aa == 6).count(), 1)
1231            self.assertEqual(
1232                db(db.tt.aa == 6).update(aa=db.tt.aa % 4 + 1, cc=db.tt.cc + "1" + "1"),
1233                1,
1234            )
1235            self.assertEqual(db(db.tt.cc == "cc11").count(), 1)
1236            self.assertEqual(db(db.tt.aa == 3).count(), 1)
1237
1238            # test comparsion expression based count
1239            self.assertEqual(db(db.tt.aa != db.tt.aa).count(), 0)
1240            self.assertEqual(db(db.tt.aa == db.tt.aa).count(), 3)
1241
1242            # test select aggregations
1243            sum = (db.tt.aa + 1).sum()
1244            self.assertEqual(db(db.tt.aa + 1 >= 3).select(sum).first()[sum], 7)
1245            self.assertEqual(db((1 == 0) & (db.tt.aa >= db.tt.aa)).count(), 0)
1246            self.assertEqual(db(db.tt.aa * 2 == -2).select(sum).first()[sum], None)
1247
1248            count = db.tt.aa.count()
1249            avg = db.tt.aa.avg()
1250            min = db.tt.aa.min()
1251            max = db.tt.aa.max()
1252            result = db(db.tt).select(sum, count, avg, min, max).first()
1253            self.assertEqual(result[sum], 9)
1254            self.assertEqual(result[count], 3)
1255            self.assertEqual(result[avg], 2)
1256            self.assertEqual(result[min], 1)
1257            self.assertEqual(result[max], 3)
1258
1259            # Test basic expressions evaluated at python level
1260            self.assertEqual(db((1 == 1) & (db.tt.aa >= 2)).count(), 2)
1261            self.assertEqual(db((1 == 1) | (db.tt.aa >= 2)).count(), 3)
1262            self.assertEqual(db((1 == 0) & (db.tt.aa >= 2)).count(), 0)
1263            self.assertEqual(db((1 == 0) | (db.tt.aa >= 2)).count(), 2)
1264
1265            # test abs()
1266            self.assertEqual(db(db.tt.aa == 2).update(aa=db.tt.aa * -10), 1)
1267            abs = db.tt.aa.abs().with_alias("abs")
1268            result = db(db.tt.aa == -20).select(abs).first()
1269            self.assertEqual(result[abs], 20)
1270            self.assertEqual(result["abs"], 20)
1271            abs = db.tt.aa.abs() / 10 + 5
1272            exp = abs.min() * 2 + 1
1273            result = db(db.tt.aa == -20).select(exp).first()
1274            self.assertEqual(result[exp], 15)
1275
1276            # test case()
1277            condition = db.tt.aa > 2
1278            case = condition.case(db.tt.aa + 2, db.tt.aa - 2)
1279            my_case = case.with_alias("my_case")
1280            result = db().select(my_case)
1281            self.assertEqual(len(result), 3)
1282            self.assertEqual(result[0][my_case], -1)
1283            self.assertEqual(result[0]["my_case"], -1)
1284            self.assertEqual(result[1]["my_case"], -22)
1285            self.assertEqual(result[2]["my_case"], 5)
1286
1287            # test expression based delete
1288            self.assertEqual(db(db.tt.aa + 1 >= 4).count(), 1)
1289            self.assertEqual(db(db.tt.aa + 1 >= 4).delete(), 1)
1290            self.assertEqual(db(db.tt.aa).count(), 2)
1291
1292            # cleanup
1293            drop(db.tt)
1294            db.close()
1295
1296    def testUpdate(self):
1297        db = DAL(DEFAULT_URI, check_reserved=["all"])
1298
1299        # some db's only support seconds
1300        datetime_datetime_today = datetime.datetime.today()
1301        datetime_datetime_today = datetime_datetime_today.replace(microsecond=0)
1302        one_day = datetime.timedelta(1)
1303        one_sec = datetime.timedelta(0, 1)
1304
1305        update_vals = (
1306            ("string", "x", "y"),
1307            ("text", "x", "y"),
1308            ("password", "x", "y"),
1309            ("integer", 1, 2),
1310            ("bigint", 1, 2),
1311            ("float", 1.0, 2.0),
1312            ("double", 1.0, 2.0),
1313            ("boolean", True, False),
1314            ("date", datetime.date.today(), datetime.date.today() + one_day),
1315            (
1316                "datetime",
1317                datetime.datetime(1971, 12, 21, 10, 30, 55, 0),
1318                datetime_datetime_today,
1319            ),
1320            (
1321                "time",
1322                datetime_datetime_today.time(),
1323                (datetime_datetime_today + one_sec).time(),
1324            ),
1325        )
1326
1327        for uv in update_vals:
1328            db.define_table("tt", Field("aa", "integer", default=0), Field("bb", uv[0]))
1329            self.assertTrue(isinstance(db.tt.insert(bb=uv[1]), long))
1330            self.assertEqual(db(db.tt.aa + 1 == 1).select(db.tt.bb)[0].bb, uv[1])
1331            self.assertEqual(db(db.tt.aa + 1 == 1).update(bb=uv[2]), 1)
1332            self.assertEqual(db(db.tt.aa / 3 == 0).select(db.tt.bb)[0].bb, uv[2])
1333            db.tt.drop()
1334        db.close()
1335
1336    def testSubstring(self):
1337        if IS_MONGODB:
1338            # MongoDB does not support string length
1339            end = 3
1340        else:
1341            end = -2
1342        db = DAL(DEFAULT_URI, check_reserved=["all"])
1343        t0 = db.define_table("t0", Field("name"))
1344        input_name = "web2py"
1345        t0.insert(name=input_name)
1346        exp_slice = t0.name.lower()[4:6]
1347        exp_slice_no_max = t0.name.lower()[4:]
1348        exp_slice_neg_max = t0.name.lower()[2:end]
1349        exp_slice_neg_start = t0.name.lower()[end:]
1350        exp_item = t0.name.lower()[3]
1351        out = (
1352            db(t0)
1353            .select(
1354                exp_slice,
1355                exp_item,
1356                exp_slice_no_max,
1357                exp_slice_neg_max,
1358                exp_slice_neg_start,
1359            )
1360            .first()
1361        )
1362        self.assertEqual(out[exp_slice], input_name[4:6])
1363        self.assertEqual(out[exp_item], input_name[3])
1364        self.assertEqual(out[exp_slice_no_max], input_name[4:])
1365        self.assertEqual(out[exp_slice_neg_max], input_name[2:end])
1366        self.assertEqual(out[exp_slice_neg_start], input_name[end:])
1367        t0.drop()
1368        db.close()
1369
1370    def testOps(self):
1371        db = DAL(DEFAULT_URI, check_reserved=["all"])
1372        t0 = db.define_table("t0", Field("vv", "integer"))
1373        self.assertTrue(isinstance(db.t0.insert(vv=1), long))
1374        self.assertTrue(isinstance(db.t0.insert(vv=2), long))
1375        self.assertTrue(isinstance(db.t0.insert(vv=3), long))
1376        sum = db.t0.vv.sum()
1377        count = db.t0.vv.count()
1378        avg = db.t0.vv.avg()
1379        op = sum / count
1380        op1 = (sum / count).with_alias("tot")
1381        self.assertEqual(db(t0).select(op).first()[op], 2)
1382        self.assertEqual(db(t0).select(op1).first()[op1], 2)
1383        self.assertEqual(db(t0).select(op1).first()["tot"], 2)
1384        op2 = avg * count
1385        self.assertEqual(db(t0).select(op2).first()[op2], 6)
1386        # the following is not possible at least on sqlite
1387        sum = db.t0.vv.sum().with_alias("s")
1388        count = db.t0.vv.count().with_alias("c")
1389        op = sum / count
1390        with self.assertRaises(SyntaxError):
1391            self.assertEqual(db(t0).select(op).first()[op], 2)
1392        t0.drop()
1393        db.close()
1394
1395
1396@unittest.skip("JOIN queries are not supported")
1397class TestJoin(unittest.TestCase):
1398    def testRun(self):
1399        db = DAL(DEFAULT_URI, check_reserved=["all"])
1400        db.define_table("t1", Field("aa"))
1401        db.define_table("t2", Field("aa"), Field("b", db.t1))
1402        i1 = db.t1.insert(aa="1")
1403        i2 = db.t1.insert(aa="2")
1404        i3 = db.t1.insert(aa="3")
1405        db.t2.insert(aa="4", b=i1)
1406        db.t2.insert(aa="5", b=i2)
1407        db.t2.insert(aa="6", b=i2)
1408        self.assertEqual(
1409            len(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)), 3
1410        )
1411        self.assertEqual(
1412            db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t1.aa, "2"
1413        )
1414        self.assertEqual(
1415            db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t2.aa, "6"
1416        )
1417        self.assertEqual(
1418            len(
1419                db().select(
1420                    db.t1.ALL,
1421                    db.t2.ALL,
1422                    left=db.t2.on(db.t1.id == db.t2.b),
1423                    orderby=db.t1.aa | db.t2.aa,
1424                )
1425            ),
1426            4,
1427        )
1428        self.assertEqual(
1429            db()
1430            .select(
1431                db.t1.ALL,
1432                db.t2.ALL,
1433                left=db.t2.on(db.t1.id == db.t2.b),
1434                orderby=db.t1.aa | db.t2.aa,
1435            )[2]
1436            .t1.aa,
1437            "2",
1438        )
1439        self.assertEqual(
1440            db()
1441            .select(
1442                db.t1.ALL,
1443                db.t2.ALL,
1444                left=db.t2.on(db.t1.id == db.t2.b),
1445                orderby=db.t1.aa | db.t2.aa,
1446            )[2]
1447            .t2.aa,
1448            "6",
1449        )
1450        self.assertEqual(
1451            db()
1452            .select(
1453                db.t1.ALL,
1454                db.t2.ALL,
1455                left=db.t2.on(db.t1.id == db.t2.b),
1456                orderby=db.t1.aa | db.t2.aa,
1457            )[3]
1458            .t1.aa,
1459            "3",
1460        )
1461        self.assertEqual(
1462            db()
1463            .select(
1464                db.t1.ALL,
1465                db.t2.ALL,
1466                left=db.t2.on(db.t1.id == db.t2.b),
1467                orderby=db.t1.aa | db.t2.aa,
1468            )[3]
1469            .t2.aa,
1470            None,
1471        )
1472        self.assertEqual(
1473            len(
1474                db().select(
1475                    db.t1.aa,
1476                    db.t2.id.count(),
1477                    left=db.t2.on(db.t1.id == db.t2.b),
1478                    orderby=db.t1.aa,
1479                    groupby=db.t1.aa,
1480                )
1481            ),
1482            3,
1483        )
1484        self.assertEqual(
1485            db()
1486            .select(
1487                db.t1.aa,
1488                db.t2.id.count(),
1489                left=db.t2.on(db.t1.id == db.t2.b),
1490                orderby=db.t1.aa,
1491                groupby=db.t1.aa,
1492            )[0]
1493            ._extra[db.t2.id.count()],
1494            1,
1495        )
1496        self.assertEqual(
1497            db()
1498            .select(
1499                db.t1.aa,
1500                db.t2.id.count(),
1501                left=db.t2.on(db.t1.id == db.t2.b),
1502                orderby=db.t1.aa,
1503                groupby=db.t1.aa,
1504            )[1]
1505            ._extra[db.t2.id.count()],
1506            2,
1507        )
1508        self.assertEqual(
1509            db()
1510            .select(
1511                db.t1.aa,
1512                db.t2.id.count(),
1513                left=db.t2.on(db.t1.id == db.t2.b),
1514                orderby=db.t1.aa,
1515                groupby=db.t1.aa,
1516            )[2]
1517            ._extra[db.t2.id.count()],
1518            0,
1519        )
1520        drop(db.t2)
1521        drop(db.t1)
1522
1523        db.define_table("person", Field("name"))
1524        id = db.person.insert(name="max")
1525        self.assertEqual(id.name, "max")
1526        db.define_table("dog", Field("name"), Field("ownerperson", "reference person"))
1527        db.dog.insert(name="skipper", ownerperson=1)
1528        row = db(db.person.id == db.dog.ownerperson).select().first()
1529        self.assertEqual(row[db.person.name], "max")
1530        self.assertEqual(row["person.name"], "max")
1531        drop(db.dog)
1532        self.assertEqual(len(db.person._referenced_by), 0)
1533        drop(db.person)
1534        db.close()
1535
1536
1537@unittest.skipIf(
1538    IS_GAE or IS_IMAP,
1539    'TODO: Datastore throws "AttributeError: Row object has no attribute _extra"',
1540)
1541class TestMinMaxSumAvg(unittest.TestCase):
1542    def testRun(self):
1543        db = DAL(DEFAULT_URI, check_reserved=["all"])
1544        db.define_table("tt", Field("aa", "integer"))
1545        self.assertEqual(isinstance(db.tt.insert(aa=1), long), True)
1546        self.assertEqual(isinstance(db.tt.insert(aa=2), long), True)
1547        self.assertEqual(isinstance(db.tt.insert(aa=3), long), True)
1548        s = db.tt.aa.min()
1549        self.assertEqual(db(db.tt.id > 0).select(s)[0]._extra[s], 1)
1550        self.assertEqual(db(db.tt.id > 0).select(s).first()[s], 1)
1551        self.assertEqual(db().select(s).first()[s], 1)
1552        s = db.tt.aa.max()
1553        self.assertEqual(db().select(s).first()[s], 3)
1554        s = db.tt.aa.sum()
1555        self.assertEqual(db().select(s).first()[s], 6)
1556        s = db.tt.aa.count()
1557        self.assertEqual(db().select(s).first()[s], 3)
1558        s = db.tt.aa.avg()
1559        self.assertEqual(db().select(s).first()[s], 2)
1560        drop(db.tt)
1561        db.close()
1562
1563
1564@unittest.skipIf(IS_IMAP, "Skip IMAP")
1565class TestMigrations(unittest.TestCase):
1566    def testRun(self):
1567        db = DAL(DEFAULT_URI, check_reserved=["all"])
1568        db.define_table("tt", Field("aa"), migrate=".storage.table")
1569        db.commit()
1570        db.close()
1571        db = DAL(DEFAULT_URI, check_reserved=["all"])
1572        db.define_table("tt", Field("aa"), Field("b"), migrate=".storage.table")
1573        db.commit()
1574        db.close()
1575        db = DAL(DEFAULT_URI, check_reserved=["all"])
1576        db.define_table("tt", Field("aa"), Field("b", "text"), migrate=".storage.table")
1577        db.commit()
1578        db.close()
1579        db = DAL(DEFAULT_URI, check_reserved=["all"])
1580        db.define_table("tt", Field("aa"), migrate=".storage.table")
1581        drop(db.tt)
1582        db.commit()
1583        db.close()
1584
1585    def tearDown(self):
1586        if os.path.exists(".storage.db"):
1587            os.unlink(".storage.db")
1588        if os.path.exists(".storage.table"):
1589            os.unlink(".storage.table")
1590
1591
1592@unittest.skipIf(IS_IMAP, "Skip IMAP")
1593class TestReference(unittest.TestCase):
1594    def testRun(self):
1595        scenarios = (
1596            (True, "CASCADE"),
1597            (False, "CASCADE"),
1598            (False, "SET NULL"),
1599        )
1600        for (b, ondelete) in scenarios:
1601            db = DAL(DEFAULT_URI, check_reserved=["all"], bigint_id=b)
1602            db.define_table(
1603                "tt", Field("name"), Field("aa", "reference tt", ondelete=ondelete)
1604            )
1605            db.commit()
1606            x = db.tt.insert(name="xxx")
1607            self.assertTrue(isinstance(x, long))
1608            self.assertEqual(x.id, x)
1609            self.assertEqual(x["id"], x)
1610            x.aa = x
1611            x.update_record()
1612            x1 = db.tt[x]
1613            self.assertEqual(x1.aa, x)
1614            self.assertEqual(x1.aa.aa.aa.aa.aa.aa.name, "xxx")
1615            y = db.tt.insert(name="yyy", aa=x1)
1616            self.assertEqual(y.aa, x1.id)
1617            self.assertTrue(isinstance(db.tt.insert(name="zzz"), long))
1618            self.assertEqual(db(db.tt.name).count(), 3)
1619            if IS_MONGODB:
1620                db(db.tt.id == x).delete()
1621                expected_count = {
1622                    "SET NULL": 2,
1623                    "CASCADE": 1,
1624                }
1625                self.assertEqual(db(db.tt.name).count(), expected_count[ondelete])
1626                if ondelete == "SET NULL":
1627                    self.assertEqual(db(db.tt.name == "yyy").select()[0].aa, None)
1628            drop(db.tt)
1629            db.commit()
1630            db.close()
1631
1632
1633@unittest.skipIf(IS_IMAP, "Skip IMAP")
1634class TestClientLevelOps(unittest.TestCase):
1635    def testRun(self):
1636        db = DAL(DEFAULT_URI, check_reserved=["all"])
1637        db.define_table("tt", Field("aa"))
1638        db.commit()
1639        db.tt.insert(aa="test")
1640        rows1 = db(db.tt.aa == "test").select()
1641        rows2 = db(db.tt.aa == "test").select()
1642        rows3 = rows1 + rows2
1643        assert len(rows3) == 2
1644        rows4 = rows1 & rows2
1645        assert len(rows4) == 1
1646        rows5 = rows1 | rows2
1647        assert len(rows5) == 1
1648        rows6 = rows1.find(lambda row: row.aa == "test")
1649        assert len(rows6) == 1
1650        rows7 = rows2.exclude(lambda row: row.aa == "test")
1651        assert len(rows7) == 1
1652        rows8 = rows5.sort(lambda row: row.aa)
1653        assert len(rows8) == 1
1654        drop(db.tt)
1655        db.commit()
1656        db.close()
1657
1658
1659@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
1660class TestVirtualFields(unittest.TestCase):
1661    def testRun(self):
1662        db = DAL(DEFAULT_URI, check_reserved=["all"])
1663        db.define_table("tt", Field("aa"))
1664        db.commit()
1665        db.tt.insert(aa="test")
1666
1667        class Compute:
1668            def a_upper(row):
1669                return row.tt.aa.upper()
1670
1671        db.tt.virtualfields.append(Compute())
1672        assert db(db.tt.id > 0).select().first().a_upper == "TEST"
1673        drop(db.tt)
1674        db.commit()
1675        db.close()
1676
1677
1678@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
1679class TestComputedFields(unittest.TestCase):
1680    def testRun(self):
1681        db = DAL(DEFAULT_URI, check_reserved=["all"])
1682        db.define_table(
1683            "tt",
1684            Field("aa"),
1685            Field("bb", default="x"),
1686            Field("cc", compute=lambda r: r.aa + r.bb),
1687        )
1688        db.commit()
1689        id = db.tt.insert(aa="z")
1690        self.assertEqual(db.tt[id].cc, "zx")
1691        drop(db.tt)
1692        db.commit()
1693
1694        # test checking that a compute field can refer to earlier-defined computed fields
1695        db.define_table(
1696            "tt",
1697            Field("aa"),
1698            Field("bb", default="x"),
1699            Field("cc", compute=lambda r: r.aa + r.bb),
1700            Field("dd", compute=lambda r: r.bb + r.cc),
1701        )
1702        db.commit()
1703        id = db.tt.insert(aa="z")
1704        self.assertEqual(db.tt[id].dd, "xzx")
1705        drop(db.tt)
1706        db.commit()
1707        db.close()
1708
1709
1710@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
1711class TestCommonFilters(unittest.TestCase):
1712    def testRun(self):
1713        db = DAL(DEFAULT_URI, check_reserved=["all"])
1714        db.define_table("t1", Field("aa", "integer"))
1715        db.define_table("t2", Field("aa", "integer"), Field("b", db.t1))
1716        i1 = db.t1.insert(aa=1)
1717        i2 = db.t1.insert(aa=2)
1718        i3 = db.t1.insert(aa=3)
1719        db.t2.insert(aa=4, b=i1)
1720        db.t2.insert(aa=5, b=i2)
1721        db.t2.insert(aa=6, b=i2)
1722        db.t1._common_filter = lambda q: db.t1.aa > 1
1723        self.assertEqual(db(db.t1).count(), 2)
1724        self.assertEqual(db(db.t1).count(), 2)
1725        db.t2._common_filter = lambda q: db.t2.aa < 6
1726        # test delete
1727        self.assertEqual(db(db.t2).count(), 2)
1728        db(db.t2).delete()
1729        self.assertEqual(db(db.t2).count(), 0)
1730        db.t2._common_filter = None
1731        self.assertEqual(db(db.t2).count(), 1)
1732        # test update
1733        db.t2.insert(aa=4, b=i1)
1734        db.t2.insert(aa=5, b=i2)
1735        db.t2._common_filter = lambda q: db.t2.aa < 6
1736        self.assertEqual(db(db.t2).count(), 2)
1737        db(db.t2).update(aa=6)
1738        self.assertEqual(db(db.t2).count(), 0)
1739        db.t2._common_filter = None
1740        self.assertEqual(db(db.t2).count(), 3)
1741        drop(db.t2)
1742        drop(db.t1)
1743        db.close()
1744
1745
1746@unittest.skipIf(IS_IMAP, "Skip IMAP test")
1747class TestImportExportFields(unittest.TestCase):
1748    def testRun(self):
1749        db = DAL(DEFAULT_URI, check_reserved=["all"])
1750        db.define_table("person", Field("name"))
1751        db.define_table("pet", Field("friend", db.person), Field("name"))
1752        for n in range(2):
1753            db(db.pet).delete()
1754            db(db.person).delete()
1755            for k in range(10):
1756                id = db.person.insert(name=str(k))
1757                db.pet.insert(friend=id, name=str(k))
1758        db.commit()
1759        stream = StringIO()
1760        db.export_to_csv_file(stream)
1761        db(db.pet).delete()
1762        db(db.person).delete()
1763        stream = StringIO(stream.getvalue())
1764        db.import_from_csv_file(stream)
1765        assert db(db.person).count() == 10
1766        assert db(db.pet.name).count() == 10
1767        drop(db.pet)
1768        drop(db.person)
1769        db.commit()
1770        db.close()
1771
1772
1773@unittest.skipIf(IS_IMAP, "Skip IMAP test")
1774class TestImportExportUuidFields(unittest.TestCase):
1775    def testRun(self):
1776        db = DAL(DEFAULT_URI, check_reserved=["all"])
1777        db.define_table("person", Field("name"), Field("uuid"))
1778        db.define_table("pet", Field("friend", db.person), Field("name"))
1779        for n in range(2):
1780            db(db.pet).delete()
1781            db(db.person).delete()
1782            for k in range(10):
1783                id = db.person.insert(name=str(k), uuid=str(k))
1784                db.pet.insert(friend=id, name=str(k))
1785        db.commit()
1786        stream = StringIO()
1787        db.export_to_csv_file(stream)
1788        db(db.person).delete()
1789        db(db.pet).delete()
1790        stream = StringIO(stream.getvalue())
1791        db.import_from_csv_file(stream)
1792        assert db(db.person).count() == 10
1793        assert db(db.pet).count() == 10
1794        drop(db.pet)
1795        drop(db.person)
1796        db.commit()
1797        db.close()
1798
1799
1800@unittest.skipIf(IS_IMAP, "Skip IMAP test")
1801class TestDALDictImportExport(unittest.TestCase):
1802    def testRun(self):
1803        db = DAL(DEFAULT_URI, check_reserved=["all"])
1804        db.define_table("person", Field("name", default="Michael"), Field("uuid"))
1805        db.define_table("pet", Field("friend", db.person), Field("name"))
1806        dbdict = db.as_dict(flat=True, sanitize=False)
1807        assert isinstance(dbdict, dict)
1808        uri = dbdict["uri"]
1809        assert isinstance(uri, basestring) and uri
1810        assert len(dbdict["tables"]) == 2
1811        assert len(dbdict["tables"][0]["fields"]) == 3
1812        assert dbdict["tables"][0]["fields"][1]["type"] == db.person.name.type
1813        assert dbdict["tables"][0]["fields"][1]["default"] == db.person.name.default
1814
1815        db2 = DAL(**dbdict)
1816        assert len(db.tables) == len(db2.tables)
1817        assert hasattr(db2, "pet") and isinstance(db2.pet, Table)
1818        assert hasattr(db2.pet, "friend") and isinstance(db2.pet.friend, Field)
1819        drop(db.pet)
1820        db.commit()
1821
1822        db2.commit()
1823
1824        have_serializers = True
1825        try:
1826            import serializers
1827
1828            dbjson = db.as_json(sanitize=False)
1829            assert isinstance(dbjson, basestring) and len(dbjson) > 0
1830
1831            unicode_keys = True
1832            if sys.version < "2.6.5":
1833                unicode_keys = False
1834            db3 = DAL(**serializers.loads_json(dbjson, unicode_keys=unicode_keys))
1835            assert (
1836                hasattr(db3, "person")
1837                and hasattr(db3.person, "uuid")
1838                and db3.person.uuid.type == db.person.uuid.type
1839            )
1840            drop(db3.person)
1841            db3.commit()
1842            db3.close()
1843        except ImportError:
1844            pass
1845
1846        mpfc = "Monty Python's Flying Circus"
1847        dbdict4 = {
1848            "uri": DEFAULT_URI,
1849            "tables": [
1850                {
1851                    "tablename": "tvshow",
1852                    "fields": [
1853                        {"fieldname": "name", "default": mpfc},
1854                        {"fieldname": "rating", "type": "double"},
1855                    ],
1856                },
1857                {
1858                    "tablename": "staff",
1859                    "fields": [
1860                        {"fieldname": "name", "default": "Michael"},
1861                        {"fieldname": "food", "default": "Spam"},
1862                        {"fieldname": "tvshow", "type": "reference tvshow"},
1863                    ],
1864                },
1865            ],
1866        }
1867        db4 = DAL(**dbdict4)
1868        assert "staff" in db4.tables
1869        assert "name" in db4.staff
1870        assert db4.tvshow.rating.type == "double"
1871        assert (
1872            isinstance(db4.tvshow.insert(), long),
1873            isinstance(db4.tvshow.insert(name="Loriot"), long),
1874            isinstance(db4.tvshow.insert(name="Il Mattatore"), long),
1875        ) == (True, True, True)
1876        assert isinstance(db4(db4.tvshow).select().first().id, long) == True
1877        assert db4(db4.tvshow).select().first().name == mpfc
1878
1879        drop(db4.staff)
1880        drop(db4.tvshow)
1881        db4.commit()
1882
1883        dbdict5 = {"uri": DEFAULT_URI}
1884        db5 = DAL(**dbdict5)
1885        assert db5.tables in ([], None)
1886        assert not (str(db5) in ("", None))
1887
1888        dbdict6 = {
1889            "uri": DEFAULT_URI,
1890            "tables": [
1891                {"tablename": "staff"},
1892                {
1893                    "tablename": "tvshow",
1894                    "fields": [
1895                        {"fieldname": "name"},
1896                        {"fieldname": "rating", "type": "double"},
1897                    ],
1898                },
1899            ],
1900        }
1901        db6 = DAL(**dbdict6)
1902
1903        assert len(db6["staff"].fields) == 1
1904        assert "name" in db6["tvshow"].fields
1905
1906        assert db6.staff.insert() is not None
1907        assert isinstance(db6(db6.staff).select().first().id, long) == True
1908
1909        drop(db6.staff)
1910        drop(db6.tvshow)
1911        db6.commit()
1912        db.close()
1913        db2.close()
1914        db4.close()
1915        db5.close()
1916        db6.close()
1917
1918
1919@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
1920class TestSelectAsDict(unittest.TestCase):
1921    def testSelect(self):
1922        db = DAL(DEFAULT_URI, check_reserved=["all"])
1923        db.define_table(
1924            "a_table", Field("b_field"), Field("a_field"),
1925        )
1926        db.a_table.insert(a_field="aa1", b_field="bb1")
1927        rtn = (
1928            db(db.a_table)
1929            .select(db.a_table.id, db.a_table.b_field, db.a_table.a_field)
1930            .as_list()
1931        )
1932        self.assertEqual(rtn[0]["b_field"], "bb1")
1933        keys = rtn[0].keys()
1934        self.assertEqual(len(keys), 3)
1935        self.assertEqual(
1936            ("id" in keys, "b_field" in keys, "a_field" in keys), (True, True, True)
1937        )
1938        drop(db.a_table)
1939        db.close()
1940
1941
1942@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
1943class TestRNameTable(unittest.TestCase):
1944    # tests for highly experimental rname attribute
1945    def testSelect(self):
1946        db = DAL(DEFAULT_URI, check_reserved=["all"])
1947        rname = _quote(db, "a very complicated tablename")
1948        db.define_table("easy_name", Field("a_field"), rname=rname)
1949        rtn = db.easy_name.insert(a_field="a")
1950        self.assertEqual(isinstance(rtn.id, long), True)
1951        rtn = db(db.easy_name.a_field == "a").select()
1952        self.assertEqual(len(rtn), 1)
1953        self.assertEqual(isinstance(rtn[0].id, long), True)
1954        self.assertEqual(rtn[0].a_field, "a")
1955        db.easy_name.insert(a_field="b")
1956        self.assertEqual(db(db.easy_name).count(), 2)
1957        rtn = db(db.easy_name.a_field == "a").update(a_field="c")
1958        self.assertEqual(rtn, 1)
1959
1960        # clean up
1961        drop(db.easy_name)
1962        db.close()
1963
1964    @unittest.skip("JOIN queries are not supported")
1965    def testJoin(self):
1966        db = DAL(DEFAULT_URI, check_reserved=["all"])
1967        rname = _quote(db, "this is table t1")
1968        rname2 = _quote(db, "this is table t2")
1969        db.define_table("t1", Field("aa"), rname=rname)
1970        db.define_table("t2", Field("aa"), Field("b", db.t1), rname=rname2)
1971        i1 = db.t1.insert(aa="1")
1972        i2 = db.t1.insert(aa="2")
1973        i3 = db.t1.insert(aa="3")
1974        db.t2.insert(aa="4", b=i1)
1975        db.t2.insert(aa="5", b=i2)
1976        db.t2.insert(aa="6", b=i2)
1977        self.assertEqual(
1978            len(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)), 3
1979        )
1980        self.assertEqual(
1981            db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t1.aa, "2"
1982        )
1983        self.assertEqual(
1984            db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t2.aa, "6"
1985        )
1986        self.assertEqual(
1987            len(
1988                db().select(
1989                    db.t1.ALL,
1990                    db.t2.ALL,
1991                    left=db.t2.on(db.t1.id == db.t2.b),
1992                    orderby=db.t1.aa | db.t2.aa,
1993                )
1994            ),
1995            4,
1996        )
1997        self.assertEqual(
1998            db()
1999            .select(
2000                db.t1.ALL,
2001                db.t2.ALL,
2002                left=db.t2.on(db.t1.id == db.t2.b),
2003                orderby=db.t1.aa | db.t2.aa,
2004            )[2]
2005            .t1.aa,
2006            "2",
2007        )
2008        self.assertEqual(
2009            db()
2010            .select(
2011                db.t1.ALL,
2012                db.t2.ALL,
2013                left=db.t2.on(db.t1.id == db.t2.b),
2014                orderby=db.t1.aa | db.t2.aa,
2015            )[2]
2016            .t2.aa,
2017            "6",
2018        )
2019        self.assertEqual(
2020            db()
2021            .select(
2022                db.t1.ALL,
2023                db.t2.ALL,
2024                left=db.t2.on(db.t1.id == db.t2.b),
2025                orderby=db.t1.aa | db.t2.aa,
2026            )[3]
2027            .t1.aa,
2028            "3",
2029        )
2030        self.assertEqual(
2031            db()
2032            .select(
2033                db.t1.ALL,
2034                db.t2.ALL,
2035                left=db.t2.on(db.t1.id == db.t2.b),
2036                orderby=db.t1.aa | db.t2.aa,
2037            )[3]
2038            .t2.aa,
2039            None,
2040        )
2041        self.assertEqual(
2042            len(
2043                db().select(
2044                    db.t1.aa,
2045                    db.t2.id.count(),
2046                    left=db.t2.on(db.t1.id == db.t2.b),
2047                    orderby=db.t1.aa,
2048                    groupby=db.t1.aa,
2049                )
2050            ),
2051            3,
2052        )
2053        self.assertEqual(
2054            db()
2055            .select(
2056                db.t1.aa,
2057                db.t2.id.count(),
2058                left=db.t2.on(db.t1.id == db.t2.b),
2059                orderby=db.t1.aa,
2060                groupby=db.t1.aa,
2061            )[0]
2062            ._extra[db.t2.id.count()],
2063            1,
2064        )
2065        self.assertEqual(
2066            db()
2067            .select(
2068                db.t1.aa,
2069                db.t2.id.count(),
2070                left=db.t2.on(db.t1.id == db.t2.b),
2071                orderby=db.t1.aa,
2072                groupby=db.t1.aa,
2073            )[1]
2074            ._extra[db.t2.id.count()],
2075            2,
2076        )
2077        self.assertEqual(
2078            db()
2079            .select(
2080                db.t1.aa,
2081                db.t2.id.count(),
2082                left=db.t2.on(db.t1.id == db.t2.b),
2083                orderby=db.t1.aa,
2084                groupby=db.t1.aa,
2085            )[2]
2086            ._extra[db.t2.id.count()],
2087            0,
2088        )
2089        drop(db.t2)
2090        drop(db.t1)
2091
2092        db.define_table("person", Field("name"), rname=rname)
2093        id = db.person.insert(name="max")
2094        self.assertEqual(id.name, "max")
2095        db.define_table(
2096            "dog", Field("name"), Field("ownerperson", "reference person"), rname=rname2
2097        )
2098        db.dog.insert(name="skipper", ownerperson=1)
2099        row = db(db.person.id == db.dog.ownerperson).select().first()
2100        self.assertEqual(row[db.person.name], "max")
2101        self.assertEqual(row["person.name"], "max")
2102        drop(db.dog)
2103        self.assertEqual(len(db.person._referenced_by), 0)
2104        drop(db.person)
2105        db.close()
2106
2107
2108@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
2109@unittest.skipIf(IS_GAE, "TODO: Datastore AGGREGATE Not Supported")
2110class TestRNameFields(unittest.TestCase):
2111    # tests for highly experimental rname attribute
2112    def testSelect(self):
2113        db = DAL(DEFAULT_URI, check_reserved=["all"])
2114        rname = _quote(db, "a very complicated fieldname")
2115        rname2 = _quote(db, "rating from 1 to 10")
2116        db.define_table(
2117            "easy_name",
2118            Field("a_field", rname=rname),
2119            Field("rating", "integer", rname=rname2, default=2),
2120        )
2121        rtn = db.easy_name.insert(a_field="a")
2122        self.assertEqual(isinstance(rtn.id, long), True)
2123        rtn = db(db.easy_name.a_field == "a").select()
2124        self.assertEqual(len(rtn), 1)
2125        self.assertEqual(isinstance(rtn[0].id, long), True)
2126        self.assertEqual(rtn[0].a_field, "a")
2127        db.easy_name.insert(a_field="b")
2128        rtn = db(db.easy_name.id > 0).delete()
2129        self.assertEqual(rtn, 2)
2130        rtn = db(db.easy_name.id > 0).count()
2131        self.assertEqual(rtn, 0)
2132        db.easy_name.insert(a_field="a")
2133        db.easy_name.insert(a_field="b")
2134        rtn = db(db.easy_name.id > 0).count()
2135        self.assertEqual(rtn, 2)
2136        rtn = db(db.easy_name.a_field == "a").update(a_field="c")
2137        rtn = db(db.easy_name.a_field == "c").count()
2138        self.assertEqual(rtn, 1)
2139        rtn = db(db.easy_name.a_field != "c").count()
2140        self.assertEqual(rtn, 1)
2141        avg = db.easy_name.rating.avg()
2142        rtn = db(db.easy_name.id > 0).select(avg)
2143        self.assertEqual(rtn[0][avg], 2)
2144
2145        rname = _quote(db, "this is the person name")
2146        db.define_table(
2147            "person", Field("name", default="Michael", rname=rname), Field("uuid")
2148        )
2149        michael = db.person.insert()  # default insert
2150        john = db.person.insert(name="John")
2151        luke = db.person.insert(name="Luke")
2152
2153        rtn = db(db.person.id > 0).select()
2154        self.assertEqual(len(rtn), 3)
2155        self.assertEqual(rtn[0].id, michael)
2156        self.assertEqual(rtn[0].name, "Michael")
2157        self.assertEqual(rtn[1].id, john)
2158        self.assertEqual(rtn[1].name, "John")
2159        # fetch owners, eventually with pet
2160        # main point is retrieving Luke with no pets
2161        rtn = db(db.person.id > 0).select()
2162        self.assertEqual(rtn[0].id, michael)
2163        self.assertEqual(rtn[0].name, "Michael")
2164        self.assertEqual(rtn[2].name, "Luke")
2165        self.assertEqual(rtn[2].id, luke)
2166        # as dict
2167        rtn = db(db.person.id > 0).select().as_dict()
2168        self.assertEqual(rtn[michael]["name"], "Michael")
2169        # as list
2170        rtn = db(db.person.id > 0).select().as_list()
2171        self.assertEqual(rtn[0]["name"], "Michael")
2172        # isempty
2173        rtn = db(db.person.id > 0).isempty()
2174        self.assertEqual(rtn, False)
2175
2176        # clean up
2177        drop(db.person)
2178        drop(db.easy_name)
2179        db.close()
2180
2181    @unittest.skipIf(
2182        IS_GAE, "TODO: Datastore does not accept dict objects as json field input."
2183    )
2184    def testRun(self):
2185        db = DAL(DEFAULT_URI, check_reserved=["all"])
2186        rname = _quote(db, "a very complicated fieldname")
2187        for ft in ["string", "text", "password", "upload", "blob"]:
2188            db.define_table("tt", Field("aa", ft, default="", rname=rname))
2189            cv = "x"
2190            self.assertEqual(isinstance(db.tt.insert(aa="x"), long), True)
2191            if IS_MONGODB and not PY2 and ft == "blob":
2192                cv = to_bytes(cv)
2193            self.assertEqual(db().select(db.tt.aa)[0].aa, cv)
2194            drop(db.tt)
2195        db.define_table("tt", Field("aa", "integer", default=1, rname=rname))
2196        self.assertEqual(isinstance(db.tt.insert(aa=3), long), True)
2197        self.assertEqual(db().select(db.tt.aa)[0].aa, 3)
2198        drop(db.tt)
2199        db.define_table("tt", Field("aa", "double", default=1, rname=rname))
2200        self.assertEqual(isinstance(db.tt.insert(aa=3.1), long), True)
2201        self.assertEqual(db().select(db.tt.aa)[0].aa, 3.1)
2202        drop(db.tt)
2203        db.define_table("tt", Field("aa", "boolean", default=True, rname=rname))
2204        self.assertEqual(isinstance(db.tt.insert(aa=True), long), True)
2205        self.assertEqual(db().select(db.tt.aa)[0].aa, True)
2206        drop(db.tt)
2207        db.define_table("tt", Field("aa", "json", default={}, rname=rname))
2208        self.assertEqual(isinstance(db.tt.insert(aa={}), long), True)
2209        self.assertEqual(db().select(db.tt.aa)[0].aa, {})
2210        drop(db.tt)
2211        db.define_table(
2212            "tt", Field("aa", "date", default=datetime.date.today(), rname=rname)
2213        )
2214        t0 = datetime.date.today()
2215        self.assertEqual(isinstance(db.tt.insert(aa=t0), long), True)
2216        self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
2217        drop(db.tt)
2218        db.define_table(
2219            "tt",
2220            Field("aa", "datetime", default=datetime.datetime.today(), rname=rname),
2221        )
2222        t0 = datetime.datetime(1971, 12, 21, 10, 30, 55, 0,)
2223        id = db.tt.insert(aa=t0)
2224        self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
2225
2226        ## Row APIs
2227        row = db().select(db.tt.aa)[0]
2228        self.assertEqual(db.tt[id].aa, t0)
2229        self.assertEqual(db.tt["aa"], db.tt.aa)
2230        self.assertEqual(db.tt(id).aa, t0)
2231        self.assertTrue(db.tt(id, aa=None) == None)
2232        self.assertFalse(db.tt(id, aa=t0) == None)
2233        self.assertEqual(row.aa, t0)
2234        self.assertEqual(row["aa"], t0)
2235        self.assertEqual(row["tt.aa"], t0)
2236        self.assertEqual(row("tt.aa"), t0)
2237
2238        ## Lazy and Virtual fields
2239        db.tt.b = Field.Virtual(lambda row: row.tt.aa)
2240        db.tt.c = Field.Lazy(lambda row: row.tt.aa)
2241        row = db().select(db.tt.aa)[0]
2242        self.assertEqual(row.b, t0)
2243        self.assertEqual(row.c(), t0)
2244
2245        drop(db.tt)
2246        db.define_table("tt", Field("aa", "time", default="11:30", rname=rname))
2247        t0 = datetime.time(10, 30, 55)
2248        self.assertEqual(isinstance(db.tt.insert(aa=t0), long), True)
2249        self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
2250        drop(db.tt)
2251        db.close()
2252
2253    def testInsert(self):
2254        db = DAL(DEFAULT_URI, check_reserved=["all"])
2255        rname = _quote(db, "a very complicated fieldname")
2256        db.define_table("tt", Field("aa", rname=rname))
2257        self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True)
2258        self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True)
2259        self.assertEqual(isinstance(db.tt.insert(aa="1"), long), True)
2260        self.assertEqual(db(db.tt.aa == "1").count(), 3)
2261        self.assertEqual(db(db.tt.aa == "2").isempty(), True)
2262        self.assertEqual(db(db.tt.aa == "1").update(aa="2"), 3)
2263        self.assertEqual(db(db.tt.aa == "2").count(), 3)
2264        self.assertEqual(db(db.tt.aa == "2").isempty(), False)
2265        self.assertEqual(db(db.tt.aa == "2").delete(), 3)
2266        self.assertEqual(db(db.tt.aa == "2").isempty(), True)
2267        drop(db.tt)
2268        db.close()
2269
2270    @unittest.skip("JOIN queries are not supported")
2271    def testJoin(self):
2272        db = DAL(DEFAULT_URI, check_reserved=["all"])
2273        rname = _quote(db, "this is field aa")
2274        rname2 = _quote(db, "this is field b")
2275        db.define_table("t1", Field("aa", rname=rname))
2276        db.define_table("t2", Field("aa", rname=rname), Field("b", db.t1, rname=rname2))
2277        i1 = db.t1.insert(aa="1")
2278        i2 = db.t1.insert(aa="2")
2279        i3 = db.t1.insert(aa="3")
2280        db.t2.insert(aa="4", b=i1)
2281        db.t2.insert(aa="5", b=i2)
2282        db.t2.insert(aa="6", b=i2)
2283        self.assertEqual(
2284            len(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)), 3
2285        )
2286        self.assertEqual(
2287            db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t1.aa, "2"
2288        )
2289        self.assertEqual(
2290            db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t2.aa, "6"
2291        )
2292        self.assertEqual(
2293            len(
2294                db().select(
2295                    db.t1.ALL,
2296                    db.t2.ALL,
2297                    left=db.t2.on(db.t1.id == db.t2.b),
2298                    orderby=db.t1.aa | db.t2.aa,
2299                )
2300            ),
2301            4,
2302        )
2303        self.assertEqual(
2304            db()
2305            .select(
2306                db.t1.ALL,
2307                db.t2.ALL,
2308                left=db.t2.on(db.t1.id == db.t2.b),
2309                orderby=db.t1.aa | db.t2.aa,
2310            )[2]
2311            .t1.aa,
2312            "2",
2313        )
2314        self.assertEqual(
2315            db()
2316            .select(
2317                db.t1.ALL,
2318                db.t2.ALL,
2319                left=db.t2.on(db.t1.id == db.t2.b),
2320                orderby=db.t1.aa | db.t2.aa,
2321            )[2]
2322            .t2.aa,
2323            "6",
2324        )
2325        self.assertEqual(
2326            db()
2327            .select(
2328                db.t1.ALL,
2329                db.t2.ALL,
2330                left=db.t2.on(db.t1.id == db.t2.b),
2331                orderby=db.t1.aa | db.t2.aa,
2332            )[3]
2333            .t1.aa,
2334            "3",
2335        )
2336        self.assertEqual(
2337            db()
2338            .select(
2339                db.t1.ALL,
2340                db.t2.ALL,
2341                left=db.t2.on(db.t1.id == db.t2.b),
2342                orderby=db.t1.aa | db.t2.aa,
2343            )[3]
2344            .t2.aa,
2345            None,
2346        )
2347        self.assertEqual(
2348            len(
2349                db().select(
2350                    db.t1.aa,
2351                    db.t2.id.count(),
2352                    left=db.t2.on(db.t1.id == db.t2.b),
2353                    orderby=db.t1.aa,
2354                    groupby=db.t1.aa,
2355                )
2356            ),
2357            3,
2358        )
2359        self.assertEqual(
2360            db()
2361            .select(
2362                db.t1.aa,
2363                db.t2.id.count(),
2364                left=db.t2.on(db.t1.id == db.t2.b),
2365                orderby=db.t1.aa,
2366                groupby=db.t1.aa,
2367            )[0]
2368            ._extra[db.t2.id.count()],
2369            1,
2370        )
2371        self.assertEqual(
2372            db()
2373            .select(
2374                db.t1.aa,
2375                db.t2.id.count(),
2376                left=db.t2.on(db.t1.id == db.t2.b),
2377                orderby=db.t1.aa,
2378                groupby=db.t1.aa,
2379            )[1]
2380            ._extra[db.t2.id.count()],
2381            2,
2382        )
2383        self.assertEqual(
2384            db()
2385            .select(
2386                db.t1.aa,
2387                db.t2.id.count(),
2388                left=db.t2.on(db.t1.id == db.t2.b),
2389                orderby=db.t1.aa,
2390                groupby=db.t1.aa,
2391            )[2]
2392            ._extra[db.t2.id.count()],
2393            0,
2394        )
2395        drop(db.t2)
2396        drop(db.t1)
2397
2398        db.define_table("person", Field("name", rname=rname))
2399        id = db.person.insert(name="max")
2400        self.assertEqual(id.name, "max")
2401        db.define_table(
2402            "dog",
2403            Field("name", rname=rname),
2404            Field("ownerperson", "reference person", rname=rname2),
2405        )
2406        db.dog.insert(name="skipper", ownerperson=1)
2407        row = db(db.person.id == db.dog.ownerperson).select().first()
2408        self.assertEqual(row[db.person.name], "max")
2409        self.assertEqual(row["person.name"], "max")
2410        drop(db.dog)
2411        self.assertEqual(len(db.person._referenced_by), 0)
2412        drop(db.person)
2413        db.close()
2414
2415
2416@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
2417class TestQuoting(unittest.TestCase):
2418
2419    # tests for case sensitivity
2420    def testCase(self):
2421        return
2422        db = DAL(DEFAULT_URI, check_reserved=["all"], ignore_field_case=False)
2423
2424        # test table case
2425        t0 = db.define_table("B", Field("f", "string"))
2426
2427        t1 = db.define_table("b", Field("B", t0), Field("words", "text"))
2428
2429        blather = "blah blah and so"
2430        t0[0] = {"f": "content"}
2431        t1[0] = {"B": int(t0[1]["id"]), "words": blather}
2432
2433        r = db(db.B.id == db.b.B).select()
2434
2435        self.assertEqual(r[0].b.words, blather)
2436
2437        drop(t1)
2438        drop(t0)
2439
2440        # test field case
2441        t0 = db.define_table("table is a test", Field("a_a"), Field("a_A"))
2442
2443        t0[0] = dict(a_a="a_a", a_A="a_A")
2444
2445        self.assertEqual(t0[1].a_a, "a_a")
2446        self.assertEqual(t0[1].a_A, "a_A")
2447
2448        drop(t0)
2449        db.close()
2450
2451    def testPKFK(self):
2452
2453        # test primary keys
2454
2455        db = DAL(DEFAULT_URI, check_reserved=["all"], ignore_field_case=False)
2456        # test table without surrogate key. Length must is limited to
2457        # 100 because of MySQL limitations: it cannot handle more than
2458        # 767 bytes in unique keys.
2459
2460        t0 = db.define_table("t0", Field("Code", length=100), primarykey=["Code"])
2461        t2 = db.define_table("t2", Field("f"), Field("t0_Code", "reference t0"))
2462        t3 = db.define_table(
2463            "t3", Field("f", length=100), Field("t0_Code", t0.Code), primarykey=["f"]
2464        )
2465        t4 = db.define_table(
2466            "t4", Field("f", length=100), Field("t0", t0), primarykey=["f"]
2467        )
2468
2469        try:
2470            t5 = db.define_table(
2471                "t5",
2472                Field("f", length=100),
2473                Field("t0", "reference no_table_wrong_reference"),
2474                primarykey=["f"],
2475            )
2476        except Exception as e:
2477            self.assertTrue(isinstance(e, KeyError))
2478
2479        drop(t0, "cascade")
2480        drop(t2)
2481        drop(t3)
2482        drop(t4)
2483        db.close()
2484
2485
2486class TestTableAndFieldCase(unittest.TestCase):
2487    """
2488    at the Python level we should not allow db.C and db.c because of .table conflicts on windows
2489    but it should be possible to map two different names into distinct tables "c" and "C" at the Python level
2490    By default Python models names should be mapped into lower case table names and assume case insensitivity.
2491    """
2492
2493    def testme(self):
2494        return
2495
2496
2497class TestQuotesByDefault(unittest.TestCase):
2498    """
2499    all default tables names should be quoted unless an explicit mapping has been given for a table.
2500    """
2501
2502    def testme(self):
2503        return
2504
2505
2506@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
2507class TestRecordVersioning(unittest.TestCase):
2508    def testRun(self):
2509        db = DAL(DEFAULT_URI, check_reserved=["all"])
2510        tt = db.define_table(
2511            "tt", Field("name"), Field("is_active", "boolean", default=True)
2512        )
2513        db.tt._enable_record_versioning(archive_name="tt_archive")
2514        self.assertTrue("tt_archive" in db)
2515        i_id = db.tt.insert(name="web2py1")
2516        db.tt.insert(name="web2py2")
2517        db(db.tt.name == "web2py2").delete()
2518        self.assertEqual(len(db(db.tt).select()), 1)
2519        self.assertEqual(db(db.tt).count(), 1)
2520        db(db.tt.id == i_id).update(name="web2py3")
2521        self.assertEqual(len(db(db.tt).select()), 1)
2522        self.assertEqual(db(db.tt).count(), 1)
2523        self.assertEqual(len(db(db.tt_archive).select()), 2)
2524        self.assertEqual(db(db.tt_archive).count(), 2)
2525        drop(db.tt_archive)
2526        # it allows tt to be dropped
2527        db.tt._before_delete = []
2528        drop(tt)
2529        db.close()
2530
2531
2532@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
2533class TestConnection(unittest.TestCase):
2534    def testRun(self):
2535        # check for adapter reconnect without parameters
2536        db1 = DAL(DEFAULT_URI, check_reserved=["all"])
2537        db1.define_table("tt", Field("aa", "integer"))
2538        self.assertEqual(isinstance(db1.tt.insert(aa=1), long), True)
2539        self.assertEqual(db1(db1.tt.aa == 1).count(), 1)
2540        drop(db1.tt)
2541        db1._adapter.close()
2542        db1._adapter.reconnect()
2543        db1.define_table("tt", Field("aa", "integer"))
2544        self.assertEqual(isinstance(db1.tt.insert(aa=1), long), True)
2545        self.assertEqual(db1(db1.tt.aa == 1).count(), 1)
2546        drop(db1.tt)
2547        db1.close()
2548
2549        # check connection are reused with pool_size
2550        connections = {}
2551        for a in range(10):
2552            db2 = DAL(DEFAULT_URI, check_reserved=["all"], pool_size=5)
2553            c = db2._adapter.connection
2554            connections[id(c)] = c
2555            db2.close()
2556        self.assertEqual(len(connections), 1)
2557        c = [connections[x] for x in connections][0]
2558        c.commit()
2559        c.close()
2560
2561        # check correct use of pool_size
2562        dbs = []
2563        for a in range(10):
2564            db3 = DAL(DEFAULT_URI, check_reserved=["all"], pool_size=5)
2565            db3._adapter.get_connection()
2566            dbs.append(db3)
2567        for db in dbs:
2568            db.close()
2569        self.assertEqual(len(db3._adapter.POOLS[DEFAULT_URI]), 5)
2570        for c in db3._adapter.POOLS[DEFAULT_URI]:
2571            c.close()
2572        db3._adapter.POOLS[DEFAULT_URI] = []
2573
2574
2575@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
2576class TestBasicOps(unittest.TestCase):
2577    def testRun(self):
2578        db = DAL(DEFAULT_URI, check_reserved=["all"])
2579        tt = db.define_table(
2580            "tt", Field("name"), Field("is_active", "boolean", default=True)
2581        )
2582        i_id = db.tt.insert(name="web2py1")
2583        db.tt.insert(name="web2py2")
2584        db(db.tt.name == "web2py2").delete()
2585        self.assertEqual(len(db(db.tt).select()), 1)
2586        self.assertEqual(db(db.tt).count(), 1)
2587        db(db.tt.id == i_id).update(name="web2py3")
2588        self.assertEqual(len(db(db.tt).select()), 1)
2589        self.assertEqual(db(db.tt).count(), 1)
2590        drop(tt)
2591        db.close()
2592
2593
2594@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
2595@unittest.skipIf(IS_GAE, 'TODO: Datastore "unsupported operand type"')
2596class TestSQLCustomType(unittest.TestCase):
2597    def testRun(self):
2598        db = DAL(DEFAULT_URI, check_reserved=["all"])
2599        from pydal.helpers.classes import SQLCustomType
2600
2601        native_double = "double"
2602        native_string = "string"
2603        if hasattr(db._adapter, "types"):
2604            native_double = db._adapter.types["double"]
2605            try:
2606                native_string = db._adapter.types["string"] % {"length": 256}
2607            except:
2608                native_string = db._adapter.types["string"]
2609        basic_t = SQLCustomType(type="double", native=native_double)
2610        basic_t_str = SQLCustomType(type="string", native=native_string)
2611        t0 = db.define_table(
2612            "t0", Field("price", basic_t), Field("product", basic_t_str)
2613        )
2614        r_id = t0.insert(price=None, product=None)
2615        row = db(t0.id == r_id).select(t0.ALL).first()
2616        self.assertEqual(row["price"], None)
2617        self.assertEqual(row["product"], None)
2618        r_id = t0.insert(price=1.2, product="car")
2619        row = db(t0.id == r_id).select(t0.ALL).first()
2620        self.assertEqual(row["price"], 1.2)
2621        self.assertEqual(row["product"], "car")
2622        t0.drop()
2623        db.close()
2624
2625
2626@unittest.skipIf(IS_GAE or IS_IMAP, "Skip test lazy")
2627class TestLazy(unittest.TestCase):
2628    def testRun(self):
2629        db = DAL(DEFAULT_URI, check_reserved=["all"], lazy_tables=True)
2630        t0 = db.define_table("t0", Field("name"))
2631        self.assertTrue(("t0" in db._LAZY_TABLES.keys()))
2632        db.t0.insert(name="1")
2633        self.assertFalse(("t0" in db._LAZY_TABLES.keys()))
2634        db.t0.drop()
2635        db.close()
2636
2637    def testLazyGetter(self):
2638        db = DAL(DEFAULT_URI, lazy_tables=True)
2639        db.define_table("tt", Field("value", "integer"))
2640        db.define_table(
2641            "ttt", Field("value", "integer"), Field("tt_id", "reference tt"),
2642        )
2643        # Force table definition
2644        db.ttt.value.writable = False
2645        idd = db.tt.insert(value=0)
2646        db.ttt.insert(tt_id=idd)
2647        db.ttt.drop()
2648        db.tt.drop()
2649        db.close()
2650
2651    def testRowNone(self):
2652        db = DAL(DEFAULT_URI, lazy_tables=True)
2653        tt = db.define_table("tt", Field("value", "integer"))
2654        db.tt.insert(value=None)
2655        row = db(db.tt).select(db.tt.ALL).first()
2656        self.assertEqual(row.value, None)
2657        self.assertEqual(row[db.tt.value], None)
2658        self.assertEqual(row["tt.value"], None)
2659        self.assertEqual(row.get("tt.value"), None)
2660        self.assertEqual(row["value"], None)
2661        self.assertEqual(row.get("value"), None)
2662        db.tt.drop()
2663        db.close()
2664
2665
2666class TestRedefine(unittest.TestCase):
2667    def testRun(self):
2668        db = DAL(DEFAULT_URI, check_reserved=["all"], lazy_tables=True, migrate=False)
2669        db.define_table("t_a", Field("code"))
2670        self.assertTrue("code" in db.t_a)
2671        self.assertTrue("code" in db["t_a"])
2672        db.define_table("t_a", Field("code_a"), redefine=True)
2673        self.assertFalse("code" in db.t_a)
2674        self.assertFalse("code" in db["t_a"])
2675        self.assertTrue("code_a" in db.t_a)
2676        self.assertTrue("code_a" in db["t_a"])
2677        db.close()
2678
2679
2680@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
2681class TestUpdateInsert(unittest.TestCase):
2682    def testRun(self):
2683        db = DAL(DEFAULT_URI, check_reserved=["all"])
2684        t0 = db.define_table("t0", Field("name"))
2685        i_id = db.t0.update_or_insert((db.t0.id == 1), name="web2py")
2686        u_id = db.t0.update_or_insert((db.t0.id == i_id), name="web2py2")
2687        self.assertTrue(i_id != None)
2688        self.assertTrue(u_id == None)
2689        self.assertEqual(len(db(db.t0).select()), 1)
2690        self.assertEqual(db(db.t0).count(), 1)
2691        self.assertEqual(db(db.t0.name == "web2py").count(), 0)
2692        self.assertEqual(db(db.t0.name == "web2py2").count(), 1)
2693        drop(t0)
2694        db.close()
2695
2696
2697@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
2698class TestBulkInsert(unittest.TestCase):
2699    def testRun(self):
2700        db = DAL(DEFAULT_URI, check_reserved=["all"])
2701        t0 = db.define_table("t0", Field("name"))
2702        global ctr
2703        ctr = 0
2704
2705        def test_after_insert(i, r):
2706            self.assertIsInstance(i, OpRow)
2707            global ctr
2708            ctr += 1
2709            return True
2710
2711        t0._after_insert.append(test_after_insert)
2712        items = [{"name": "web2py_%s" % pos} for pos in range(0, 10, 1)]
2713        t0.bulk_insert(items)
2714        self.assertTrue(db(t0).count() == len(items))
2715        for pos in range(0, 10, 1):
2716            self.assertEqual(len(db(t0.name == "web2py_%s" % pos).select()), 1)
2717            self.assertEqual(db(t0.name == "web2py_%s" % pos).count(), 1)
2718        self.assertTrue(ctr == len(items))
2719        drop(t0)
2720        db.close()
2721
2722
2723if __name__ == "__main__":
2724    unittest.main()
2725    tearDownModule()
Note: See TracBrowser for help on using the repository browser.