source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/packages/dal/tests/sql.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 132.2 KB
Line 
1# -*- coding: utf-8 -*-
2"""
3    Basic unit tests
4"""
5
6from __future__ import print_function
7import os
8import glob
9import datetime
10import json
11import pickle
12
13from pydal._compat import basestring, StringIO, integer_types, xrange, BytesIO, to_bytes
14from pydal import DAL, Field
15from pydal.helpers.classes import SQLALL, OpRow
16from pydal.objects import Table, Expression, Row
17from ._compat import unittest
18from ._adapt import (
19    DEFAULT_URI,
20    IS_POSTGRESQL,
21    IS_SQLITE,
22    IS_MSSQL,
23    IS_MYSQL,
24    IS_TERADATA,
25    IS_NOSQL,
26    IS_ORACLE,
27)
28
29from ._helpers import DALtest
30
31long = integer_types[-1]
32
33print("Testing against %s engine (%s)" % (DEFAULT_URI.partition(":")[0], DEFAULT_URI))
34
35
36ALLOWED_DATATYPES = [
37    "string",
38    "text",
39    "integer",
40    "boolean",
41    "double",
42    "blob",
43    "date",
44    "time",
45    "datetime",
46    "upload",
47    "password",
48    "json",
49    "bigint",
50]
51
52
53def setUpModule():
54    if IS_MYSQL or IS_TERADATA or IS_ORACLE:
55        db = DAL(DEFAULT_URI, check_reserved=["all"])
56
57        def clean_table(db, tablename):
58            try:
59                db.define_table(tablename)
60            except Exception as e:
61                pass
62            try:
63                db[tablename].drop()
64            except Exception as e:
65                pass
66
67        for tablename in [
68            "tt",
69            "t0",
70            "t1",
71            "t2",
72            "t3",
73            "t4",
74            "easy_name",
75            "tt_archive",
76            "pet_farm",
77            "person",
78        ]:
79            clean_table(db, tablename)
80        db.close()
81
82
83def tearDownModule():
84    if os.path.isfile("sql.log"):
85        os.unlink("sql.log")
86    for a in glob.glob("*.table"):
87        os.unlink(a)
88
89
90class TestFields(DALtest):
91    def testFieldName(self):
92        """
93        - a "str" something
94        - not a method or property of Table
95        - "dotted-notation" friendly:
96            - a valid python identifier
97            - not a python keyword
98            - not starting with underscore or an integer
99            - not containing dots
100
101        Basically, anything alphanumeric, no symbols, only underscore as
102        punctuation
103        """
104
105        # Check that Fields cannot start with underscores
106        self.assertRaises(SyntaxError, Field, "_abc", "string")
107
108        # Check that Fields cannot contain punctuation other than underscores
109        self.assertRaises(SyntaxError, Field, "a.bc", "string")
110
111        # Check that Fields cannot be a name of a method or property of Table
112        for x in ["drop", "on", "truncate"]:
113            self.assertRaises(SyntaxError, Field, x, "string")
114
115        # Check that Fields allows underscores in the body of a field name.
116        self.assertTrue(
117            Field("a_bc", "string"),
118            "Field isn't allowing underscores in fieldnames.  It should.",
119        )
120
121        # Check that Field names don't allow a python keyword
122        self.assertRaises(SyntaxError, Field, "True", "string")
123        self.assertRaises(SyntaxError, Field, "elif", "string")
124        self.assertRaises(SyntaxError, Field, "while", "string")
125
126        # Check that Field names don't allow a non-valid python identifier
127        non_valid_examples = ["1x", "xx$%@%", "xx yy", "yy\na", "yy\n"]
128        for a in non_valid_examples:
129            self.assertRaises(SyntaxError, Field, a, "string")
130
131        # Check that Field names don't allow a unicode string
132        non_valid_examples = non_valid_examples = [
133            "ℙƴ☂ℌøἤ",
134            u"ℙƴ☂ℌøἤ",
135            u"àè",
136            u"ṧøмℯ",
137            u"тεṧт",
138            u"♥αłüℯṧ",
139            u"ℊεᾔ℮яαт℮∂",
140            u"♭ƴ",
141            u"ᾔ☤ρℌℓ☺ḓ",
142        ]
143        for a in non_valid_examples:
144            self.assertRaises(SyntaxError, Field, a, "string")
145
146    def testFieldTypes(self):
147
148        # Check that string, and password default length is 512
149        for typ in ["string", "password"]:
150            self.assertTrue(
151                Field("abc", typ).length == 512,
152                "Default length for type '%s' is not 512 or 255" % typ,
153            )
154
155        # Check that upload default length is 512
156        self.assertTrue(
157            Field("abc", "upload").length == 512,
158            "Default length for type 'upload' is not 512",
159        )
160
161        # Check that Tables passed in the type creates a reference
162        self.assertTrue(
163            Field("abc", Table(None, "temp")).type == "reference temp",
164            "Passing a Table does not result in a reference type.",
165        )
166
167    def testFieldLabels(self):
168
169        # Check that a label is successfully built from the supplied fieldname
170        self.assertTrue(
171            Field("abc", "string").label == "Abc", "Label built is incorrect"
172        )
173        self.assertTrue(
174            Field("abc_def", "string").label == "Abc Def", "Label built is incorrect"
175        )
176
177    def testFieldFormatters(self):  # Formatter should be called Validator
178
179        # Test the default formatters
180        for typ in ALLOWED_DATATYPES:
181            f = Field("abc", typ)
182            if typ not in ["date", "time", "datetime"]:
183                isinstance(f.formatter("test"), str)
184            else:
185                isinstance(f.formatter(datetime.datetime.now()), str)
186
187    def testUploadField(self):
188        import tempfile
189
190        stream = tempfile.NamedTemporaryFile()
191        content = b"this is the stream content"
192        stream.write(content)
193        # rewind before inserting
194        stream.seek(0)
195
196        db = self.connect()
197        db.define_table(
198            "tt",
199            Field(
200                "fileobj", "upload", uploadfolder=tempfile.gettempdir(), autodelete=True
201            ),
202        )
203        f_id = db.tt.insert(fileobj=stream)
204
205        row = db.tt[f_id]
206        (retr_name, retr_stream) = db.tt.fileobj.retrieve(row.fileobj)
207
208        # name should be the same
209        self.assertEqual(retr_name, os.path.basename(stream.name))
210        # content should be the same
211        retr_content = retr_stream.read()
212        self.assertEqual(retr_content, content)
213
214        # close streams!
215        retr_stream.close()
216
217        # delete
218        row.delete_record()
219
220        # drop
221        db.tt.drop()
222
223        # this part is triggered only if fs (AKA pyfilesystem) module is installed
224        try:
225            from fs.memoryfs import MemoryFS
226
227            # rewind before inserting
228            stream.seek(0)
229            db.define_table(
230                "tt", Field("fileobj", "upload", uploadfs=MemoryFS(), autodelete=True)
231            )
232
233            f_id = db.tt.insert(fileobj=stream)
234
235            row = db.tt[f_id]
236            (retr_name, retr_stream) = db.tt.fileobj.retrieve(row.fileobj)
237
238            # name should be the same
239            self.assertEqual(retr_name, os.path.basename(stream.name))
240            # content should be the same
241            retr_content = retr_stream.read()
242            self.assertEqual(retr_content, content)
243
244            # close streams
245            retr_stream.close()
246            stream.close()
247
248            # delete
249            row.delete_record()
250
251            # drop
252            db.tt.drop()
253
254        except ImportError:
255            pass
256
257    def testBlobBytes(self):
258        # Test blob with latin1 encoded bytes
259        db = self.connect()
260        obj = pickle.dumps("0")
261        db.define_table("tt", Field("aa", "blob"))
262        self.assertEqual(db.tt.insert(aa=obj), 1)
263        self.assertEqual(to_bytes(db().select(db.tt.aa)[0].aa), obj)
264        self.assertEqual(db.tt[1].aa, obj)
265        self.assertEqual(BytesIO(to_bytes(db.tt[1].aa)).read(), obj)
266        db.tt.drop()
267
268    def testRun(self):
269        # Test all field types and their return values
270        db = self.connect()
271        for ft in ["string", "text", "password", "upload", "blob"]:
272            db.define_table("tt", Field("aa", ft, default=""))
273            self.assertEqual(db.tt.insert(aa="ö"), 1)
274            if not (IS_ORACLE and (ft == "text" or ft == "blob")):
275                # only verify insert for LOB types in oracle;
276                # select may create seg fault in test env
277                self.assertEqual(db().select(db.tt.aa)[0].aa, "ö")
278            db.tt.drop()
279        db.define_table("tt", Field("aa", "integer", default=1))
280        self.assertEqual(db.tt.insert(aa=3), 1)
281        self.assertEqual(db().select(db.tt.aa)[0].aa, 3)
282        db.tt.drop()
283
284        db.define_table("tt", Field("aa", "string"))
285        ucs = "A\xc3\xa9 A"
286        self.assertEqual(db.tt.insert(aa=ucs), 1)
287        self.assertEqual(db().select(db.tt.aa)[0].aa, ucs)
288        self.assertEqual(db().select(db.tt.aa.with_alias("zz"))[0].zz, ucs)
289        db.tt.drop()
290
291        db.define_table("tt", Field("aa", "double", default=1))
292        self.assertEqual(db.tt.insert(aa=3.1), 1)
293        self.assertEqual(db().select(db.tt.aa)[0].aa, 3.1)
294        db.tt.drop()
295        db.define_table("tt", Field("aa", "boolean", default=True))
296        self.assertEqual(db.tt.insert(aa=True), 1)
297        self.assertEqual(db().select(db.tt.aa)[0].aa, True)
298        db.tt.drop()
299        db.define_table("tt", Field("aa", "json", default={}))
300        # test different python objects for correct serialization in json
301        objs = [
302            {"a": 1, "b": 2},
303            [1, 2, 3],
304            "abc",
305            True,
306            False,
307            None,
308            11,
309            14.3,
310            long(11),
311        ]
312        for obj in objs:
313            rtn_id = db.tt.insert(aa=obj)
314            rtn = db(db.tt.id == rtn_id).select().first().aa
315            self.assertEqual(obj, rtn)
316        db.tt.drop()
317        db.define_table("tt", Field("aa", "date", default=datetime.date.today()))
318        t0 = datetime.date.today()
319        self.assertEqual(db.tt.insert(aa=t0), 1)
320        self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
321        db.tt.drop()
322        db.define_table(
323            "tt", Field("aa", "datetime", default=datetime.datetime.today())
324        )
325        t0 = datetime.datetime(1971, 12, 21, 10, 30, 55, 0,)
326        self.assertEqual(db.tt.insert(aa=t0), 1)
327        self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
328
329        ## Row APIs
330        row = db().select(db.tt.aa)[0]
331        self.assertEqual(db.tt[1].aa, t0)
332        self.assertEqual(db.tt["aa"], db.tt.aa)
333        self.assertEqual(db.tt(1).aa, t0)
334        self.assertTrue(db.tt(1, aa=None) == None)
335        self.assertFalse(db.tt(1, aa=t0) == None)
336        self.assertEqual(row.aa, t0)
337        self.assertEqual(row["aa"], t0)
338        self.assertEqual(row["tt.aa"], t0)
339        self.assertEqual(row("tt.aa"), t0)
340
341        ## Lazy and Virtual fields
342        db.tt.b = Field.Virtual(lambda row: row.tt.aa)
343        # test for FieldVirtual.bind
344        self.assertEqual(db.tt.b.tablename, "tt")
345        self.assertEqual(db.tt.b.name, "b")
346        db.tt.c = Field.Lazy(lambda row: row.tt.aa)
347        # test for FieldMethod.bind
348        self.assertEqual(db.tt.c.name, "c")
349        rows = db().select(db.tt.aa)
350        row = rows[0]
351        self.assertEqual(row.b, t0)
352        self.assertEqual(row.c(), t0)
353        # test for BasicRows.colnames_fields
354        rows.colnames.insert(0, "tt.b")
355        rows.colnames.insert(1, "tt.c")
356        colnames_fields = rows.colnames_fields
357        self.assertIs(colnames_fields[0], db.tt.b)
358        self.assertIs(colnames_fields[1], db.tt.c)
359        db.tt.drop()
360
361        if not IS_ORACLE:
362            db.define_table("tt", Field("aa", "time", default="11:30"))
363            t0 = datetime.time(10, 30, 55)
364            self.assertEqual(db.tt.insert(aa=t0), 1)
365            self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
366            db.tt.drop()
367
368        # aggregation type detection
369        db.define_table(
370            "tt", Field("aa", "datetime", default=datetime.datetime.today())
371        )
372        t0 = datetime.datetime(1971, 12, 21, 10, 30, 55, 0)
373        self.assertEqual(db.tt.insert(aa=t0), 1)
374        self.assertEqual(db().select(db.tt.aa.min())[0][db.tt.aa.min()], t0)
375        db.tt.drop()
376
377
378class TestTables(unittest.TestCase):
379    def testTableNames(self):
380        """
381        - a "str" something
382        - not a method or property of DAL
383        - "dotted-notation" friendly:
384            - a valid python identifier
385            - not a python keyword
386            - not starting with underscore or an integer
387            - not containing dots
388
389        Basically, anything alphanumeric, no symbols, only underscore as
390        punctuation
391        """
392
393        # Check that Tables cannot start with underscores
394        self.assertRaises(SyntaxError, Table, None, "_abc")
395
396        # Check that Tables cannot contain punctuation other than underscores
397        self.assertRaises(SyntaxError, Table, None, "a.bc")
398
399        # Check that Tables cannot be a name of a method or property of DAL
400        for x in ["define_table", "tables", "as_dict"]:
401            self.assertRaises(SyntaxError, Table, None, x)
402
403        # Check that Table allows underscores in the body of a field name.
404        self.assertTrue(
405            Table(None, "a_bc"),
406            "Table isn't allowing underscores in tablename.  It should.",
407        )
408
409        # Check that Table names don't allow a python keyword
410        self.assertRaises(SyntaxError, Table, None, "True")
411        self.assertRaises(SyntaxError, Table, None, "elif")
412        self.assertRaises(SyntaxError, Table, None, "while")
413
414        # Check that Table names don't allow a non-valid python identifier
415        non_valid_examples = ["1x", "xx$%@%", "xx yy", "yy\na", "yy\n"]
416        for a in non_valid_examples:
417            self.assertRaises(SyntaxError, Table, None, a)
418
419        # Check that Table names don't allow a unicode string
420        non_valid_examples = [
421            "ℙƴ☂ℌøἤ",
422            u"ℙƴ☂ℌøἤ",
423            u"àè",
424            u"ṧøмℯ",
425            u"тεṧт",
426            u"♥αłüℯṧ",
427            u"ℊεᾔ℮яαт℮∂",
428            u"♭ƴ",
429            u"ᾔ☤ρℌℓ☺ḓ",
430        ]
431        for a in non_valid_examples:
432            self.assertRaises(SyntaxError, Table, None, a)
433
434
435class TestAll(unittest.TestCase):
436    def setUp(self):
437        self.pt = Table(None, "PseudoTable", Field("name"), Field("birthdate"))
438
439    def testSQLALL(self):
440        ans = "PseudoTable.id, PseudoTable.name, PseudoTable.birthdate"
441        self.assertEqual(str(SQLALL(self.pt)), ans)
442
443
444class TestTable(DALtest):
445    def testTableCreation(self):
446
447        # Check for error when not passing type other than Field or Table
448
449        self.assertRaises(SyntaxError, Table, None, "test", None)
450
451        persons = Table(
452            None, "persons", Field("firstname", "string"), Field("lastname", "string")
453        )
454
455        # Does it have the correct fields?
456
457        self.assertTrue(set(persons.fields).issuperset(set(["firstname", "lastname"])))
458
459        # ALL is set correctly
460
461        self.assertTrue("persons.firstname, persons.lastname" in str(persons.ALL))
462
463    def testTableAlias(self):
464        db = self.connect()
465        persons = Table(
466            db, "persons", Field("firstname", "string"), Field("lastname", "string")
467        )
468        aliens = persons.with_alias("aliens")
469
470        # Are the different table instances with the same fields
471
472        self.assertTrue(persons is not aliens)
473        self.assertTrue(set(persons.fields) == set(aliens.fields))
474
475    def testTableInheritance(self):
476        persons = Table(
477            None, "persons", Field("firstname", "string"), Field("lastname", "string")
478        )
479        customers = Table(
480            None, "customers", Field("items_purchased", "integer"), persons
481        )
482        self.assertTrue(
483            set(customers.fields).issuperset(
484                set(["items_purchased", "firstname", "lastname"])
485            )
486        )
487
488
489class TestInsert(DALtest):
490    def testRun(self):
491        db = self.connect()
492        db.define_table("tt", Field("aa"))
493        self.assertEqual(db.tt.insert(aa="1"), 1)
494        if not IS_TERADATA:
495            self.assertEqual(db.tt.insert(aa="1"), 2)
496            self.assertEqual(db.tt.insert(aa="1"), 3)
497        else:
498            self.assertEqual(db.tt.insert(aa="1"), 1)
499            self.assertEqual(db.tt.insert(aa="1"), 1)
500
501        self.assertEqual(db(db.tt.aa == "1").count(), 3)
502        self.assertEqual(db(db.tt.aa == "2").isempty(), True)
503        self.assertEqual(db(db.tt.aa == "1").update(aa="2"), 3)
504        self.assertEqual(db(db.tt.aa == "2").count(), 3)
505        self.assertEqual(db(db.tt.aa == "2").isempty(), False)
506        self.assertEqual(db(db.tt.aa == "2").delete(), 3)
507        self.assertEqual(db(db.tt.aa == "2").isempty(), True)
508
509
510class TestSelect(DALtest):
511    def testRun(self):
512        db = self.connect()
513        db.define_table("tt", Field("aa"))
514        self.assertEqual(db.tt.insert(aa="1"), 1)
515        if not IS_TERADATA:
516            self.assertEqual(db.tt.insert(aa="2"), 2)
517            self.assertEqual(db.tt.insert(aa="3"), 3)
518        else:
519            self.assertEqual(db.tt.insert(aa="2"), 1)
520            self.assertEqual(db.tt.insert(aa="3"), 1)
521        self.assertEqual(db(db.tt.id > 0).count(), 3)
522        self.assertEqual(
523            db(db.tt.id > 0).select(orderby=~db.tt.aa | db.tt.id)[0].aa, "3"
524        )
525        self.assertEqual(len(db(db.tt.id > 0).select(limitby=(1, 2))), 1)
526        self.assertEqual(db(db.tt.id > 0).select(limitby=(1, 2))[0].aa, "2")
527        self.assertEqual(len(db().select(db.tt.ALL)), 3)
528        self.assertEqual(db(db.tt.aa == None).count(), 0)
529        self.assertEqual(db(db.tt.aa != None).count(), 3)
530        self.assertEqual(db(db.tt.aa > "1").count(), 2)
531        self.assertEqual(db(db.tt.aa >= "1").count(), 3)
532        self.assertEqual(db(db.tt.aa == "1").count(), 1)
533        self.assertEqual(db(db.tt.aa != "1").count(), 2)
534        self.assertEqual(db(db.tt.aa < "3").count(), 2)
535        self.assertEqual(db(db.tt.aa <= "3").count(), 3)
536        self.assertEqual(db(db.tt.aa > "1")(db.tt.aa < "3").count(), 1)
537        self.assertEqual(db((db.tt.aa > "1") & (db.tt.aa < "3")).count(), 1)
538        self.assertEqual(db((db.tt.aa > "1") | (db.tt.aa < "3")).count(), 3)
539        self.assertEqual(db((db.tt.aa > "1") & ~(db.tt.aa > "2")).count(), 1)
540        self.assertEqual(db(~(db.tt.aa > "1") & (db.tt.aa > "2")).count(), 0)
541        # Test for REGEX_TABLE_DOT_FIELD
542        self.assertEqual(db(db.tt).select("tt.aa").first()[db.tt.aa], "1")
543
544    def testTestQuery(self):
545        db = self.connect()
546        db._adapter.test_connection()
547
548    def testListInteger(self):
549        db = self.connect()
550        db.define_table("tt", Field("aa", "list:integer"))
551        l = [1, 2, 3, 4, 5]
552        db.tt.insert(aa=l)
553        self.assertEqual(db(db.tt).select("tt.aa").first()[db.tt.aa], l)
554
555    def testListString(self):
556        db = self.connect()
557        db.define_table("tt", Field("aa", "list:string"))
558        l = ["a", "b", "c"]
559        db.tt.insert(aa=l)
560        self.assertEqual(db(db.tt).select("tt.aa").first()[db.tt.aa], l)
561
562    def testListReference(self):
563        db = self.connect()
564        db.define_table("t0", Field("aa", "string"))
565        db.define_table("tt", Field("t0_id", "list:reference t0"))
566        id_a1 = db.t0.insert(aa="test1")
567        id_a2 = db.t0.insert(aa="test2")
568        ref1 = [id_a1]
569        ref2 = [id_a2]
570        ref3 = [id_a1, id_a2]
571        db.tt.insert(t0_id=ref1)
572        self.assertEqual(db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref1)
573        db.tt.insert(t0_id=ref2)
574        self.assertEqual(db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref2)
575        db.tt.insert(t0_id=ref3)
576        self.assertEqual(db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref3)
577
578        self.assertEqual(db(db.tt.t0_id == ref3).count(), 1)
579
580    def testGroupByAndDistinct(self):
581        db = self.connect()
582        db.define_table(
583            "tt", Field("aa"), Field("bb", "integer"), Field("cc", "integer")
584        )
585        db.tt.insert(aa="4", bb=1, cc=1)
586        db.tt.insert(aa="3", bb=2, cc=1)
587        db.tt.insert(aa="3", bb=1, cc=1)
588        db.tt.insert(aa="1", bb=1, cc=1)
589        db.tt.insert(aa="1", bb=2, cc=1)
590        db.tt.insert(aa="1", bb=3, cc=1)
591        db.tt.insert(aa="1", bb=4, cc=1)
592        db.tt.insert(aa="2", bb=1, cc=1)
593        db.tt.insert(aa="2", bb=2, cc=1)
594        db.tt.insert(aa="2", bb=3, cc=1)
595        self.assertEqual(db(db.tt).count(), 10)
596
597        # test groupby
598        result = db().select(db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa)
599        self.assertEqual(len(result), 4)
600        result = db().select(
601            db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa, orderby=db.tt.aa
602        )
603        self.assertEqual(tuple(result.response[2]), ("3", 3))
604        result = db().select(
605            db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa, orderby=~db.tt.aa
606        )
607        self.assertEqual(tuple(result.response[1]), ("3", 3))
608        result = db().select(
609            db.tt.aa,
610            db.tt.bb,
611            db.tt.cc.sum(),
612            groupby=db.tt.aa | db.tt.bb,
613            orderby=(db.tt.aa | ~db.tt.bb),
614        )
615        self.assertEqual(tuple(result.response[4]), ("2", 3, 1))
616        result = db().select(
617            db.tt.aa,
618            db.tt.bb.sum(),
619            groupby=db.tt.aa,
620            orderby=~db.tt.aa,
621            limitby=(1, 2),
622        )
623        self.assertEqual(len(result), 1)
624        self.assertEqual(tuple(result.response[0]), ("3", 3))
625        result = db().select(
626            db.tt.aa, db.tt.bb.sum(), groupby=db.tt.aa, orderby=db.tt.aa, limitby=(0, 3)
627        )
628        self.assertEqual(len(result), 3)
629        self.assertEqual(tuple(result.response[2]), ("3", 3))
630
631        # test having
632        self.assertEqual(
633            len(
634                db().select(
635                    db.tt.aa,
636                    db.tt.bb.sum(),
637                    groupby=db.tt.aa,
638                    having=db.tt.bb.sum() > 2,
639                )
640            ),
641            3,
642        )
643
644        # test distinct
645        result = db().select(db.tt.aa, db.tt.cc, distinct=True)
646        self.assertEqual(len(result), 4)
647        result = db().select(db.tt.cc, distinct=True, groupby=db.tt.cc)
648        self.assertEqual(len(result), 1)
649        self.assertEqual(result[0].cc, 1)
650        result = db().select(db.tt.aa, distinct=True, orderby=~db.tt.aa)
651        self.assertEqual(result[2].aa, "2")
652        self.assertEqual(result[1].aa, "3")
653        result = db().select(
654            db.tt.aa, db.tt.bb, distinct=True, orderby=(db.tt.aa | ~db.tt.bb)
655        )
656        self.assertEqual(tuple(result.response[4]), ("2", 3))
657        result = db().select(db.tt.aa, distinct=True, orderby=~db.tt.aa, limitby=(1, 2))
658        self.assertEqual(len(result), 1)
659        self.assertEqual(result[0].aa, "3")
660
661        # test count distinct
662        db.tt.insert(aa="2", bb=3, cc=1)
663        self.assertEqual(db(db.tt).count(distinct=db.tt.aa), 4)
664        self.assertEqual(db(db.tt.aa).count(db.tt.aa), 4)
665        self.assertEqual(db(db.tt.aa).count(), 11)
666        count = db.tt.aa.count()
667        self.assertEqual(db(db.tt).select(count).first()[count], 11)
668
669        count = db.tt.aa.count(distinct=True)
670        sum = db.tt.bb.sum()
671        result = db(db.tt).select(count, sum)
672        self.assertEqual(tuple(result.response[0]), (4, 23))
673
674    def testCoalesce(self):
675        db = self.connect()
676        db.define_table("tt", Field("aa"), Field("bb"), Field("cc"), Field("dd"))
677        db.tt.insert(aa="xx")
678        db.tt.insert(aa="xx", bb="yy")
679        db.tt.insert(aa="xx", bb="yy", cc="zz")
680        if not IS_ORACLE:
681            # empty string is treated as null
682            db.tt.insert(aa="xx", bb="yy", cc="zz", dd="")
683        result = db(db.tt).select(db.tt.dd.coalesce(db.tt.cc, db.tt.bb, db.tt.aa))
684        self.assertEqual(result.response[0][0], "xx")
685        self.assertEqual(result.response[1][0], "yy")
686        self.assertEqual(result.response[2][0], "zz")
687        if not IS_ORACLE:
688            self.assertEqual(result.response[3][0], "")
689        db.tt.drop()
690
691        db.define_table("tt", Field("aa", "integer"), Field("bb"))
692        db.tt.insert(bb="")
693        db.tt.insert(aa=1)
694        result = db(db.tt).select(db.tt.aa.coalesce_zero())
695        self.assertEqual(result.response[0][0], 0)
696        self.assertEqual(result.response[1][0], 1)
697
698    def testTableAliasCollisions(self):
699        db = self.connect()
700        db.define_table("t1", Field("aa"))
701        db.define_table("t2", Field("bb"))
702        t1, t2 = db.t1, db.t2
703        t1.with_alias("t2")
704        t2.with_alias("t1")
705
706        # Passing tables by name will result in exception
707        t1.insert(aa="test")
708        t2.insert(bb="foo")
709        db(t1.id > 0).update(aa="bar")
710        having = t1.aa != None
711        join = [t2.on(t1.aa == t2.bb)]
712        db(t1.aa == t2.bb).select(t1.aa, groupby=t1.aa, having=having, orderby=t1.aa)
713        db(t1.aa).select(t1.aa, join=join, groupby=t1.aa, having=having, orderby=t1.aa)
714        db(t1.aa).select(t1.aa, left=join, groupby=t1.aa, having=having, orderby=t1.aa)
715        db(t1.id > 0).delete()
716
717
718class TestSubselect(DALtest):
719    def testMethods(self):
720        db = self.connect()
721        db.define_table("tt", Field("aa", "integer"), Field("bb"))
722        data = [dict(aa=1, bb="foo"), dict(aa=1, bb="bar"), dict(aa=2, bb="foo")]
723        for item in data:
724            db.tt.insert(**item)
725        fields = [db.tt.aa, db.tt.bb, db.tt.aa + 2, (db.tt.aa + 1).with_alias("exp")]
726        sub = db(db.tt).nested_select(*fields, orderby=db.tt.id)
727        # Check the fields provided by the object
728        self.assertEqual(sorted(["aa", "bb", "exp"]), sorted(list(sub.fields)))
729        for name in sub.fields:
730            self.assertIsInstance(sub[name], Field)
731        for item in sub:
732            self.assertIsInstance(item, Field)
733        self.assertEqual(len(list(sub)), len(sub.fields))
734        for key, val in zip(sub.fields, sub):
735            self.assertIs(sub[key], val)
736            self.assertIs(getattr(sub, key), val)
737        tmp = sub._filter_fields(dict(aa=1, exp=2, foo=3))
738        self.assertEqual(tmp, dict(aa=1, exp=2))
739        # Check result from executing the query
740        result = sub()
741        self.assertEqual(len(result), len(data))
742        for idx, row in enumerate(data):
743            self.assertEqual(result[idx]["tt"].as_dict(), row)
744            self.assertEqual(result[idx]["exp"], row["aa"] + 1)
745        result = db.executesql(str(sub))
746        for idx, row in enumerate(data):
747            tmp = [row["aa"], row["bb"], row["aa"] + 2, row["aa"] + 1]
748            self.assertEqual(list(result[idx]), tmp)
749        # Check that query expansion methods don't work without alias
750        self.assertEqual(sub._rname, None)
751        self.assertEqual(sub._raw_rname, None)
752        self.assertEqual(sub._dalname, None)
753        with self.assertRaises(SyntaxError):
754            sub.query_name()
755        with self.assertRaises(SyntaxError):
756            sub.sql_shortref
757        with self.assertRaises(SyntaxError):
758            sub.on(sub.aa != None)
759        # Alias checks
760        sub = sub.with_alias("foo")
761        result = sub()
762        for idx, row in enumerate(data):
763            self.assertEqual(result[idx]["tt"].as_dict(), row)
764            self.assertEqual(result[idx]["exp"], row["aa"] + 1)
765        # Check query expansion methods again
766        self.assertEqual(sub._rname, None)
767        self.assertEqual(sub._raw_rname, None)
768        self.assertEqual(sub._dalname, None)
769        self.assertEqual(sub.query_name()[0], str(sub))
770        self.assertEqual(sub.sql_shortref, db._adapter.dialect.quote("foo"))
771        self.assertIsInstance(sub.on(sub.aa != None), Expression)
772
773    def testSelectArguments(self):
774        db = self.connect()
775        db.define_table("tt", Field("aa", "integer"), Field("bb"))
776        data = [
777            dict(aa=1, bb="foo"),
778            dict(aa=1, bb="bar"),
779            dict(aa=2, bb="foo"),
780            dict(aa=3, bb="foo"),
781            dict(aa=3, bb="baz"),
782        ]
783        expected = [(1, None, 0), (2, 2, 2), (2, 2, 2), (3, 4, 3), (3, 8, 6)]
784        for item in data:
785            db.tt.insert(**item)
786
787        # Check that select clauses work as expected in stand-alone query
788        t1 = db.tt.with_alias("t1")
789        t2 = db.tt.with_alias("t2")
790        fields = [
791            t1.aa,
792            t2.aa.sum().with_alias("total"),
793            t2.aa.count().with_alias("cnt"),
794        ]
795        join = t1.on(db.tt.bb != t1.bb)
796        left = t2.on(t1.aa > t2.aa)
797        group = db.tt.bb | t1.aa
798        having = db.tt.aa.count() > 1
799        order = t1.aa | t2.aa.count()
800        limit = (1, 6)
801        sub = db(db.tt.aa != 2).nested_select(
802            *fields,
803            join=join,
804            left=left,
805            orderby=order,
806            groupby=group,
807            having=having,
808            limitby=limit
809        )
810        result = sub()
811        self.assertEqual(len(result), len(expected))
812        for idx, val in enumerate(expected):
813            self.assertEqual(result[idx]["t1"]["aa"], val[0])
814            self.assertEqual(result[idx]["total"], val[1])
815            self.assertEqual(result[idx]["cnt"], val[2])
816
817        # Check again when nested inside another query
818        # Also check that the alias will not conflict with existing table
819        t3 = db.tt.with_alias("t3")
820        sub = sub.with_alias("tt")
821        query = (t3.bb == "foo") & (t3.aa == sub.aa)
822        order = t3.aa | sub.cnt
823        result = db(query).select(t3.aa, sub.total, sub.cnt, orderby=order)
824        for idx, val in enumerate(expected):
825            self.assertEqual(result[idx]["t3"]["aa"], val[0])
826            self.assertEqual(result[idx]["tt"]["total"], val[1])
827            self.assertEqual(result[idx]["tt"]["cnt"], val[2])
828
829        # Check "distinct" modifier separately
830        sub = db(db.tt.aa != 2).nested_select(db.tt.aa, orderby=db.tt.aa, distinct=True)
831        result = sub().as_list()
832        self.assertEqual(result, [dict(aa=1), dict(aa=3)])
833
834    def testCorrelated(self):
835        db = self.connect()
836        db.define_table(
837            "t1", Field("aa", "integer"), Field("bb"), Field("mark", "integer")
838        )
839        db.define_table("t2", Field("aa", "integer"), Field("cc"))
840        db.define_table("t3", Field("aa", "integer"))
841        data_t1 = [
842            dict(aa=1, bb="bar"),
843            dict(aa=1, bb="foo"),
844            dict(aa=2, bb="foo"),
845            dict(aa=2, bb="test"),
846            dict(aa=3, bb="baz"),
847            dict(aa=3, bb="foo"),
848        ]
849        data_t2 = [dict(aa=1, cc="foo"), dict(aa=2, cc="bar"), dict(aa=3, cc="baz")]
850        expected_cor = [(1, "foo"), (3, "baz")]
851        expected_leftcor = [(1, "foo"), (2, None), (3, "baz")]
852        expected_uncor = [(1, "bar"), (1, "foo"), (2, "foo"), (3, "baz"), (3, "foo")]
853        for item in data_t1:
854            db.t1.insert(**item)
855        for item in data_t2:
856            db.t2.insert(**item)
857            db.t3.insert(aa=item["aa"])
858
859        # Correlated subqueries
860        subquery = db.t1.aa == db.t2.aa
861        subfields = [db.t2.cc]
862        sub = db(subquery).nested_select(*subfields).with_alias("sub")
863        query = db.t1.bb.belongs(sub)
864        order = db.t1.aa | db.t1.bb
865        result = db(query).select(db.t1.aa, db.t1.bb, orderby=order)
866        self.assertEqual(len(result), len(expected_cor))
867        for idx, val in enumerate(expected_cor):
868            self.assertEqual(result[idx]["aa"], val[0])
869            self.assertEqual(result[idx]["bb"], val[1])
870
871        join = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))]
872        order = db.t3.aa | db.t1.bb
873        result = db(db.t3).select(db.t3.aa, db.t1.bb, join=join, orderby=order)
874        self.assertEqual(len(result), len(expected_cor))
875        for idx, val in enumerate(expected_cor):
876            self.assertEqual(result[idx]["t3"]["aa"], val[0])
877            self.assertEqual(result[idx]["t1"]["bb"], val[1])
878
879        left = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))]
880        result = db(db.t3).select(db.t3.aa, db.t1.bb, left=left, orderby=order)
881        self.assertEqual(len(result), len(expected_leftcor))
882        for idx, val in enumerate(expected_leftcor):
883            self.assertEqual(result[idx]["t3"]["aa"], val[0])
884            self.assertEqual(result[idx]["t1"]["bb"], val[1])
885
886        order = db.t1.aa | db.t1.bb
887        db(db.t1.bb.belongs(sub)).update(mark=1)
888        result = db(db.t1.mark == 1).select(db.t1.aa, db.t1.bb, orderby=order)
889        self.assertEqual(len(result), len(expected_cor))
890        for idx, val in enumerate(expected_cor):
891            self.assertEqual(result[idx]["aa"], val[0])
892            self.assertEqual(result[idx]["bb"], val[1])
893
894        db(~db.t1.bb.belongs(sub)).delete()
895        result = db(db.t1.id > 0).select(db.t1.aa, db.t1.bb, orderby=order)
896        self.assertEqual(len(result), len(expected_cor))
897        for idx, val in enumerate(expected_cor):
898            self.assertEqual(result[idx]["aa"], val[0])
899            self.assertEqual(result[idx]["bb"], val[1])
900
901        db(db.t1.id > 0).delete()
902        for item in data_t1:
903            db.t1.insert(**item)
904
905        # Uncorrelated subqueries
906        kwargs = dict(correlated=False)
907        sub = db(subquery).nested_select(*subfields, **kwargs)
908        query = db.t1.bb.belongs(sub)
909        order = db.t1.aa | db.t1.bb
910        result = db(query).select(db.t1.aa, db.t1.bb, orderby=order)
911        self.assertEqual(len(result), len(expected_uncor))
912        for idx, val in enumerate(expected_uncor):
913            self.assertEqual(result[idx]["aa"], val[0])
914            self.assertEqual(result[idx]["bb"], val[1])
915
916        join = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))]
917        order = db.t3.aa | db.t1.bb
918        result = db(db.t3).select(db.t3.aa, db.t1.bb, join=join, orderby=order)
919        self.assertEqual(len(result), len(expected_uncor))
920        for idx, val in enumerate(expected_uncor):
921            self.assertEqual(result[idx]["t3"]["aa"], val[0])
922            self.assertEqual(result[idx]["t1"]["bb"], val[1])
923
924        left = [db.t1.on((db.t3.aa == db.t1.aa) & db.t1.bb.belongs(sub))]
925        result = db(db.t3).select(db.t3.aa, db.t1.bb, left=left, orderby=order)
926        self.assertEqual(len(result), len(expected_uncor))
927        for idx, val in enumerate(expected_uncor):
928            self.assertEqual(result[idx]["t3"]["aa"], val[0])
929            self.assertEqual(result[idx]["t1"]["bb"], val[1])
930        # MySQL does not support subqueries with uncorrelated references
931        # to target table
932
933        # Correlation prevented by alias in parent select
934        tmp = db.t1.with_alias("tmp")
935        sub = db(subquery).nested_select(*subfields)
936        query = tmp.bb.belongs(sub)
937        order = tmp.aa | tmp.bb
938        result = db(query).select(tmp.aa, tmp.bb, orderby=order)
939        self.assertEqual(len(result), len(expected_uncor))
940        for idx, val in enumerate(expected_uncor):
941            self.assertEqual(result[idx]["aa"], val[0])
942            self.assertEqual(result[idx]["bb"], val[1])
943
944        join = [tmp.on((db.t3.aa == tmp.aa) & tmp.bb.belongs(sub))]
945        order = db.t3.aa | tmp.bb
946        result = db(db.t3).select(db.t3.aa, tmp.bb, join=join, orderby=order)
947        self.assertEqual(len(result), len(expected_uncor))
948        for idx, val in enumerate(expected_uncor):
949            self.assertEqual(result[idx]["t3"]["aa"], val[0])
950            self.assertEqual(result[idx]["tmp"]["bb"], val[1])
951
952        left = [tmp.on((db.t3.aa == tmp.aa) & tmp.bb.belongs(sub))]
953        result = db(db.t3).select(db.t3.aa, tmp.bb, left=left, orderby=order)
954        self.assertEqual(len(result), len(expected_uncor))
955        for idx, val in enumerate(expected_uncor):
956            self.assertEqual(result[idx]["t3"]["aa"], val[0])
957            self.assertEqual(result[idx]["tmp"]["bb"], val[1])
958        # SQLite does not support aliasing target table in UPDATE/DELETE
959        # MySQL does not support subqueries with uncorrelated references
960        # to target table
961
962
963class TestAddMethod(DALtest):
964    def testRun(self):
965        db = self.connect()
966        db.define_table("tt", Field("aa"))
967
968        @db.tt.add_method.all
969        def select_all(table, orderby=None):
970            return table._db(table).select(orderby=orderby)
971
972        self.assertEqual(db.tt.insert(aa="1"), 1)
973        if not IS_TERADATA:
974            self.assertEqual(db.tt.insert(aa="1"), 2)
975            self.assertEqual(db.tt.insert(aa="1"), 3)
976        else:
977            self.assertEqual(db.tt.insert(aa="1"), 1)
978            self.assertEqual(db.tt.insert(aa="1"), 1)
979        self.assertEqual(len(db.tt.all()), 3)
980
981
982class TestBelongs(DALtest):
983    def testRun(self):
984        db = self.connect()
985        db.define_table("tt", Field("aa"))
986        self.assertEqual(db.tt.insert(aa="1"), 1)
987        if not IS_TERADATA:
988            self.assertEqual(db.tt.insert(aa="2"), 2)
989            self.assertEqual(db.tt.insert(aa="3"), 3)
990        else:
991            self.assertEqual(db.tt.insert(aa="2"), 1)
992            self.assertEqual(db.tt.insert(aa="3"), 1)
993        self.assertEqual(db(db.tt.aa.belongs(("1", "3"))).count(), 2)
994        self.assertEqual(
995            db(db.tt.aa.belongs(db(db.tt.id > 2)._select(db.tt.aa))).count(), 1
996        )
997        self.assertEqual(
998            db(
999                db.tt.aa.belongs(db(db.tt.aa.belongs(("1", "3")))._select(db.tt.aa))
1000            ).count(),
1001            2,
1002        )
1003        self.assertEqual(
1004            db(
1005                db.tt.aa.belongs(
1006                    db(
1007                        db.tt.aa.belongs(
1008                            db(db.tt.aa.belongs(("1", "3")))._select(db.tt.aa)
1009                        )
1010                    )._select(db.tt.aa)
1011                )
1012            ).count(),
1013            2,
1014        )
1015
1016
1017class TestContains(DALtest):
1018    def testRun(self):
1019        db = self.connect()
1020        db.define_table("tt", Field("aa", "list:string"), Field("bb", "string"))
1021        self.assertEqual(db.tt.insert(aa=["aaa", "bbb"], bb="aaa"), 1)
1022        if not IS_TERADATA:
1023            self.assertEqual(db.tt.insert(aa=["bbb", "ddd"], bb="abb"), 2)
1024            self.assertEqual(db.tt.insert(aa=["eee", "aaa"], bb="acc"), 3)
1025        else:
1026            self.assertEqual(db.tt.insert(aa=["bbb", "ddd"], bb="abb"), 1)
1027            self.assertEqual(db.tt.insert(aa=["eee", "aaa"], bb="acc"), 1)
1028        self.assertEqual(db(db.tt.aa.contains("aaa")).count(), 2)
1029        self.assertEqual(db(db.tt.aa.contains("bbb")).count(), 2)
1030        self.assertEqual(db(db.tt.aa.contains("aa")).count(), 0)
1031        self.assertEqual(db(db.tt.bb.contains("a")).count(), 3)
1032        self.assertEqual(db(db.tt.bb.contains("b")).count(), 1)
1033        self.assertEqual(db(db.tt.bb.contains("d")).count(), 0)
1034        self.assertEqual(db(db.tt.aa.contains(db.tt.bb)).count(), 1)
1035        # case-sensitivity tests, if 1 it isn't
1036        is_case_insensitive = db(db.tt.bb.like("%AA%")).count()
1037        if is_case_insensitive:
1038            self.assertEqual(db(db.tt.aa.contains("AAA")).count(), 2)
1039            self.assertEqual(db(db.tt.bb.contains("A")).count(), 3)
1040        else:
1041            self.assertEqual(
1042                db(db.tt.aa.contains("AAA", case_sensitive=True)).count(), 0
1043            )
1044            self.assertEqual(db(db.tt.bb.contains("A", case_sensitive=True)).count(), 0)
1045            self.assertEqual(
1046                db(db.tt.aa.contains("AAA", case_sensitive=False)).count(), 2
1047            )
1048            self.assertEqual(
1049                db(db.tt.bb.contains("A", case_sensitive=False)).count(), 3
1050            )
1051        db.tt.drop()
1052
1053        # integers in string fields
1054        db.define_table(
1055            "tt",
1056            Field("aa", "list:string"),
1057            Field("bb", "string"),
1058            Field("cc", "integer"),
1059        )
1060        self.assertEqual(db.tt.insert(aa=["123", "456"], bb="123", cc=12), 1)
1061        if not IS_TERADATA:
1062            self.assertEqual(db.tt.insert(aa=["124", "456"], bb="123", cc=123), 2)
1063            self.assertEqual(db.tt.insert(aa=["125", "457"], bb="23", cc=125), 3)
1064        else:
1065            self.assertEqual(db.tt.insert(aa=["124", "456"], bb="123", cc=123), 1)
1066            self.assertEqual(db.tt.insert(aa=["125", "457"], bb="23", cc=125), 1)
1067        self.assertEqual(db(db.tt.aa.contains(123)).count(), 1)
1068        self.assertEqual(db(db.tt.aa.contains(23)).count(), 0)
1069        self.assertEqual(db(db.tt.aa.contains(db.tt.cc)).count(), 1)
1070        self.assertEqual(db(db.tt.bb.contains(123)).count(), 2)
1071        self.assertEqual(db(db.tt.bb.contains(23)).count(), 3)
1072        self.assertEqual(db(db.tt.bb.contains(db.tt.cc)).count(), 2)
1073        db.tt.drop()
1074
1075        # string field contains string field
1076        db.define_table("tt", Field("aa"), Field("bb"))
1077        db.tt.insert(aa="aaa", bb="%aaa")
1078        db.tt.insert(aa="aaa", bb="aaa")
1079        self.assertEqual(db(db.tt.aa.contains(db.tt.bb)).count(), 1)
1080        db.tt.drop()
1081
1082        # escaping
1083        db.define_table("tt", Field("aa"))
1084        db.tt.insert(aa="perc%ent")
1085        db.tt.insert(aa="percent")
1086        db.tt.insert(aa="percxyzent")
1087        db.tt.insert(aa="under_score")
1088        db.tt.insert(aa="underxscore")
1089        db.tt.insert(aa="underyscore")
1090        self.assertEqual(db(db.tt.aa.contains("perc%ent")).count(), 1)
1091        self.assertEqual(db(db.tt.aa.contains("under_score")).count(), 1)
1092
1093
1094class TestLike(DALtest):
1095    def setUp(self):
1096        db = self.connect()
1097        db.define_table("tt", Field("aa"))
1098        self.assertEqual(isinstance(db.tt.insert(aa="abc"), long), True)
1099        self.db = db
1100
1101    def testRun(self):
1102        db = self.db
1103        self.assertEqual(db(db.tt.aa.like("a%")).count(), 1)
1104        self.assertEqual(db(db.tt.aa.like("%b%")).count(), 1)
1105        self.assertEqual(db(db.tt.aa.like("%c")).count(), 1)
1106        self.assertEqual(db(db.tt.aa.like("%d%")).count(), 0)
1107        self.assertEqual(db(db.tt.aa.like("ab_")).count(), 1)
1108        self.assertEqual(db(db.tt.aa.like("a_c")).count(), 1)
1109        self.assertEqual(db(db.tt.aa.like("_bc")).count(), 1)
1110
1111        self.assertEqual(db(db.tt.aa.like("A%", case_sensitive=False)).count(), 1)
1112        self.assertEqual(db(db.tt.aa.like("%B%", case_sensitive=False)).count(), 1)
1113        self.assertEqual(db(db.tt.aa.like("%C", case_sensitive=False)).count(), 1)
1114        self.assertEqual(db(db.tt.aa.ilike("A%")).count(), 1)
1115        self.assertEqual(db(db.tt.aa.ilike("%B%")).count(), 1)
1116        self.assertEqual(db(db.tt.aa.ilike("%C")).count(), 1)
1117
1118        # DAL maps like() (and contains(), startswith(), endswith())
1119        # to the LIKE operator, that in ANSI-SQL is case-sensitive
1120        # There are backends supporting case-sensitivity by default
1121        # and backends that needs additional care to turn
1122        # case-sensitivity on. To discern among those, let's run
1123        # this query comparing previously inserted 'abc' with 'ABC':
1124        # if the result is 0, then the backend recognizes
1125        # case-sensitivity, if 1 it isn't
1126        is_case_insensitive = db(db.tt.aa.like("%ABC%")).count()
1127        self.assertEqual(db(db.tt.aa.like("A%")).count(), is_case_insensitive)
1128        self.assertEqual(db(db.tt.aa.like("%B%")).count(), is_case_insensitive)
1129        self.assertEqual(db(db.tt.aa.like("%C")).count(), is_case_insensitive)
1130
1131    def testUpperLower(self):
1132        db = self.db
1133        self.assertEqual(db(db.tt.aa.upper().like("A%")).count(), 1)
1134        self.assertEqual(db(db.tt.aa.upper().like("%B%")).count(), 1)
1135        self.assertEqual(db(db.tt.aa.upper().like("%C")).count(), 1)
1136        self.assertEqual(db(db.tt.aa.lower().like("%c")).count(), 1)
1137
1138    def testStartsEndsWith(self):
1139        db = self.db
1140        self.assertEqual(db(db.tt.aa.startswith("a")).count(), 1)
1141        self.assertEqual(db(db.tt.aa.endswith("c")).count(), 1)
1142        self.assertEqual(db(db.tt.aa.startswith("c")).count(), 0)
1143        self.assertEqual(db(db.tt.aa.endswith("a")).count(), 0)
1144
1145    def testEscaping(self):
1146        db = self.db
1147        term = "ahbc".replace("h", "\\")  # funny but to avoid any doubts...
1148        db.tt.insert(aa="a%bc")
1149        db.tt.insert(aa="a_bc")
1150        db.tt.insert(aa=term)
1151        self.assertEqual(db(db.tt.aa.like("%ax%bc%", escape="x")).count(), 1)
1152        self.assertEqual(db(db.tt.aa.like("%ax_bc%", escape="x")).count(), 1)
1153        self.assertEqual(db(db.tt.aa.like("%" + term + "%")).count(), 1)
1154        db(db.tt.id > 0).delete()
1155        # test "literal" like, i.e. exactly as LIKE in the backend
1156        db.tt.insert(aa="perc%ent")
1157        db.tt.insert(aa="percent")
1158        db.tt.insert(aa="percxyzent")
1159        db.tt.insert(aa="under_score")
1160        db.tt.insert(aa="underxscore")
1161        db.tt.insert(aa="underyscore")
1162        self.assertEqual(db(db.tt.aa.like("%perc%ent%")).count(), 3)
1163        self.assertEqual(db(db.tt.aa.like("%under_score%")).count(), 3)
1164        db(db.tt.id > 0).delete()
1165        # escaping with startswith and endswith
1166        db.tt.insert(aa="%percent")
1167        db.tt.insert(aa="xpercent")
1168        db.tt.insert(aa="discount%")
1169        db.tt.insert(aa="discountx")
1170        self.assertEqual(db(db.tt.aa.endswith("discount%")).count(), 1)
1171        self.assertEqual(db(db.tt.aa.like("discount%%")).count(), 2)
1172        self.assertEqual(db(db.tt.aa.startswith("%percent")).count(), 1)
1173        self.assertEqual(db(db.tt.aa.like("%%percent")).count(), 2)
1174
1175    @unittest.skipIf(IS_MSSQL, "No Regexp on MSSQL")
1176    def testRegexp(self):
1177        db = self.db
1178        db(db.tt.id > 0).delete()
1179        db.tt.insert(aa="%percent")
1180        db.tt.insert(aa="xpercent")
1181        db.tt.insert(aa="discount%")
1182        db.tt.insert(aa="discountx")
1183        try:
1184            self.assertEqual(db(db.tt.aa.regexp("count")).count(), 2)
1185        except NotImplementedError:
1186            pass
1187        else:
1188            self.assertEqual(db(db.tt.aa.lower().regexp("count")).count(), 2)
1189            self.assertEqual(
1190                db(
1191                    db.tt.aa.upper().regexp("COUNT") & db.tt.aa.lower().regexp("count")
1192                ).count(),
1193                2,
1194            )
1195            self.assertEqual(
1196                db(
1197                    db.tt.aa.upper().regexp("COUNT") | (db.tt.aa.lower() == "xpercent")
1198                ).count(),
1199                3,
1200            )
1201
1202    def testLikeInteger(self):
1203        db = self.db
1204        db.tt.drop()
1205        db.define_table("tt", Field("aa", "integer"))
1206        self.assertEqual(isinstance(db.tt.insert(aa=1111111111), long), True)
1207        self.assertEqual(isinstance(db.tt.insert(aa=1234567), long), True)
1208        self.assertEqual(db(db.tt.aa.like("1%")).count(), 2)
1209        self.assertEqual(db(db.tt.aa.like("1_3%")).count(), 1)
1210        self.assertEqual(db(db.tt.aa.like("2%")).count(), 0)
1211        self.assertEqual(db(db.tt.aa.like("_2%")).count(), 1)
1212        self.assertEqual(db(db.tt.aa.like("12%")).count(), 1)
1213        self.assertEqual(db(db.tt.aa.like("012%")).count(), 0)
1214        self.assertEqual(db(db.tt.aa.like("%45%")).count(), 1)
1215        self.assertEqual(db(db.tt.aa.like("%54%")).count(), 0)
1216
1217
1218class TestDatetime(DALtest):
1219    def testRun(self):
1220        db = self.connect()
1221        db.define_table("tt", Field("aa", "datetime"))
1222        self.assertEqual(db.tt.insert(aa=datetime.datetime(1971, 12, 21, 11, 30)), 1)
1223        self.assertEqual(db.tt.insert(aa=datetime.datetime(1971, 11, 21, 10, 30)), 2)
1224        self.assertEqual(db.tt.insert(aa=datetime.datetime(1970, 12, 21, 9, 31)), 3)
1225        self.assertEqual(
1226            db(db.tt.aa == datetime.datetime(1971, 12, 21, 11, 30)).count(), 1
1227        )
1228        self.assertEqual(db(db.tt.aa.year() == 1971).count(), 2)
1229        self.assertEqual(db(db.tt.aa.month() > 11).count(), 2)
1230        self.assertEqual(db(db.tt.aa.day() >= 21).count(), 3)
1231        self.assertEqual(db(db.tt.aa.hour() < 10).count(), 1)
1232        self.assertEqual(db(db.tt.aa.minutes() <= 30).count(), 2)
1233        self.assertEqual(db(db.tt.aa.seconds() != 31).count(), 3)
1234        self.assertEqual(db(db.tt.aa.epoch() < 365 * 24 * 3600).delete(), 1)
1235        db.tt.drop()
1236
1237        # pure TIME types without dates are not possible in Oracle
1238        if not IS_ORACLE:
1239            db.define_table("tt", Field("aa", "time"))
1240            t0 = datetime.time(10, 30, 55)
1241            db.tt.insert(aa=t0)
1242            self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
1243            db.tt.drop()
1244
1245        db.define_table("tt", Field("aa", "date"))
1246        t0 = datetime.date.today()
1247        db.tt.insert(aa=t0)
1248        self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
1249
1250
1251class TestExpressions(DALtest):
1252    @unittest.skipIf(IS_POSTGRESQL, "PG8000 does not like these")
1253    def testRun(self):
1254        db = self.connect()
1255        db.define_table(
1256            "tt", Field("aa", "integer"), Field("bb", "integer"), Field("cc")
1257        )
1258        self.assertEqual(db.tt.insert(aa=1, bb=0), 1)
1259        self.assertEqual(db.tt.insert(aa=2, bb=0), 2)
1260        self.assertEqual(db.tt.insert(aa=3, bb=0), 3)
1261
1262        # test update
1263        self.assertEqual(db(db.tt.aa == 3).update(aa=db.tt.aa + 1, bb=db.tt.bb + 2), 1)
1264        self.assertEqual(db(db.tt.aa == 4).count(), 1)
1265        self.assertEqual(db(db.tt.bb == 2).count(), 1)
1266        self.assertEqual(db(db.tt.aa == -2).count(), 0)
1267        self.assertEqual(db(db.tt.aa == 4).update(aa=db.tt.aa * 2, bb=5), 1)
1268        self.assertEqual(db(db.tt.bb == 5).count(), 1)
1269        self.assertEqual(db(db.tt.aa + 1 == 9).count(), 1)
1270        self.assertEqual(db(db.tt.aa + 1 == 9).update(aa=db.tt.aa - 2, cc="cc"), 1)
1271        self.assertEqual(db(db.tt.cc == "cc").count(), 1)
1272        self.assertEqual(db(db.tt.aa == 6).count(), 1)
1273        self.assertEqual(db(db.tt.aa == 6).update(bb=db.tt.aa * (db.tt.bb - 3)), 1)
1274        self.assertEqual(db(db.tt.bb == 12).count(), 1)
1275        self.assertEqual(db(db.tt.aa == 6).count(), 1)
1276        self.assertEqual(
1277            db(db.tt.aa == 6).update(aa=db.tt.aa % 4 + 1, cc=db.tt.cc + "1" + "1"), 1
1278        )
1279        self.assertEqual(db(db.tt.cc == "cc11").count(), 1)
1280        self.assertEqual(db(db.tt.aa == 3).count(), 1)
1281
1282        # test comparsion expression based count
1283        self.assertEqual(db(db.tt.aa != db.tt.aa).count(), 0)
1284        self.assertEqual(db(db.tt.aa == db.tt.aa).count(), 3)
1285
1286        # test select aggregations
1287        sum = (db.tt.aa + 1).sum()
1288        self.assertEqual(db(db.tt.aa + 1 >= 3).select(sum).first()[sum], 7)
1289        self.assertEqual(db((1 == 0) & (db.tt.aa >= db.tt.aa)).count(), 0)
1290        self.assertEqual(db(db.tt.aa * 2 == -2).select(sum).first()[sum], None)
1291
1292        count = db.tt.aa.count()
1293        avg = db.tt.aa.avg()
1294        min = db.tt.aa.min()
1295        max = db.tt.aa.max()
1296        result = db(db.tt).select(sum, count, avg, min, max).first()
1297        self.assertEqual(result[sum], 9)
1298        self.assertEqual(result[count], 3)
1299        self.assertEqual(result[avg], 2)
1300        self.assertEqual(result[min], 1)
1301        self.assertEqual(result[max], 3)
1302
1303        # Test basic expressions evaluated at python level
1304        self.assertEqual(db((1 == 1) & (db.tt.aa >= 2)).count(), 2)
1305        self.assertEqual(db((1 == 1) | (db.tt.aa >= 2)).count(), 3)
1306        self.assertEqual(db((1 == 0) & (db.tt.aa >= 2)).count(), 0)
1307        self.assertEqual(db((1 == 0) | (db.tt.aa >= 2)).count(), 2)
1308
1309        # test abs()
1310        self.assertEqual(db(db.tt.aa == 2).update(aa=db.tt.aa * -10), 1)
1311        abs = db.tt.aa.abs().with_alias("abs")
1312        result = db(db.tt.aa == -20).select(abs).first()
1313        self.assertEqual(result[abs], 20)
1314        self.assertEqual(result["abs"], 20)
1315        abs = db.tt.aa.abs() / 10 + 5
1316        exp = abs.min() * 2 + 1
1317        result = db(db.tt.aa == -20).select(exp).first()
1318        self.assertEqual(result[exp], 15)
1319
1320        # test case()
1321        condition = db.tt.aa > 2
1322        case = condition.case(db.tt.aa + 2, db.tt.aa - 2)
1323        my_case = case.with_alias("my_case")
1324        result = db().select(my_case)
1325        self.assertEqual(len(result), 3)
1326        self.assertEqual(result[0][my_case], -1)
1327        self.assertEqual(result[0]["my_case"], -1)
1328        self.assertEqual(result[1]["my_case"], -22)
1329        self.assertEqual(result[2]["my_case"], 5)
1330
1331        # test expression based delete
1332        self.assertEqual(db(db.tt.aa + 1 >= 4).count(), 1)
1333        self.assertEqual(db(db.tt.aa + 1 >= 4).delete(), 1)
1334        self.assertEqual(db(db.tt.aa).count(), 2)
1335
1336    def testUpdate(self):
1337        db = self.connect()
1338
1339        # some db's only support seconds
1340        datetime_datetime_today = datetime.datetime.today()
1341        datetime_datetime_today = datetime_datetime_today.replace(microsecond=0)
1342        one_day = datetime.timedelta(1)
1343        one_sec = datetime.timedelta(0, 1)
1344
1345        update_vals = (
1346            ("string", "x", "y"),
1347            ("text", "x", "y"),
1348            ("password", "x", "y"),
1349            ("integer", 1, 2),
1350            ("bigint", 1, 2),
1351            ("float", 1.0, 2.0),
1352            ("double", 1.0, 2.0),
1353            ("boolean", True, False),
1354            ("date", datetime.date.today(), datetime.date.today() + one_day),
1355            (
1356                "datetime",
1357                datetime.datetime(1971, 12, 21, 10, 30, 55, 0),
1358                datetime_datetime_today,
1359            ),
1360            (
1361                "time",
1362                datetime_datetime_today.time(),
1363                (datetime_datetime_today + one_sec).time(),
1364            ),
1365        )
1366
1367        for uv in update_vals:
1368            if IS_ORACLE and (uv[0] == "time" or uv[0] == "text"):
1369                # oracle can have problems with this test on CLOBs
1370                # and date-less "time" type is not supported
1371                continue
1372            db.define_table("tt", Field("aa", "integer", default=0), Field("bb", uv[0]))
1373            self.assertTrue(isinstance(db.tt.insert(bb=uv[1]), long))
1374            self.assertEqual(db(db.tt.aa + 1 == 1).select(db.tt.bb)[0].bb, uv[1])
1375            self.assertEqual(db(db.tt.aa + 1 == 1).update(bb=uv[2]), 1)
1376            self.assertEqual(db(db.tt.aa / 3 == 0).select(db.tt.bb)[0].bb, uv[2])
1377            db.tt.drop()
1378
1379    def testSubstring(self):
1380        db = self.connect()
1381        t0 = db.define_table("t0", Field("name"))
1382        input_name = "web2py"
1383        t0.insert(name=input_name)
1384        exp_slice = t0.name.lower()[4:6]
1385        exp_slice_no_max = t0.name.lower()[4:]
1386        exp_slice_neg_max = t0.name.lower()[2:-2]
1387        exp_slice_neg_start = t0.name.lower()[-2:]
1388        exp_item = t0.name.lower()[3]
1389        out = (
1390            db(t0)
1391            .select(
1392                exp_slice,
1393                exp_item,
1394                exp_slice_no_max,
1395                exp_slice_neg_max,
1396                exp_slice_neg_start,
1397            )
1398            .first()
1399        )
1400        self.assertEqual(out[exp_slice], input_name[4:6])
1401        self.assertEqual(out[exp_item], input_name[3])
1402        self.assertEqual(out[exp_slice_no_max], input_name[4:])
1403        self.assertEqual(out[exp_slice_neg_max], input_name[2:-2])
1404        self.assertEqual(out[exp_slice_neg_start], input_name[-2:])
1405
1406    def testOps(self):
1407        db = self.connect()
1408        t0 = db.define_table("t0", Field("vv", "integer"))
1409        self.assertEqual(db.t0.insert(vv=1), 1)
1410        self.assertEqual(db.t0.insert(vv=2), 2)
1411        self.assertEqual(db.t0.insert(vv=3), 3)
1412        sum = db.t0.vv.sum()
1413        count = db.t0.vv.count()
1414        avg = db.t0.vv.avg()
1415        op = sum / count
1416        op1 = (sum / count).with_alias("tot")
1417        self.assertEqual(db(t0).select(op).first()[op], 2)
1418        self.assertEqual(db(t0).select(op1).first()[op1], 2)
1419        print("DICT", db(t0).select(op1).as_dict())
1420        self.assertEqual(db(t0).select(op1).first()["tot"], 2)
1421        op2 = avg * count
1422        self.assertEqual(db(t0).select(op2).first()[op2], 6)
1423        # the following is not possible at least on sqlite
1424        sum = db.t0.vv.sum().with_alias("s")
1425        count = db.t0.vv.count().with_alias("c")
1426        op = sum / count
1427        # self.assertEqual(db(t0).select(op).first()[op], 2)
1428
1429
1430class TestTableAliasing(DALtest):
1431    def testRun(self):
1432        db = self.connect()
1433        db.define_table("t1", Field("aa"))
1434        db.define_table(
1435            "t2",
1436            Field("pk", type="id", unique=True, notnull=True),
1437            Field("bb", type="integer"),
1438            rname="tt",
1439        )
1440        tab1 = db.t1.with_alias("test1")
1441        tab2 = db.t2.with_alias("test2")
1442        self.assertIs(tab2.id, tab2.pk)
1443        self.assertIs(tab2._id, tab2.pk)
1444        self.assertEqual(tab1._dalname, "t1")
1445        self.assertEqual(tab1._tablename, "test1")
1446        self.assertEqual(tab2._dalname, "t2")
1447        self.assertEqual(tab2._tablename, "test2")
1448        self.assertEqual(tab2._rname, "tt")
1449        tab1.insert(aa="foo")
1450        tab1.insert(aa="bar")
1451        result = db(tab1).select(tab1.aa, orderby=tab1.aa)
1452        self.assertEqual(result.as_list(), [{"aa": "bar"}, {"aa": "foo"}])
1453
1454        if not IS_SQLITE:
1455            db(tab1.aa == "foo").update(aa="baz")
1456            result = db(tab1).select(tab1.aa, orderby=tab1.aa)
1457            self.assertEqual(result.as_list(), [{"aa": "bar"}, {"aa": "baz"}])
1458            db(tab1.aa == "bar").delete()
1459            result = db(tab1).select(tab1.aa, orderby=tab1.aa)
1460            self.assertEqual(result.as_list(), [{"aa": "baz"}])
1461        else:
1462            with self.assertRaises(SyntaxError):
1463                db(tab1.aa == "foo").update(aa="baz")
1464            with self.assertRaises(SyntaxError):
1465                db(tab1.aa == "bar").delete()
1466
1467        tab2.insert(bb=123)
1468        tab2.insert(bb=456)
1469        result = db(tab2).select(tab2.bb, orderby=tab2.bb)
1470        self.assertEqual(result.as_list(), [{"bb": 123}, {"bb": 456}])
1471
1472        if not IS_SQLITE:
1473            db(tab2.bb == 456).update(bb=789)
1474            result = db(tab2).select(tab2.bb, orderby=tab2.bb)
1475            self.assertEqual(result.as_list(), [{"bb": 123}, {"bb": 789}])
1476            db(tab2.bb == 123).delete()
1477            result = db(tab2).select(tab2.bb, orderby=tab2.bb)
1478            self.assertEqual(result.as_list(), [{"bb": 789}])
1479        else:
1480            with self.assertRaises(SyntaxError):
1481                db(tab2.bb == 456).update(bb=789)
1482            with self.assertRaises(SyntaxError):
1483                db(tab2.bb == 123).delete()
1484
1485
1486class TestJoin(DALtest):
1487    def testRun(self):
1488        db = self.connect()
1489        db.define_table("t1", Field("aa"))
1490        db.define_table("t2", Field("aa"), Field("b", db.t1))
1491        i1 = db.t1.insert(aa="1")
1492        i2 = db.t1.insert(aa="2")
1493        i3 = db.t1.insert(aa="3")
1494        db.t2.insert(aa="4", b=i1)
1495        db.t2.insert(aa="5", b=i2)
1496        db.t2.insert(aa="6", b=i2)
1497        self.assertEqual(
1498            len(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)), 3
1499        )
1500        self.assertEqual(
1501            db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t1.aa, "2"
1502        )
1503        self.assertEqual(
1504            db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t2.aa, "6"
1505        )
1506        self.assertEqual(
1507            len(
1508                db().select(
1509                    db.t1.ALL,
1510                    db.t2.ALL,
1511                    left=db.t2.on(db.t1.id == db.t2.b),
1512                    orderby=db.t1.aa | db.t2.aa,
1513                )
1514            ),
1515            4,
1516        )
1517        self.assertEqual(
1518            db()
1519            .select(
1520                db.t1.ALL,
1521                db.t2.ALL,
1522                left=db.t2.on(db.t1.id == db.t2.b),
1523                orderby=db.t1.aa | db.t2.aa,
1524            )[2]
1525            .t1.aa,
1526            "2",
1527        )
1528        self.assertEqual(
1529            db()
1530            .select(
1531                db.t1.ALL,
1532                db.t2.ALL,
1533                left=db.t2.on(db.t1.id == db.t2.b),
1534                orderby=db.t1.aa | db.t2.aa,
1535            )[2]
1536            .t2.aa,
1537            "6",
1538        )
1539        self.assertEqual(
1540            db()
1541            .select(
1542                db.t1.ALL,
1543                db.t2.ALL,
1544                left=db.t2.on(db.t1.id == db.t2.b),
1545                orderby=db.t1.aa | db.t2.aa,
1546            )[3]
1547            .t1.aa,
1548            "3",
1549        )
1550        self.assertEqual(
1551            db()
1552            .select(
1553                db.t1.ALL,
1554                db.t2.ALL,
1555                left=db.t2.on(db.t1.id == db.t2.b),
1556                orderby=db.t1.aa | db.t2.aa,
1557            )[3]
1558            .t2.aa,
1559            None,
1560        )
1561        self.assertEqual(
1562            len(
1563                db().select(
1564                    db.t1.aa,
1565                    db.t2.id.count(),
1566                    left=db.t2.on(db.t1.id == db.t2.b),
1567                    orderby=db.t1.aa,
1568                    groupby=db.t1.aa,
1569                )
1570            ),
1571            3,
1572        )
1573        self.assertEqual(
1574            db()
1575            .select(
1576                db.t1.aa,
1577                db.t2.id.count(),
1578                left=db.t2.on(db.t1.id == db.t2.b),
1579                orderby=db.t1.aa,
1580                groupby=db.t1.aa,
1581            )[0]
1582            ._extra[db.t2.id.count()],
1583            1,
1584        )
1585        self.assertEqual(
1586            db()
1587            .select(
1588                db.t1.aa,
1589                db.t2.id.count(),
1590                left=db.t2.on(db.t1.id == db.t2.b),
1591                orderby=db.t1.aa,
1592                groupby=db.t1.aa,
1593            )[1]
1594            ._extra[db.t2.id.count()],
1595            2,
1596        )
1597        self.assertEqual(
1598            db()
1599            .select(
1600                db.t1.aa,
1601                db.t2.id.count(),
1602                left=db.t2.on(db.t1.id == db.t2.b),
1603                orderby=db.t1.aa,
1604                groupby=db.t1.aa,
1605            )[2]
1606            ._extra[db.t2.id.count()],
1607            0,
1608        )
1609        db.t2.drop()
1610        db.t1.drop()
1611
1612        db.define_table("person", Field("name"))
1613        id = db.person.insert(name="max")
1614        self.assertEqual(id.name, "max")
1615        db.define_table("dog", Field("name"), Field("ownerperson", "reference person"))
1616        db.dog.insert(name="skipper", ownerperson=1)
1617        row = db(db.person.id == db.dog.ownerperson).select().first()
1618        self.assertEqual(row[db.person.name], "max")
1619        self.assertEqual(row["person.name"], "max")
1620        db.dog.drop()
1621        self.assertEqual(len(db.person._referenced_by), 0)
1622
1623
1624class TestMinMaxSumAvg(DALtest):
1625    def testRun(self):
1626        db = self.connect()
1627        db.define_table("tt", Field("aa", "integer"))
1628        self.assertEqual(db.tt.insert(aa=1), 1)
1629        self.assertEqual(db.tt.insert(aa=2), 2)
1630        self.assertEqual(db.tt.insert(aa=3), 3)
1631        s = db.tt.aa.min()
1632        self.assertEqual(db(db.tt.id > 0).select(s)[0]._extra[s], 1)
1633        self.assertEqual(db(db.tt.id > 0).select(s).first()[s], 1)
1634        self.assertEqual(db().select(s).first()[s], 1)
1635        s = db.tt.aa.max()
1636        self.assertEqual(db().select(s).first()[s], 3)
1637        s = db.tt.aa.sum()
1638        self.assertEqual(db().select(s).first()[s], 6)
1639        s = db.tt.aa.count()
1640        self.assertEqual(db().select(s).first()[s], 3)
1641        s = db.tt.aa.avg()
1642        self.assertEqual(db().select(s).first()[s], 2)
1643
1644
1645class TestMigrations(unittest.TestCase):
1646    def testRun(self):
1647        db = DAL(DEFAULT_URI, check_reserved=["all"])
1648        db.define_table("tt", Field("aa"), Field("BB"), migrate=".storage.table")
1649        db.define_table(
1650            "t1", Field("aa"), Field("BB"), migrate=".storage.rname", rname="foo"
1651        )
1652        db.commit()
1653        db.close()
1654        db = DAL(DEFAULT_URI, check_reserved=["all"])
1655        db.define_table("tt", Field("aa"), migrate=".storage.table")
1656        db.define_table("t1", Field("aa"), migrate=".storage.rname", rname="foo")
1657        db.commit()
1658        db.close()
1659        db = DAL(DEFAULT_URI, check_reserved=["all"])
1660        db.define_table("tt", Field("aa"), Field("b"), migrate=".storage.table")
1661        db.define_table(
1662            "t1", Field("aa"), Field("b"), migrate=".storage.rname", rname="foo"
1663        )
1664        db.commit()
1665        db.close()
1666        db = DAL(DEFAULT_URI, check_reserved=["all"])
1667        db.define_table("tt", Field("aa"), Field("b", "text"), migrate=".storage.table")
1668        db.define_table(
1669            "t1", Field("aa"), Field("b", "text"), migrate=".storage.rname", rname="foo"
1670        )
1671        db.commit()
1672        db.close()
1673        db = DAL(DEFAULT_URI, check_reserved=["all"])
1674        db.define_table("tt", Field("aa"), migrate=".storage.table")
1675        db.define_table("t1", Field("aa"), migrate=".storage.rname", rname="foo")
1676        db.tt.drop()
1677        db.t1.drop()
1678        db.commit()
1679        db.close()
1680
1681    def testFieldRName(self):
1682        def checkWrite(db, table, data):
1683            rowid = table.insert(**data)
1684            query = table._id == rowid
1685            fields = [table[x] for x in data.keys()]
1686            row = db(query).select(*fields).first()
1687            self.assertIsNot(row, None)
1688            self.assertEqual(row.as_dict(), data)
1689            db(query).delete()
1690
1691        # Create tables
1692        db = DAL(DEFAULT_URI, check_reserved=["all"])
1693        db.define_table(
1694            "tt",
1695            Field("aa", rname="faa"),
1696            Field("BB", rname="fbb"),
1697            migrate=".storage.table",
1698        )
1699        db.define_table(
1700            "t1",
1701            Field("aa", rname="faa"),
1702            Field("BB", rname="fbb"),
1703            migrate=".storage.rname",
1704            rname="foo",
1705        )
1706        data = dict(aa="aa1", BB="BB1")
1707        checkWrite(db, db.tt, data)
1708        checkWrite(db, db.t1, data)
1709        db.commit()
1710        db.close()
1711
1712        # Drop field defined by CREATE TABLE
1713        db = DAL(DEFAULT_URI, check_reserved=["all"])
1714        db.define_table("tt", Field("aa", rname="faa"), migrate=".storage.table")
1715        db.define_table(
1716            "t1", Field("aa", rname="faa"), migrate=".storage.rname", rname="foo"
1717        )
1718        data = dict(aa="aa2")
1719        checkWrite(db, db.tt, data)
1720        checkWrite(db, db.t1, data)
1721        db.commit()
1722        db.close()
1723
1724        # Add new field
1725        db = DAL(DEFAULT_URI, check_reserved=["all"])
1726        db.define_table(
1727            "tt",
1728            Field("aa", rname="faa"),
1729            Field("b", rname="fb"),
1730            migrate=".storage.table",
1731        )
1732        db.define_table(
1733            "t1",
1734            Field("aa", rname="faa"),
1735            Field("b", rname="fb"),
1736            migrate=".storage.rname",
1737            rname="foo",
1738        )
1739        data = dict(aa="aa3", b="b3")
1740        integrity = dict(aa="data", b="integrity")
1741        checkWrite(db, db.tt, data)
1742        checkWrite(db, db.t1, data)
1743        db.tt.insert(**integrity)
1744        db.t1.insert(**integrity)
1745        db.commit()
1746        db.close()
1747
1748        if not IS_SQLITE:
1749
1750            # Change field type
1751            db = DAL(DEFAULT_URI, check_reserved=["all"])
1752            db.define_table(
1753                "tt",
1754                Field("aa", rname="faa"),
1755                Field("b", "text", rname="fb"),
1756                migrate=".storage.table",
1757            )
1758            db.define_table(
1759                "t1",
1760                Field("aa", rname="faa"),
1761                Field("b", "text", rname="fb"),
1762                migrate=".storage.rname",
1763                rname="foo",
1764            )
1765            data = dict(aa="aa4", b="b4")
1766            checkWrite(db, db.tt, data)
1767            checkWrite(db, db.t1, data)
1768            row = db(db.tt).select(*[db.tt[x] for x in integrity.keys()]).first()
1769            self.assertIsNot(row, None)
1770            self.assertEqual(row.as_dict(), integrity)
1771            row2 = db(db.t1).select(*[db.t1[x] for x in integrity.keys()]).first()
1772            self.assertIsNot(row2, None)
1773            self.assertEqual(row2.as_dict(), integrity)
1774            db.commit()
1775            db.close()
1776
1777        if not IS_SQLITE:
1778
1779            # Change field rname
1780            db = DAL(DEFAULT_URI, check_reserved=["all"])
1781            db.define_table(
1782                "tt",
1783                Field("aa", rname="faa"),
1784                Field("b", "text", rname="xb"),
1785                migrate=".storage.table",
1786            )
1787            db.define_table(
1788                "t1",
1789                Field("aa", rname="faa"),
1790                Field("b", "text", rname="xb"),
1791                migrate=".storage.rname",
1792                rname="foo",
1793            )
1794            data = dict(aa="aa4", b="b4")
1795            checkWrite(db, db.tt, data)
1796            checkWrite(db, db.t1, data)
1797            row = db(db.tt).select(*[db.tt[x] for x in integrity.keys()]).first()
1798            self.assertIsNot(row, None)
1799            self.assertEqual(row.as_dict(), integrity)
1800            row2 = db(db.t1).select(*[db.t1[x] for x in integrity.keys()]).first()
1801            self.assertIsNot(row2, None)
1802            self.assertEqual(row2.as_dict(), integrity)
1803            db.commit()
1804            db.close()
1805
1806        # Drop field defined by ALTER TABLE
1807        db = DAL(DEFAULT_URI, check_reserved=["all"])
1808        db.define_table("tt", Field("aa", rname="faa"), migrate=".storage.table")
1809        db.define_table(
1810            "t1", Field("aa", rname="faa"), migrate=".storage.rname", rname="foo"
1811        )
1812        data = dict(aa="aa5")
1813        checkWrite(db, db.tt, data)
1814        checkWrite(db, db.t1, data)
1815        db.tt.drop()
1816        db.t1.drop()
1817        db.commit()
1818        db.close()
1819
1820    def tearDown(self):
1821        if os.path.exists(".storage.db"):
1822            os.unlink(".storage.db")
1823        if os.path.exists(".storage.table"):
1824            os.unlink(".storage.table")
1825        if os.path.exists(".storage.rname"):
1826            os.unlink(".storage.rname")
1827
1828
1829class TestReference(DALtest):
1830    def testRun(self):
1831        scenarios = (
1832            (True, "CASCADE"),
1833            (False, "CASCADE"),
1834            (False, "SET NULL"),
1835        )
1836        for (b, ondelete) in scenarios:
1837            db = self.connect(bigint_id=b)
1838            if DEFAULT_URI.startswith("mssql"):
1839                # multiple cascade gotcha
1840                for key in ["reference", "reference FK"]:
1841                    db._adapter.types[key] = db._adapter.types[key].replace(
1842                        "%(on_delete_action)s", "NO ACTION"
1843                    )
1844            db.define_table(
1845                "tt", Field("name"), Field("aa", "reference tt", ondelete=ondelete)
1846            )
1847            db.commit()
1848            x = db.tt.insert(name="xxx")
1849            self.assertEqual(x.id, 1)
1850            self.assertEqual(x["id"], 1)
1851            x.aa = x
1852            self.assertEqual(x.aa, 1)
1853            x.update_record()
1854            x1 = db.tt[1]
1855            self.assertEqual(x1.aa, 1)
1856            self.assertEqual(x1.aa.aa.aa.aa.aa.aa.name, "xxx")
1857            y = db.tt.insert(name="yyy", aa=x1)
1858            self.assertEqual(y.aa, x1.id)
1859
1860            if not DEFAULT_URI.startswith("mssql"):
1861                self.assertEqual(db.tt.insert(name="zzz"), 3)
1862                self.assertEqual(db(db.tt.name).count(), 3)
1863                db(db.tt.id == x).delete()
1864                expected_count = {
1865                    "SET NULL": 2,
1866                    "NO ACTION": 2,
1867                    "CASCADE": 1,
1868                }
1869                self.assertEqual(db(db.tt.name).count(), expected_count[ondelete])
1870                if ondelete == "SET NULL":
1871                    self.assertEqual(db(db.tt.name == "yyy").select()[0].aa, None)
1872
1873            self.tearDown()
1874
1875
1876class TestClientLevelOps(DALtest):
1877    def testRun(self):
1878        db = self.connect()
1879        db.define_table(
1880            "tt",
1881            Field("aa", represent=lambda x, r: "x" + x),
1882            Field("bb", type="integer", represent=lambda x, r: "y" + str(x)),
1883        )
1884        db.commit()
1885        db.tt.insert(aa="test", bb=1)
1886        rows1 = db(db.tt.id < 0).select()
1887        rows2 = db(db.tt.id > 0).select()
1888        self.assertNotEqual(rows1, rows2)
1889        rows1 = db(db.tt.id > 0).select()
1890        rows2 = db(db.tt.id > 0).select()
1891        self.assertEqual(rows1, rows2)
1892        rows3 = rows1 + rows2
1893        self.assertEqual(len(rows3), 2)
1894        rows4 = rows1 | rows2
1895        self.assertEqual(len(rows4), 1)
1896        rows5 = rows1 & rows2
1897        self.assertEqual(len(rows5), 1)
1898        rows6 = rows1.find(lambda row: row.aa == "test")
1899        self.assertEqual(len(rows6), 1)
1900        rows7 = rows2.exclude(lambda row: row.aa == "test")
1901        self.assertEqual(len(rows7), 1)
1902        rows8 = rows5.sort(lambda row: row.aa)
1903        self.assertEqual(len(rows8), 1)
1904
1905        def represent(f, v, r):
1906            return "z" + str(v)
1907
1908        db.representers = {
1909            "rows_render": represent,
1910        }
1911        db.tt.insert(aa="foo", bb=2)
1912        rows = db(db.tt.id > 0).select()
1913        exp1 = [
1914            Row(aa="ztest", bb="z1", id=rows[0]["id"]),
1915            Row(aa="zfoo", bb="z2", id=rows[1]["id"]),
1916        ]
1917        exp2 = [
1918            Row(aa="ztest", bb=1, id=rows[0]["id"]),
1919            Row(aa="zfoo", bb=2, id=rows[1]["id"]),
1920        ]
1921        exp3 = [
1922            Row(aa="test", bb="z1", id=rows[0]["id"]),
1923            Row(aa="foo", bb="z2", id=rows[1]["id"]),
1924        ]
1925        self.assertEqual(rows.render(i=0), exp1[0])
1926        self.assertEqual(rows.render(i=0, fields=[db.tt.aa, db.tt.bb]), exp1[0])
1927        self.assertEqual(rows.render(i=0, fields=[db.tt.aa]), exp2[0])
1928        self.assertEqual(rows.render(i=0, fields=[db.tt.bb]), exp3[0])
1929        self.assertEqual(list(rows.render()), exp1)
1930        self.assertEqual(list(rows.render(fields=[db.tt.aa, db.tt.bb])), exp1)
1931        self.assertEqual(list(rows.render(fields=[db.tt.aa])), exp2)
1932        self.assertEqual(list(rows.render(fields=[db.tt.bb])), exp3)
1933        ret = rows.render(i=0)
1934        rows = db(db.tt.id > 0).select()
1935        rows.compact = False
1936        row = rows[0]
1937        self.assertIn("tt", row)
1938        self.assertIn("id", row.tt)
1939        self.assertNotIn("id", row)
1940        rows.compact = True
1941        row = rows[0]
1942        self.assertNotIn("tt", row)
1943        self.assertIn("id", row)
1944
1945        rows = db(db.tt.id > 0).select(db.tt.id.max())
1946        rows.compact = False
1947        row = rows[0]
1948        self.assertNotIn("tt", row)
1949        self.assertIn("_extra", row)
1950
1951        rows = db(db.tt.id > 0).select(db.tt.id.max())
1952        rows.compact = True
1953        row = rows[0]
1954        self.assertNotIn("tt", row)
1955        self.assertIn("_extra", row)
1956        db.tt.drop()
1957
1958        db.define_table("tt", Field("aa"), Field.Virtual("bb", lambda row: ":p"))
1959        db.tt.insert(aa="test")
1960        rows = db(db.tt.id > 0).select()
1961        row = rows.first()
1962        self.assertNotIn("tt", row)
1963        self.assertIn("id", row)
1964        self.assertIn("bb", row)
1965
1966        rows.compact = False
1967        row = rows.first()
1968        self.assertIn("tt", row)
1969        self.assertEqual(len(row.keys()), 1)
1970        self.assertIn("id", row.tt)
1971        self.assertIn("bb", row.tt)
1972        self.assertNotIn("id", row)
1973        self.assertNotIn("bb", row)
1974
1975
1976class TestVirtualFields(DALtest):
1977    def testRun(self):
1978        db = self.connect()
1979        db.define_table("tt", Field("aa"))
1980        db.commit()
1981        db.tt.insert(aa="test")
1982
1983        class Compute:
1984            def a_upper(row):
1985                return row.tt.aa.upper()
1986
1987        db.tt.virtualfields.append(Compute())
1988        assert db(db.tt.id > 0).select().first().a_upper == "TEST"
1989
1990
1991class TestComputedFields(DALtest):
1992    def testRun(self):
1993        db = self.connect()
1994        db.define_table(
1995            "tt",
1996            Field("aa"),
1997            Field("bb", default="x"),
1998            Field("cc", compute=lambda r: r.aa + r.bb),
1999        )
2000        db.commit()
2001        id = db.tt.insert(aa="z")
2002        self.assertEqual(db.tt[id].cc, "zx")
2003        db.tt.drop()
2004        db.commit()
2005
2006        # test checking that a compute field can refer to earlier-defined computed fields
2007        db.define_table(
2008            "tt",
2009            Field("aa"),
2010            Field("bb", default="x"),
2011            Field("cc", compute=lambda r: r.aa + r.bb),
2012            Field("dd", compute=lambda r: r.bb + r.cc),
2013        )
2014        db.commit()
2015        id = db.tt.insert(aa="z")
2016        self.assertEqual(db.tt[id].dd, "xzx")
2017
2018
2019class TestCommonFilters(DALtest):
2020    def testRun(self):
2021        db = self.connect()
2022        db.define_table("t1", Field("aa", "integer"))
2023        db.define_table("t2", Field("aa", "integer"), Field("b", db.t1))
2024        i1 = db.t1.insert(aa=1)
2025        i2 = db.t1.insert(aa=2)
2026        i3 = db.t1.insert(aa=3)
2027        db.t2.insert(aa=4, b=i1)
2028        db.t2.insert(aa=5, b=i2)
2029        db.t2.insert(aa=6, b=i2)
2030        db.t1._common_filter = lambda q: db.t1.aa > 1
2031        self.assertEqual(db(db.t1).count(), 2)
2032        self.assertEqual(db(db.t1).count(), 2)
2033        q = db.t2.b == db.t1.id
2034        self.assertEqual(db(q).count(), 2)
2035        self.assertEqual(db(q).count(), 2)
2036        self.assertEqual(len(db(db.t1).select(left=db.t2.on(q))), 3)
2037        db.t2._common_filter = lambda q: db.t2.aa < 6
2038        self.assertEqual(db(q).count(), 1)
2039        self.assertEqual(db(q).count(), 1)
2040        self.assertEqual(len(db(db.t1).select(left=db.t2.on(q))), 2)
2041        # test delete
2042        self.assertEqual(db(db.t2).count(), 2)
2043        db(db.t2).delete()
2044        self.assertEqual(db(db.t2).count(), 0)
2045        db.t2._common_filter = None
2046        self.assertEqual(db(db.t2).count(), 1)
2047        # test update
2048        db.t2.insert(aa=4, b=i1)
2049        db.t2.insert(aa=5, b=i2)
2050        db.t2._common_filter = lambda q: db.t2.aa < 6
2051        self.assertEqual(db(db.t2).count(), 2)
2052        db(db.t2).update(aa=6)
2053        self.assertEqual(db(db.t2).count(), 0)
2054        db.t2._common_filter = None
2055        self.assertEqual(db(db.t2).count(), 3)
2056
2057
2058class TestImportExportFields(DALtest):
2059    def testRun(self):
2060        db = self.connect()
2061        db.define_table("person", Field("name"))
2062        db.define_table("pet", Field("friend", db.person), Field("name"))
2063        for n in range(2):
2064            db(db.pet).delete()
2065            db(db.person).delete()
2066            for k in range(10):
2067                id = db.person.insert(name=str(k))
2068                db.pet.insert(friend=id, name=str(k))
2069        db.commit()
2070        stream = StringIO()
2071        db.export_to_csv_file(stream)
2072        db(db.pet).delete()
2073        db(db.person).delete()
2074        stream = StringIO(stream.getvalue())
2075        db.import_from_csv_file(stream)
2076        assert (
2077            db(db.person.id == db.pet.friend)(db.person.name == db.pet.name).count()
2078            == 10
2079        )
2080
2081
2082class TestImportExportUuidFields(DALtest):
2083    def testRun(self):
2084        db = self.connect()
2085        db.define_table("person", Field("name"), Field("uuid"))
2086        db.define_table("pet", Field("friend", db.person), Field("name"))
2087        for n in range(2):
2088            db(db.pet).delete()
2089            db(db.person).delete()
2090            for k in range(10):
2091                id = db.person.insert(name=str(k), uuid=str(k))
2092                db.pet.insert(friend=id, name=str(k))
2093        db.commit()
2094        stream = StringIO()
2095        db.export_to_csv_file(stream)
2096        stream = StringIO(stream.getvalue())
2097        db.import_from_csv_file(stream)
2098        assert db(db.person).count() == 10
2099        assert (
2100            db(db.person.id == db.pet.friend)(db.person.name == db.pet.name).count()
2101            == 20
2102        )
2103
2104
2105class TestDALDictImportExport(unittest.TestCase):
2106    def testRun(self):
2107        db = DAL(DEFAULT_URI, check_reserved=["all"])
2108        db.define_table("person", Field("name", default="Michael"), Field("uuid"))
2109        db.define_table("pet", Field("friend", db.person), Field("name"))
2110        dbdict = db.as_dict(flat=True, sanitize=False)
2111        assert isinstance(dbdict, dict)
2112        uri = dbdict["uri"]
2113        assert isinstance(uri, basestring) and uri
2114        assert len(dbdict["tables"]) == 2
2115        assert len(dbdict["tables"][0]["fields"]) == 3
2116        assert dbdict["tables"][0]["fields"][1]["type"] == db.person.name.type
2117        assert dbdict["tables"][0]["fields"][1]["default"] == db.person.name.default
2118
2119        db2 = DAL(**dbdict)
2120        assert len(db.tables) == len(db2.tables)
2121        assert hasattr(db2, "pet") and isinstance(db2.pet, Table)
2122        assert hasattr(db2.pet, "friend") and isinstance(db2.pet.friend, Field)
2123        db.pet.drop()
2124        db.commit()
2125
2126        db2.commit()
2127
2128        dbjson = db.as_json(sanitize=False)
2129        assert isinstance(dbjson, basestring) and len(dbjson) > 0
2130        db3 = DAL(**json.loads(dbjson))
2131        assert hasattr(db3, "person") and hasattr(db3.person, "uuid")
2132        assert db3.person.uuid.type == db.person.uuid.type
2133        db3.person.drop()
2134        db3.commit()
2135        db3.close()
2136
2137        mpfc = "Monty Python's Flying Circus"
2138        dbdict4 = {
2139            "uri": DEFAULT_URI,
2140            "tables": [
2141                {
2142                    "tablename": "tvshow",
2143                    "fields": [
2144                        {"fieldname": "name", "default": mpfc},
2145                        {"fieldname": "rating", "type": "double"},
2146                    ],
2147                },
2148                {
2149                    "tablename": "staff",
2150                    "fields": [
2151                        {"fieldname": "name", "default": "Michael"},
2152                        {"fieldname": "food", "default": "Spam"},
2153                        {"fieldname": "tvshow", "type": "reference tvshow"},
2154                    ],
2155                },
2156            ],
2157        }
2158        db4 = DAL(**dbdict4)
2159        assert "staff" in db4.tables
2160        assert "name" in db4.staff
2161        assert db4.tvshow.rating.type == "double"
2162        assert (
2163            db4.tvshow.insert(),
2164            db4.tvshow.insert(name="Loriot"),
2165            db4.tvshow.insert(name="Il Mattatore"),
2166        ) == (1, 2, 3)
2167        assert db4(db4.tvshow).select().first().id == 1
2168        assert db4(db4.tvshow).select().first().name == mpfc
2169
2170        db4.staff.drop()
2171        db4.tvshow.drop()
2172        db4.commit()
2173
2174        dbdict5 = {"uri": DEFAULT_URI}
2175        db5 = DAL(**dbdict5)
2176        assert db5.tables in ([], None)
2177        assert not (str(db5) in ("", None))
2178
2179        dbdict6 = {
2180            "uri": DEFAULT_URI,
2181            "tables": [
2182                {"tablename": "staff"},
2183                {
2184                    "tablename": "tvshow",
2185                    "fields": [
2186                        {"fieldname": "name"},
2187                        {"fieldname": "rating", "type": "double"},
2188                    ],
2189                },
2190            ],
2191        }
2192        db6 = DAL(**dbdict6)
2193
2194        assert len(db6["staff"].fields) == 1
2195        assert "name" in db6["tvshow"].fields
2196
2197        assert db6.staff.insert() is not None
2198        assert db6(db6.staff).select().first().id == 1
2199
2200        db6.staff.drop()
2201        db6.tvshow.drop()
2202        db6.commit()
2203
2204        db.close()
2205        db2.close()
2206        db4.close()
2207        db5.close()
2208        db6.close()
2209
2210
2211class TestSelectAsDict(DALtest):
2212    def testSelect(self):
2213        db = self.connect()
2214        if IS_ORACLE:
2215            # if lowercase fieldnames desired in return, must be quoted in oracle
2216            db.define_table(
2217                "a_table", Field("b_field"), Field("a_field"),
2218            )
2219            db.a_table.insert(a_field="aa1", b_field="bb1")
2220            rtn = db.executesql(
2221                'SELECT "id", "b_field", "a_field" FROM "a_table"', as_dict=True
2222            )
2223            self.assertEqual(rtn[0]["b_field"], "bb1")
2224            rtn = db.executesql(
2225                'SELECT "id", "b_field", "a_field" FROM "a_table"', as_ordered_dict=True
2226            )
2227            self.assertEqual(rtn[0]["b_field"], "bb1")
2228            self.assertEqual(list(rtn[0].keys()), ["id", "b_field", "a_field"])
2229
2230        else:
2231            db.define_table(
2232                "a_table", Field("b_field"), Field("a_field"),
2233            )
2234            db.a_table.insert(a_field="aa1", b_field="bb1")
2235            rtn = db.executesql(
2236                "SELECT id, b_field, a_field FROM a_table", as_dict=True
2237            )
2238            self.assertEqual(rtn[0]["b_field"], "bb1")
2239            rtn = db.executesql(
2240                "SELECT id, b_field, a_field FROM a_table", as_ordered_dict=True
2241            )
2242            self.assertEqual(rtn[0]["b_field"], "bb1")
2243            self.assertEqual(list(rtn[0].keys()), ["id", "b_field", "a_field"])
2244
2245
2246class TestExecuteSQL(DALtest):
2247    def testSelect(self):
2248        if IS_ORACLE:
2249            # see note on prior test
2250            db = self.connect(DEFAULT_URI, entity_quoting=True)
2251            db.define_table(
2252                "a_table", Field("b_field"), Field("a_field"),
2253            )
2254            db.a_table.insert(a_field="aa1", b_field="bb1")
2255            rtn = db.executesql(
2256                'SELECT "id", "b_field", "a_field" FROM "a_table"', as_dict=True
2257            )
2258            self.assertEqual(rtn[0]["b_field"], "bb1")
2259            rtn = db.executesql(
2260                'SELECT "id", "b_field", "a_field" FROM "a_table"', as_ordered_dict=True
2261            )
2262            self.assertEqual(rtn[0]["b_field"], "bb1")
2263            self.assertEqual(rtn[0]["b_field"], "bb1")
2264            self.assertEqual(list(rtn[0].keys()), ["id", "b_field", "a_field"])
2265
2266            rtn = db.executesql(
2267                'select "id", "b_field", "a_field" from "a_table"', fields=db.a_table
2268            )
2269            self.assertTrue(
2270                all(x in rtn[0].keys() for x in ["id", "b_field", "a_field"])
2271            )
2272            self.assertEqual(rtn[0].b_field, "bb1")
2273
2274            rtn = db.executesql(
2275                'select "id", "b_field", "a_field" from "a_table"',
2276                fields=db.a_table,
2277                colnames=["a_table.id", "a_table.b_field", "a_table.a_field"],
2278            )
2279
2280            self.assertTrue(
2281                all(x in rtn[0].keys() for x in ["id", "b_field", "a_field"])
2282            )
2283            self.assertEqual(rtn[0].b_field, "bb1")
2284            rtn = db.executesql(
2285                'select COUNT(*) from "a_table"',
2286                fields=[db.a_table.id.count()],
2287                colnames=["foo"],
2288            )
2289            self.assertEqual(rtn[0].foo, 1)
2290
2291        if not IS_ORACLE:
2292            db = self.connect(DEFAULT_URI, entity_quoting=False)
2293            db.define_table(
2294                "a_table", Field("b_field"), Field("a_field"),
2295            )
2296            db.a_table.insert(a_field="aa1", b_field="bb1")
2297            rtn = db.executesql(
2298                "SELECT id, b_field, a_field FROM a_table", as_dict=True
2299            )
2300            self.assertEqual(rtn[0]["b_field"], "bb1")
2301            rtn = db.executesql(
2302                "SELECT id, b_field, a_field FROM a_table", as_ordered_dict=True
2303            )
2304            self.assertEqual(rtn[0]["b_field"], "bb1")
2305            self.assertEqual(rtn[0]["b_field"], "bb1")
2306            self.assertEqual(list(rtn[0].keys()), ["id", "b_field", "a_field"])
2307
2308            rtn = db.executesql(
2309                "select id, b_field, a_field from a_table", fields=db.a_table
2310            )
2311            self.assertTrue(
2312                all(x in rtn[0].keys() for x in ["id", "b_field", "a_field"])
2313            )
2314            self.assertEqual(rtn[0].b_field, "bb1")
2315
2316            rtn = db.executesql(
2317                "select id, b_field, a_field from a_table",
2318                fields=db.a_table,
2319                colnames=["a_table.id", "a_table.b_field", "a_table.a_field"],
2320            )
2321
2322            self.assertTrue(
2323                all(x in rtn[0].keys() for x in ["id", "b_field", "a_field"])
2324            )
2325            self.assertEqual(rtn[0].b_field, "bb1")
2326            rtn = db.executesql(
2327                "select COUNT(*) from a_table",
2328                fields=[db.a_table.id.count()],
2329                colnames=["foo"],
2330            )
2331            self.assertEqual(rtn[0].foo, 1)
2332
2333
2334class TestRNameTable(DALtest):
2335    # tests for highly experimental rname attribute
2336
2337    def testSelect(self):
2338        db = self.connect()
2339        rname = "a_very_complicated_tablename"
2340        if IS_ORACLE:
2341            # name size limitations
2342            rname = "a_complex_tablename"
2343        db.define_table("easy_name", Field("a_field"), rname=rname)
2344        rtn = db.easy_name.insert(a_field="a")
2345        self.assertEqual(rtn.id, 1)
2346        rtn = db(db.easy_name.a_field == "a").select()
2347        self.assertEqual(len(rtn), 1)
2348        self.assertEqual(rtn[0].id, 1)
2349        self.assertEqual(rtn[0].a_field, "a")
2350        db.easy_name.insert(a_field="b")
2351        rtn = db(db.easy_name.id > 0).delete()
2352        self.assertEqual(rtn, 2)
2353        rtn = db(db.easy_name.id > 0).count()
2354        self.assertEqual(rtn, 0)
2355        db.easy_name.insert(a_field="a")
2356        db.easy_name.insert(a_field="b")
2357        rtn = db(db.easy_name.id > 0).count()
2358        self.assertEqual(rtn, 2)
2359        rtn = db(db.easy_name.a_field == "a").update(a_field="c")
2360        rtn = db(db.easy_name.a_field == "c").count()
2361        self.assertEqual(rtn, 1)
2362        rtn = db(db.easy_name.a_field != "c").count()
2363        self.assertEqual(rtn, 1)
2364        avg = db.easy_name.id.avg()
2365        rtn = db(db.easy_name.id > 0).select(avg)
2366        self.assertEqual(rtn[0][avg], 3)
2367        rname = "this_is_the_person_table"
2368        db.define_table(
2369            "person", Field("name", default="Michael"), Field("uuid"), rname=rname
2370        )
2371        rname = "this_is_the_pet_table"
2372        db.define_table(
2373            "pet", Field("friend", "reference person"), Field("name"), rname=rname
2374        )
2375        michael = db.person.insert()  # default insert
2376        john = db.person.insert(name="John")
2377        luke = db.person.insert(name="Luke")
2378
2379        # michael owns Phippo
2380        phippo = db.pet.insert(friend=michael, name="Phippo")
2381        # john owns Dunstin and Gertie
2382        dunstin = db.pet.insert(friend=john, name="Dunstin")
2383        gertie = db.pet.insert(friend=john, name="Gertie")
2384
2385        rtn = db(db.person.id == db.pet.friend).select(orderby=db.person.id | db.pet.id)
2386        self.assertEqual(len(rtn), 3)
2387        self.assertEqual(rtn[0].person.id, michael)
2388        self.assertEqual(rtn[0].person.name, "Michael")
2389        self.assertEqual(rtn[0].pet.id, phippo)
2390        self.assertEqual(rtn[0].pet.name, "Phippo")
2391        self.assertEqual(rtn[1].person.id, john)
2392        self.assertEqual(rtn[1].person.name, "John")
2393        self.assertEqual(rtn[1].pet.name, "Dunstin")
2394        self.assertEqual(rtn[2].pet.name, "Gertie")
2395        # fetch owners, eventually with pet
2396        # main point is retrieving Luke with no pets
2397        rtn = db(db.person.id > 0).select(
2398            orderby=db.person.id | db.pet.id,
2399            left=db.pet.on(db.person.id == db.pet.friend),
2400        )
2401        self.assertEqual(rtn[0].person.id, michael)
2402        self.assertEqual(rtn[0].person.name, "Michael")
2403        self.assertEqual(rtn[0].pet.id, phippo)
2404        self.assertEqual(rtn[0].pet.name, "Phippo")
2405        self.assertEqual(rtn[3].person.name, "Luke")
2406        self.assertEqual(rtn[3].person.id, luke)
2407        self.assertEqual(rtn[3].pet.name, None)
2408        # lets test a subquery
2409        subq = db(db.pet.name == "Gertie")._select(db.pet.friend)
2410        rtn = db(db.person.id.belongs(subq)).select()
2411        self.assertEqual(rtn[0].id, 2)
2412        self.assertEqual(rtn[0]("person.name"), "John")
2413        # as dict
2414        rtn = db(db.person.id > 0).select().as_dict()
2415        self.assertEqual(rtn[1]["name"], "Michael")
2416        # as list
2417        rtn = db(db.person.id > 0).select().as_list()
2418        self.assertEqual(rtn[0]["name"], "Michael")
2419        # isempty
2420        rtn = db(db.person.id > 0).isempty()
2421        self.assertEqual(rtn, False)
2422        # join argument
2423        rtn = db(db.person).select(
2424            orderby=db.person.id | db.pet.id,
2425            join=db.pet.on(db.person.id == db.pet.friend),
2426        )
2427        self.assertEqual(len(rtn), 3)
2428        self.assertEqual(rtn[0].person.id, michael)
2429        self.assertEqual(rtn[0].person.name, "Michael")
2430        self.assertEqual(rtn[0].pet.id, phippo)
2431        self.assertEqual(rtn[0].pet.name, "Phippo")
2432        self.assertEqual(rtn[1].person.id, john)
2433        self.assertEqual(rtn[1].person.name, "John")
2434        self.assertEqual(rtn[1].pet.name, "Dunstin")
2435        self.assertEqual(rtn[2].pet.name, "Gertie")
2436
2437        # aliases
2438        if DEFAULT_URI.startswith("mssql"):
2439            # multiple cascade gotcha
2440            for key in ["reference", "reference FK"]:
2441                db._adapter.types[key] = db._adapter.types[key].replace(
2442                    "%(on_delete_action)s", "NO ACTION"
2443                )
2444        rname = "the_cubs"
2445        db.define_table(
2446            "pet_farm",
2447            Field("name"),
2448            Field("father", "reference pet_farm"),
2449            Field("mother", "reference pet_farm"),
2450            rname=rname,
2451        )
2452
2453        minali = db.pet_farm.insert(name="Minali")
2454        osbert = db.pet_farm.insert(name="Osbert")
2455        # they had a cub
2456        selina = db.pet_farm.insert(name="Selina", father=osbert, mother=minali)
2457
2458        father = db.pet_farm.with_alias("father")
2459        mother = db.pet_farm.with_alias("mother")
2460
2461        # fetch pets with relatives
2462        rtn = db().select(
2463            db.pet_farm.name,
2464            father.name,
2465            mother.name,
2466            left=[
2467                father.on(father.id == db.pet_farm.father),
2468                mother.on(mother.id == db.pet_farm.mother),
2469            ],
2470            orderby=db.pet_farm.id,
2471        )
2472
2473        self.assertEqual(len(rtn), 3)
2474        self.assertEqual(rtn[0].pet_farm.name, "Minali")
2475        self.assertEqual(rtn[0].father.name, None)
2476        self.assertEqual(rtn[0].mother.name, None)
2477        self.assertEqual(rtn[1].pet_farm.name, "Osbert")
2478        self.assertEqual(rtn[2].pet_farm.name, "Selina")
2479        self.assertEqual(rtn[2].father.name, "Osbert")
2480        self.assertEqual(rtn[2].mother.name, "Minali")
2481
2482    def testJoin(self):
2483        db = self.connect()
2484        rname = "this_is_table_t1"
2485        rname2 = "this_is_table_t2"
2486        db.define_table("t1", Field("aa"), rname=rname)
2487        db.define_table("t2", Field("aa"), Field("b", db.t1), rname=rname2)
2488        i1 = db.t1.insert(aa="1")
2489        i2 = db.t1.insert(aa="2")
2490        i3 = db.t1.insert(aa="3")
2491        db.t2.insert(aa="4", b=i1)
2492        db.t2.insert(aa="5", b=i2)
2493        db.t2.insert(aa="6", b=i2)
2494        self.assertEqual(
2495            len(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)), 3
2496        )
2497        self.assertEqual(
2498            db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t1.aa, "2"
2499        )
2500        self.assertEqual(
2501            db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t2.aa, "6"
2502        )
2503        self.assertEqual(
2504            len(
2505                db().select(
2506                    db.t1.ALL,
2507                    db.t2.ALL,
2508                    left=db.t2.on(db.t1.id == db.t2.b),
2509                    orderby=db.t1.aa | db.t2.aa,
2510                )
2511            ),
2512            4,
2513        )
2514        self.assertEqual(
2515            db()
2516            .select(
2517                db.t1.ALL,
2518                db.t2.ALL,
2519                left=db.t2.on(db.t1.id == db.t2.b),
2520                orderby=db.t1.aa | db.t2.aa,
2521            )[2]
2522            .t1.aa,
2523            "2",
2524        )
2525        self.assertEqual(
2526            db()
2527            .select(
2528                db.t1.ALL,
2529                db.t2.ALL,
2530                left=db.t2.on(db.t1.id == db.t2.b),
2531                orderby=db.t1.aa | db.t2.aa,
2532            )[2]
2533            .t2.aa,
2534            "6",
2535        )
2536        self.assertEqual(
2537            db()
2538            .select(
2539                db.t1.ALL,
2540                db.t2.ALL,
2541                left=db.t2.on(db.t1.id == db.t2.b),
2542                orderby=db.t1.aa | db.t2.aa,
2543            )[3]
2544            .t1.aa,
2545            "3",
2546        )
2547        self.assertEqual(
2548            db()
2549            .select(
2550                db.t1.ALL,
2551                db.t2.ALL,
2552                left=db.t2.on(db.t1.id == db.t2.b),
2553                orderby=db.t1.aa | db.t2.aa,
2554            )[3]
2555            .t2.aa,
2556            None,
2557        )
2558        self.assertEqual(
2559            len(
2560                db().select(
2561                    db.t1.aa,
2562                    db.t2.id.count(),
2563                    left=db.t2.on(db.t1.id == db.t2.b),
2564                    orderby=db.t1.aa,
2565                    groupby=db.t1.aa,
2566                )
2567            ),
2568            3,
2569        )
2570        self.assertEqual(
2571            db()
2572            .select(
2573                db.t1.aa,
2574                db.t2.id.count(),
2575                left=db.t2.on(db.t1.id == db.t2.b),
2576                orderby=db.t1.aa,
2577                groupby=db.t1.aa,
2578            )[0]
2579            ._extra[db.t2.id.count()],
2580            1,
2581        )
2582        self.assertEqual(
2583            db()
2584            .select(
2585                db.t1.aa,
2586                db.t2.id.count(),
2587                left=db.t2.on(db.t1.id == db.t2.b),
2588                orderby=db.t1.aa,
2589                groupby=db.t1.aa,
2590            )[1]
2591            ._extra[db.t2.id.count()],
2592            2,
2593        )
2594        self.assertEqual(
2595            db()
2596            .select(
2597                db.t1.aa,
2598                db.t2.id.count(),
2599                left=db.t2.on(db.t1.id == db.t2.b),
2600                orderby=db.t1.aa,
2601                groupby=db.t1.aa,
2602            )[2]
2603            ._extra[db.t2.id.count()],
2604            0,
2605        )
2606        db.t2.drop()
2607        db.t1.drop()
2608
2609        db.define_table("person", Field("name"), rname=rname)
2610        id = db.person.insert(name="max")
2611        self.assertEqual(id.name, "max")
2612        db.define_table(
2613            "dog", Field("name"), Field("ownerperson", "reference person"), rname=rname2
2614        )
2615        db.dog.insert(name="skipper", ownerperson=1)
2616        row = db(db.person.id == db.dog.ownerperson).select().first()
2617        self.assertEqual(row[db.person.name], "max")
2618        self.assertEqual(row["person.name"], "max")
2619        db.dog.drop()
2620        self.assertEqual(len(db.person._referenced_by), 0)
2621
2622
2623class TestRNameFields(DALtest):
2624    # tests for highly experimental rname attribute
2625    def testSelect(self):
2626        db = self.connect()
2627        rname = "a_very_complicated_fieldname"
2628        rname2 = "rrating_from_1_to_10"
2629        db.define_table(
2630            "easy_name",
2631            Field("a_field", rname=rname),
2632            Field("rating", "integer", rname=rname2, default=2),
2633        )
2634        rtn = db.easy_name.insert(a_field="a")
2635        self.assertEqual(rtn.id, 1)
2636        rtn = db(db.easy_name.a_field == "a").select()
2637        self.assertEqual(len(rtn), 1)
2638        self.assertEqual(rtn[0].id, 1)
2639        self.assertEqual(rtn[0].a_field, "a")
2640        db.easy_name.insert(a_field="b")
2641        rtn = db(db.easy_name.id > 0).delete()
2642        self.assertEqual(rtn, 2)
2643        rtn = db(db.easy_name.id > 0).count()
2644        self.assertEqual(rtn, 0)
2645        db.easy_name.insert(a_field="a")
2646        db.easy_name.insert(a_field="b")
2647        rtn = db(db.easy_name.id > 0).count()
2648        self.assertEqual(rtn, 2)
2649        rtn = db(db.easy_name.a_field == "a").update(a_field="c")
2650        rtn = db(db.easy_name.a_field == "c").count()
2651        self.assertEqual(rtn, 1)
2652        rtn = db(db.easy_name.a_field != "c").count()
2653        self.assertEqual(rtn, 1)
2654        avg = db.easy_name.id.avg()
2655        rtn = db(db.easy_name.id > 0).select(avg)
2656        self.assertEqual(rtn[0][avg], 3)
2657
2658        avg = db.easy_name.rating.avg()
2659        rtn = db(db.easy_name.id > 0).select(avg)
2660        self.assertEqual(rtn[0][avg], 2)
2661
2662        rname = "this_is_the_person_name"
2663        db.define_table(
2664            "person",
2665            Field("id", type="id", rname="fooid"),
2666            Field("name", default="Michael", rname=rname),
2667            Field("uuid"),
2668        )
2669        rname = "this_is_the_pet_name"
2670        db.define_table(
2671            "pet", Field("friend", "reference person"), Field("name", rname=rname)
2672        )
2673        michael = db.person.insert()  # default insert
2674        john = db.person.insert(name="John")
2675        luke = db.person.insert(name="Luke")
2676
2677        # michael owns Phippo
2678        phippo = db.pet.insert(friend=michael, name="Phippo")
2679        # john owns Dunstin and Gertie
2680        dunstin = db.pet.insert(friend=john, name="Dunstin")
2681        gertie = db.pet.insert(friend=john, name="Gertie")
2682
2683        rtn = db(db.person.id == db.pet.friend).select(orderby=db.person.id | db.pet.id)
2684        self.assertEqual(len(rtn), 3)
2685        self.assertEqual(rtn[0].person.id, michael)
2686        self.assertEqual(rtn[0].person.name, "Michael")
2687        self.assertEqual(rtn[0].pet.id, phippo)
2688        self.assertEqual(rtn[0].pet.name, "Phippo")
2689        self.assertEqual(rtn[1].person.id, john)
2690        self.assertEqual(rtn[1].person.name, "John")
2691        self.assertEqual(rtn[1].pet.name, "Dunstin")
2692        self.assertEqual(rtn[2].pet.name, "Gertie")
2693        # fetch owners, eventually with pet
2694        # main point is retrieving Luke with no pets
2695        rtn = db(db.person.id > 0).select(
2696            orderby=db.person.id | db.pet.id,
2697            left=db.pet.on(db.person.id == db.pet.friend),
2698        )
2699        self.assertEqual(rtn[0].person.id, michael)
2700        self.assertEqual(rtn[0].person.name, "Michael")
2701        self.assertEqual(rtn[0].pet.id, phippo)
2702        self.assertEqual(rtn[0].pet.name, "Phippo")
2703        self.assertEqual(rtn[3].person.name, "Luke")
2704        self.assertEqual(rtn[3].person.id, luke)
2705        self.assertEqual(rtn[3].pet.name, None)
2706        # lets test a subquery
2707        subq = db(db.pet.name == "Gertie")._select(db.pet.friend)
2708        rtn = db(db.person.id.belongs(subq)).select()
2709        self.assertEqual(rtn[0].id, 2)
2710        self.assertEqual(rtn[0]("person.name"), "John")
2711        # as dict
2712        rtn = db(db.person.id > 0).select().as_dict()
2713        self.assertEqual(rtn[1]["name"], "Michael")
2714        # as list
2715        rtn = db(db.person.id > 0).select().as_list()
2716        self.assertEqual(rtn[0]["name"], "Michael")
2717        # isempty
2718        rtn = db(db.person.id > 0).isempty()
2719        self.assertEqual(rtn, False)
2720        # join argument
2721        rtn = db(db.person).select(
2722            orderby=db.person.id | db.pet.id,
2723            join=db.pet.on(db.person.id == db.pet.friend),
2724        )
2725        self.assertEqual(len(rtn), 3)
2726        self.assertEqual(rtn[0].person.id, michael)
2727        self.assertEqual(rtn[0].person.name, "Michael")
2728        self.assertEqual(rtn[0].pet.id, phippo)
2729        self.assertEqual(rtn[0].pet.name, "Phippo")
2730        self.assertEqual(rtn[1].person.id, john)
2731        self.assertEqual(rtn[1].person.name, "John")
2732        self.assertEqual(rtn[1].pet.name, "Dunstin")
2733        self.assertEqual(rtn[2].pet.name, "Gertie")
2734
2735        # aliases
2736        rname = "the_cub_name"
2737        if DEFAULT_URI.startswith("mssql"):
2738            # multiple cascade gotcha
2739            for key in ["reference", "reference FK"]:
2740                db._adapter.types[key] = db._adapter.types[key].replace(
2741                    "%(on_delete_action)s", "NO ACTION"
2742                )
2743        db.define_table(
2744            "pet_farm",
2745            Field("name", rname=rname),
2746            Field("father", "reference pet_farm"),
2747            Field("mother", "reference pet_farm"),
2748        )
2749
2750        minali = db.pet_farm.insert(name="Minali")
2751        osbert = db.pet_farm.insert(name="Osbert")
2752        # they had a cub
2753        selina = db.pet_farm.insert(name="Selina", father=osbert, mother=minali)
2754
2755        father = db.pet_farm.with_alias("father")
2756        mother = db.pet_farm.with_alias("mother")
2757
2758        # fetch pets with relatives
2759        rtn = db().select(
2760            db.pet_farm.name,
2761            father.name,
2762            mother.name,
2763            left=[
2764                father.on(father.id == db.pet_farm.father),
2765                mother.on(mother.id == db.pet_farm.mother),
2766            ],
2767            orderby=db.pet_farm.id,
2768        )
2769
2770        self.assertEqual(len(rtn), 3)
2771        self.assertEqual(rtn[0].pet_farm.name, "Minali")
2772        self.assertEqual(rtn[0].father.name, None)
2773        self.assertEqual(rtn[0].mother.name, None)
2774        self.assertEqual(rtn[1].pet_farm.name, "Osbert")
2775        self.assertEqual(rtn[2].pet_farm.name, "Selina")
2776        self.assertEqual(rtn[2].father.name, "Osbert")
2777        self.assertEqual(rtn[2].mother.name, "Minali")
2778
2779    def testRun(self):
2780        db = self.connect()
2781        rname = "a_very_complicated_fieldname"
2782        for ft in ["string", "text", "password", "upload", "blob"]:
2783            db.define_table("tt", Field("aa", ft, default="", rname=rname))
2784            self.assertEqual(db.tt.insert(aa="x"), 1)
2785            if not IS_ORACLE:
2786                self.assertEqual(db().select(db.tt.aa)[0].aa, "x")
2787            db.tt.drop()
2788        db.define_table("tt", Field("aa", "integer", default=1, rname=rname))
2789        self.assertEqual(db.tt.insert(aa=3), 1)
2790        self.assertEqual(db().select(db.tt.aa)[0].aa, 3)
2791        db.tt.drop()
2792        db.define_table("tt", Field("aa", "double", default=1, rname=rname))
2793        self.assertEqual(db.tt.insert(aa=3.1), 1)
2794        self.assertEqual(db().select(db.tt.aa)[0].aa, 3.1)
2795        db.tt.drop()
2796        db.define_table("tt", Field("aa", "boolean", default=True, rname=rname))
2797        self.assertEqual(db.tt.insert(aa=True), 1)
2798        self.assertEqual(db().select(db.tt.aa)[0].aa, True)
2799        db.tt.drop()
2800        if not IS_ORACLE:
2801            db.define_table("tt", Field("aa", "json", default={}, rname=rname))
2802            self.assertEqual(db.tt.insert(aa={}), 1)
2803            self.assertEqual(db().select(db.tt.aa)[0].aa, {})
2804            db.tt.drop()
2805        db.define_table(
2806            "tt", Field("aa", "date", default=datetime.date.today(), rname=rname)
2807        )
2808        t0 = datetime.date.today()
2809        self.assertEqual(db.tt.insert(aa=t0), 1)
2810        self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
2811        db.tt.drop()
2812        db.define_table(
2813            "tt",
2814            Field("aa", "datetime", default=datetime.datetime.today(), rname=rname),
2815        )
2816        t0 = datetime.datetime(1971, 12, 21, 10, 30, 55, 0,)
2817        self.assertEqual(db.tt.insert(aa=t0), 1)
2818        self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
2819
2820        ## Row APIs
2821        row = db().select(db.tt.aa)[0]
2822        self.assertEqual(db.tt[1].aa, t0)
2823        self.assertEqual(db.tt["aa"], db.tt.aa)
2824        self.assertEqual(db.tt(1).aa, t0)
2825        self.assertTrue(db.tt(1, aa=None) == None)
2826        self.assertFalse(db.tt(1, aa=t0) == None)
2827        self.assertEqual(row.aa, t0)
2828        self.assertEqual(row["aa"], t0)
2829        self.assertEqual(row["tt.aa"], t0)
2830        self.assertEqual(row("tt.aa"), t0)
2831        self.assertTrue("aa" in row)
2832        self.assertTrue("pydal" not in row)
2833        self.assertTrue(hasattr(row, "aa"))
2834        self.assertFalse(hasattr(row, "pydal"))
2835
2836        ## Lazy and Virtual fields
2837        db.tt.b = Field.Virtual(lambda row: row.tt.aa)
2838        db.tt.c = Field.Lazy(lambda row: row.tt.aa)
2839        row = db().select(db.tt.aa)[0]
2840        self.assertEqual(row.b, t0)
2841        self.assertEqual(row.c(), t0)
2842
2843        db.tt.drop()
2844        db.define_table("tt", Field("aa", "time", default="11:30", rname=rname))
2845        t0 = datetime.time(10, 30, 55)
2846        self.assertEqual(db.tt.insert(aa=t0), 1)
2847        self.assertEqual(db().select(db.tt.aa)[0].aa, t0)
2848
2849    def testInsert(self):
2850        db = self.connect()
2851        rname = "a_very_complicated_fieldname"
2852        db.define_table("tt", Field("aa", rname=rname))
2853        self.assertEqual(db.tt.insert(aa="1"), 1)
2854        self.assertEqual(db.tt.insert(aa="1"), 2)
2855        self.assertEqual(db.tt.insert(aa="1"), 3)
2856        self.assertEqual(db(db.tt.aa == "1").count(), 3)
2857        self.assertEqual(db(db.tt.aa == "2").isempty(), True)
2858        self.assertEqual(db(db.tt.aa == "1").update(aa="2"), 3)
2859        self.assertEqual(db(db.tt.aa == "2").count(), 3)
2860        self.assertEqual(db(db.tt.aa == "2").isempty(), False)
2861        self.assertEqual(db(db.tt.aa == "2").delete(), 3)
2862        self.assertEqual(db(db.tt.aa == "2").isempty(), True)
2863
2864    def testJoin(self):
2865        db = self.connect()
2866        rname = "this_is_field_aa"
2867        rname2 = "this_is_field_b"
2868        db.define_table("t1", Field("aa", rname=rname))
2869        db.define_table("t2", Field("aa", rname=rname), Field("b", db.t1, rname=rname2))
2870        i1 = db.t1.insert(aa="1")
2871        i2 = db.t1.insert(aa="2")
2872        i3 = db.t1.insert(aa="3")
2873        db.t2.insert(aa="4", b=i1)
2874        db.t2.insert(aa="5", b=i2)
2875        db.t2.insert(aa="6", b=i2)
2876        self.assertEqual(
2877            len(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)), 3
2878        )
2879        self.assertEqual(
2880            db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t1.aa, "2"
2881        )
2882        self.assertEqual(
2883            db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t2.aa, "6"
2884        )
2885        self.assertEqual(
2886            len(
2887                db().select(
2888                    db.t1.ALL,
2889                    db.t2.ALL,
2890                    left=db.t2.on(db.t1.id == db.t2.b),
2891                    orderby=db.t1.aa | db.t2.aa,
2892                )
2893            ),
2894            4,
2895        )
2896        self.assertEqual(
2897            db()
2898            .select(
2899                db.t1.ALL,
2900                db.t2.ALL,
2901                left=db.t2.on(db.t1.id == db.t2.b),
2902                orderby=db.t1.aa | db.t2.aa,
2903            )[2]
2904            .t1.aa,
2905            "2",
2906        )
2907        self.assertEqual(
2908            db()
2909            .select(
2910                db.t1.ALL,
2911                db.t2.ALL,
2912                left=db.t2.on(db.t1.id == db.t2.b),
2913                orderby=db.t1.aa | db.t2.aa,
2914            )[2]
2915            .t2.aa,
2916            "6",
2917        )
2918        self.assertEqual(
2919            db()
2920            .select(
2921                db.t1.ALL,
2922                db.t2.ALL,
2923                left=db.t2.on(db.t1.id == db.t2.b),
2924                orderby=db.t1.aa | db.t2.aa,
2925            )[3]
2926            .t1.aa,
2927            "3",
2928        )
2929        self.assertEqual(
2930            db()
2931            .select(
2932                db.t1.ALL,
2933                db.t2.ALL,
2934                left=db.t2.on(db.t1.id == db.t2.b),
2935                orderby=db.t1.aa | db.t2.aa,
2936            )[3]
2937            .t2.aa,
2938            None,
2939        )
2940        self.assertEqual(
2941            len(
2942                db().select(
2943                    db.t1.aa,
2944                    db.t2.id.count(),
2945                    left=db.t2.on(db.t1.id == db.t2.b),
2946                    orderby=db.t1.aa,
2947                    groupby=db.t1.aa,
2948                )
2949            ),
2950            3,
2951        )
2952        self.assertEqual(
2953            db()
2954            .select(
2955                db.t1.aa,
2956                db.t2.id.count(),
2957                left=db.t2.on(db.t1.id == db.t2.b),
2958                orderby=db.t1.aa,
2959                groupby=db.t1.aa,
2960            )[0]
2961            ._extra[db.t2.id.count()],
2962            1,
2963        )
2964        self.assertEqual(
2965            db()
2966            .select(
2967                db.t1.aa,
2968                db.t2.id.count(),
2969                left=db.t2.on(db.t1.id == db.t2.b),
2970                orderby=db.t1.aa,
2971                groupby=db.t1.aa,
2972            )[1]
2973            ._extra[db.t2.id.count()],
2974            2,
2975        )
2976        self.assertEqual(
2977            db()
2978            .select(
2979                db.t1.aa,
2980                db.t2.id.count(),
2981                left=db.t2.on(db.t1.id == db.t2.b),
2982                orderby=db.t1.aa,
2983                groupby=db.t1.aa,
2984            )[2]
2985            ._extra[db.t2.id.count()],
2986            0,
2987        )
2988        db.t2.drop()
2989        db.t1.drop()
2990
2991        db.define_table("person", Field("name", rname=rname))
2992        id = db.person.insert(name="max")
2993        self.assertEqual(id.name, "max")
2994        db.define_table(
2995            "dog",
2996            Field("name", rname=rname),
2997            Field("ownerperson", "reference person", rname=rname2),
2998        )
2999        db.dog.insert(name="skipper", ownerperson=1)
3000        row = db(db.person.id == db.dog.ownerperson).select().first()
3001        self.assertEqual(row[db.person.name], "max")
3002        self.assertEqual(row["person.name"], "max")
3003        db.dog.drop()
3004        self.assertEqual(len(db.person._referenced_by), 0)
3005
3006    def testTFK(self):
3007        db = self.connect()
3008        if "reference TFK" not in db._adapter.types:
3009            self.skipTest("Adapter does not support TFK references")
3010        db.define_table(
3011            "t1",
3012            Field("id1", type="string", length=1, rname="foo1"),
3013            Field("id2", type="integer", rname="foo2"),
3014            Field("val", type="integer"),
3015            primarykey=["id1", "id2"],
3016        )
3017        db.define_table(
3018            "t2",
3019            Field("ref1", type=db.t1.id1, rname="bar1"),
3020            Field("ref2", type=db.t1.id2, rname="bar2"),
3021        )
3022        db.t1.insert(id1="a", id2=1, val=10)
3023        db.t1.insert(id1="a", id2=2, val=30)
3024        db.t2.insert(ref1="a", ref2=1)
3025        query = (db.t1.id1 == db.t2.ref1) & (db.t1.id2 == db.t2.ref2)
3026        result = db(query).select(db.t1.ALL)
3027        self.assertEqual(len(result), 1)
3028        self.assertEqual(result[0]["id1"], "a")
3029        self.assertEqual(result[0]["id2"], 1)
3030        self.assertEqual(result[0]["val"], 10)
3031
3032
3033class TestQuoting(DALtest):
3034
3035    # tests for case sensitivity
3036    def testCase(self):
3037        db = self.connect(ignore_field_case=False, entity_quoting=True)
3038        if DEFAULT_URI.startswith("mssql"):
3039            # multiple cascade gotcha
3040            for key in ["reference", "reference FK"]:
3041                db._adapter.types[key] = db._adapter.types[key].replace(
3042                    "%(on_delete_action)s", "NO ACTION"
3043                )
3044
3045        t0 = db.define_table("t0", Field("f", "string"))
3046        t1 = db.define_table("b", Field("B", t0), Field("words", "text"))
3047
3048        blather = "blah blah and so"
3049        t0[None] = {"f": "content"}
3050        t1[None] = {"B": int(t0[1]["id"]), "words": blather}
3051
3052        r = db(db.t0.id == db.b.B).select()
3053
3054        self.assertEqual(r[0].b.words, blather)
3055
3056        t1.drop()
3057        t0.drop()
3058
3059        # test field case
3060        try:
3061            t0 = db.define_table("table_is_a_test", Field("a_a"), Field("a_A"))
3062        except Exception as e:
3063            # some db does not support case sensitive field names mysql is one of them.
3064            if DEFAULT_URI.startswith("mysql:") or DEFAULT_URI.startswith("sqlite:"):
3065                db.rollback()
3066                return
3067            if "Column names in each table must be unique" in e.args[1]:
3068                db.rollback()
3069                return
3070            raise e
3071
3072        t0[None] = dict(a_a="a_a", a_A="a_A")
3073
3074        self.assertEqual(t0[1].a_a, "a_a")
3075        self.assertEqual(t0[1].a_A, "a_A")
3076
3077    def testPKFK(self):
3078
3079        # test primary keys
3080
3081        db = self.connect(ignore_field_case=False)
3082        if DEFAULT_URI.startswith("mssql"):
3083            # multiple cascade gotcha
3084            for key in ["reference", "reference FK"]:
3085                db._adapter.types[key] = db._adapter.types[key].replace(
3086                    "%(on_delete_action)s", "NO ACTION"
3087                )
3088        # test table without surrogate key. Length must is limited to
3089        # 100 because of MySQL limitations: it cannot handle more than
3090        # 767 bytes in unique keys.
3091
3092        t0 = db.define_table("t0", Field("Code", length=100), primarykey=["Code"])
3093        t2 = db.define_table("t2", Field("f"), Field("t0_Code", "reference t0"))
3094        t3 = db.define_table(
3095            "t3", Field("f", length=100), Field("t0_Code", t0.Code), primarykey=["f"]
3096        )
3097        t4 = db.define_table(
3098            "t4", Field("f", length=100), Field("t0", t0), primarykey=["f"]
3099        )
3100
3101        try:
3102            t5 = db.define_table(
3103                "t5",
3104                Field("f", length=100),
3105                Field("t0", "reference no_table_wrong_reference"),
3106                primarykey=["f"],
3107            )
3108        except Exception as e:
3109            self.assertTrue(isinstance(e, KeyError))
3110
3111        if DEFAULT_URI.startswith("mssql"):
3112            # there's no drop cascade in mssql
3113            t3.drop()
3114            t4.drop()
3115            t2.drop()
3116            t0.drop()
3117        else:
3118            t0.drop("cascade")
3119            t2.drop()
3120            t3.drop()
3121            t4.drop()
3122
3123    def testPKFK2(self):
3124
3125        # test reference to reference
3126
3127        db = self.connect(ignore_field_case=False)
3128        if DEFAULT_URI.startswith("mssql"):
3129            # multiple cascade gotcha
3130            for key in ["reference", "reference FK"]:
3131                db._adapter.types[key] = db._adapter.types[key].replace(
3132                    "%(on_delete_action)s", "NO ACTION"
3133                )
3134
3135        t0 = db.define_table("object_", Field("id", "id"))
3136        t1 = db.define_table(
3137            "part", Field("id", "reference object_"), primarykey=["id"]
3138        )
3139        t2 = db.define_table(
3140            "part_rev",
3141            Field("id", "reference object_"),
3142            Field("part", "reference part"),
3143            Field("rev", "integer"),
3144            primarykey=["id"],
3145        )
3146        id = db.object_.insert()
3147        db.part.insert(id=id)
3148        id_rev = db.object_.insert()
3149        db.part_rev.insert(id=id_rev, part=id, rev=0)
3150        result = db(db.part_rev.part == db.part.id).select()
3151        self.assertEqual(len(result), 1)
3152        self.assertEqual(result[0]["part_rev.id"], id_rev)
3153        self.assertEqual(result[0]["part_rev.part"], id)
3154
3155        if DEFAULT_URI.startswith(("mssql", "sqlite")):
3156            # there's no drop cascade in mssql and it seems there is some problem in sqlite
3157            t2.drop()
3158            t1.drop()
3159            t0.drop()
3160        else:
3161            t0.drop("cascade")
3162            t1.drop("cascade")
3163            t2.drop()
3164
3165
3166class TestTableAndFieldCase(unittest.TestCase):
3167    """
3168    at the Python level we should not allow db.C and db.c because of .table conflicts on windows
3169    but it should be possible to map two different names into distinct tables "c" and "C" at the Python level
3170    By default Python models names should be mapped into lower case table names and assume case insensitivity.
3171    """
3172
3173    def testme(self):
3174        return
3175
3176
3177class TestQuotesByDefault(unittest.TestCase):
3178    """
3179    all default tables names should be quoted unless an explicit mapping has been given for a table.
3180    """
3181
3182    def testme(self):
3183        return
3184
3185class TestGis(DALtest):
3186
3187    @unittest.skipIf(True, "WIP")
3188    def testGeometry(self):
3189        from pydal import geoPoint, geoLine, geoPolygon
3190
3191        if not IS_POSTGRESQL:
3192            return
3193        db = self.connect()
3194        t0 = db.define_table("t0", Field("point", "geometry()"))
3195        t1 = db.define_table("t1", Field("line", "geometry(public, 4326, 2)"))
3196        t2 = db.define_table("t2", Field("polygon", "geometry(public, 4326, 2)"))
3197        t0.insert(point=geoPoint(1, 1))
3198        text = (
3199            db(db.t0.id)
3200            .select(db.t0.point.st_astext())
3201            .first()[db.t0.point.st_astext()]
3202        )
3203        self.assertEqual(text, "POINT(1 1)")
3204        t1.insert(line=geoLine((1, 1), (2, 2)))
3205        text = (
3206            db(db.t1.id).select(db.t1.line.st_astext()).first()[db.t1.line.st_astext()]
3207        )
3208        self.assertEqual(text, "LINESTRING(1 1,2 2)")
3209        t2.insert(polygon=geoPolygon((0, 0), (2, 0), (2, 2), (0, 2), (0, 0)))
3210        text = (
3211            db(db.t2.id)
3212            .select(db.t2.polygon.st_astext())
3213            .first()[db.t2.polygon.st_astext()]
3214        )
3215        self.assertEqual(text, "POLYGON((0 0,2 0,2 2,0 2,0 0))")
3216        query = t0.point.st_intersects(geoLine((0, 0), (2, 2)))
3217        output = db(query).select(db.t0.point).first()[db.t0.point]
3218        self.assertEqual(output, "POINT(1 1)")
3219        query = t2.polygon.st_contains(geoPoint(1, 1))
3220        n = db(query).count()
3221        self.assertEqual(n, 1)
3222        x = t0.point.st_x()
3223        y = t0.point.st_y()
3224        point = db(t0.id).select(x, y).first()
3225        self.assertEqual(point[x], 1)
3226        self.assertEqual(point[y], 1)
3227
3228    @unittest.skipIf(True, "WIP")
3229    def testGeometryCase(self):
3230        from pydal import geoPoint, geoLine, geoPolygon
3231
3232        if not IS_POSTGRESQL:
3233            return
3234        db = self.connect(ignore_field_case=False)
3235        t0 = db.define_table(
3236            "t0", Field("point", "geometry()"), Field("Point", "geometry()")
3237        )
3238        t0.insert(point=geoPoint(1, 1))
3239        t0.insert(Point=geoPoint(2, 2))
3240
3241    @unittest.skipIf(True, "WIP")
3242    def testGisMigration(self):
3243        if not IS_POSTGRESQL:
3244            return
3245        for b in [True, False]:
3246            db = DAL(DEFAULT_URI, check_reserved=["all"], ignore_field_case=b)
3247            t0 = db.define_table(
3248                "t0",
3249                Field("Point", "geometry()"),
3250                Field("rname_point", "geometry()", rname="foo"),
3251            )
3252            db.commit()
3253            db.close()
3254            db = DAL(DEFAULT_URI, check_reserved=["all"], ignore_field_case=b)
3255            t0 = db.define_table("t0", Field("New_point", "geometry()"))
3256            t0.drop()
3257            db.commit()
3258            db.close()
3259
3260
3261@unittest.skipUnless(IS_POSTGRESQL, "Only implemented for postgres for now")
3262class TestJSON(DALtest):
3263    def testJSONExpressions(self):
3264        db = self.connect()
3265        if not hasattr(db._adapter.dialect, "json_key"):
3266            return
3267        tj = db.define_table("tj", Field("testjson", "json"))
3268        rec1 = tj.insert(
3269            testjson={
3270                u"a": {u"a1": 2, u"a0": 1},
3271                u"b": 3,
3272                u"c": {u"c0": {u"c01": [2, 4]}},
3273                u"str": "foo",
3274            }
3275        )
3276        rec2 = tj.insert(
3277            testjson={
3278                u"a": {u"a1": 2, u"a0": 2},
3279                u"b": 4,
3280                u"c": {u"c0": {u"c01": [2, 3]}},
3281                u"str": "bar",
3282            }
3283        )
3284        rows = db(db.tj.testjson.json_key("a").json_key_value("a0") == 1).select()
3285        self.assertEqual(len(rows), 1)
3286        self.assertEqual(rows[0].id, rec1)
3287        rows = db(db.tj.testjson.json_path_value("{a, a1}") == 2).select()
3288        self.assertEqual(len(rows), 2)
3289        rows = db(db.tj.testjson.json_path_value("{a, a0}") == 2).select()
3290        self.assertEqual(len(rows), 1)
3291        self.assertEqual(rows[0].id, rec2)
3292        rows = db(db.tj.testjson.json_path_value(r"{str}") == "foo").select()
3293        self.assertEqual(len(rows), 1)
3294        rows = db(db.tj.testjson.json_contains('{"c": {"c0":{"c01": [2]}}}')).select()
3295        self.assertEqual(len(rows), 2)
3296        rows = db(db.tj.testjson.json_contains('{"c": {"c0":{"c01": [4]}}}')).select()
3297        self.assertEqual(len(rows), 1)
3298        rows = db(db.tj.id > 0).select(
3299            db.tj.testjson.json_path("{c, c0, c01, 0}").with_alias("first")
3300        )
3301        self.assertEqual(rows[0].first, 2)
3302        self.assertEqual(rows[1].first, 2)
3303
3304
3305class TestSQLCustomType(DALtest):
3306    def testRun(self):
3307        db = self.connect()
3308        from pydal.helpers.classes import SQLCustomType
3309
3310        native_double = "double"
3311        native_string = "string"
3312        if hasattr(db._adapter, "types"):
3313            native_double = db._adapter.types["double"]
3314            native_string = db._adapter.types["string"] % {"length": 256}
3315        basic_t = SQLCustomType(type="double", native=native_double)
3316        basic_t_str = SQLCustomType(type="string", native=native_string)
3317        t0 = db.define_table(
3318            "t0", Field("price", basic_t), Field("product", basic_t_str)
3319        )
3320        r_id = t0.insert(price=None, product=None)
3321        row = db(t0.id == r_id).select(t0.ALL).first()
3322        self.assertEqual(row["price"], None)
3323        self.assertEqual(row["product"], None)
3324        r_id = t0.insert(price=1.2, product="car")
3325        row = db(t0.id == r_id).select(t0.ALL).first()
3326        self.assertEqual(row["price"], 1.2)
3327        self.assertEqual(row["product"], "car")
3328        t0.drop()
3329
3330        import zlib
3331
3332        native = "text"
3333        if IS_ORACLE:
3334            native = "CLOB"
3335        compressed = SQLCustomType(
3336            type="text",
3337            native=native,
3338            encoder=(lambda x: zlib.compress(x or "", 1)),
3339            decoder=(lambda x: zlib.decompress(x)),
3340        )
3341        t1 = db.define_table("t0", Field("cdata", compressed))
3342        # r_id=t1.insert(cdata="car")
3343        # row=db(t1.id == r_id).select(t1.ALL).first()
3344        # self.assertEqual(row['cdata'], "'car'")
3345
3346
3347class TestLazy(DALtest):
3348    def testRun(self):
3349        db = self.connect(lazy_tables=True)
3350        t0 = db.define_table("t0", Field("name"))
3351        self.assertTrue(("t0" in db._LAZY_TABLES.keys()))
3352        db.t0.insert(name="1")
3353        self.assertFalse(("t0" in db._LAZY_TABLES.keys()))
3354
3355    def testLazyGetter(self):
3356        db = self.connect(check_reserved=None, lazy_tables=True)
3357        db.define_table("tt", Field("value", "integer"))
3358        db.define_table(
3359            "ttt", Field("value", "integer"), Field("tt_id", "reference tt"),
3360        )
3361        # Force table definition
3362        db.ttt.value.writable = False
3363        idd = db.tt.insert(value=0)
3364        db.ttt.insert(tt_id=idd)
3365
3366    def testRowNone(self):
3367        db = self.connect(check_reserved=None, lazy_tables=True)
3368        tt = db.define_table("tt", Field("value", "integer"))
3369        db.tt.insert(value=None)
3370        row = db(db.tt).select(db.tt.ALL).first()
3371        self.assertEqual(row.value, None)
3372        self.assertEqual(row[db.tt.value], None)
3373        self.assertEqual(row["tt.value"], None)
3374        self.assertEqual(row.get("tt.value"), None)
3375        self.assertEqual(row["value"], None)
3376        self.assertEqual(row.get("value"), None)
3377
3378    def testRowExtra(self):
3379        db = self.connect(check_reserved=None, lazy_tables=True)
3380        if IS_ORACLE:
3381            # lazy use is difficult in Oracle if using cased (non-upper) fields
3382            tt = db.define_table("tt", Field("VALUE", "integer"))
3383            db.tt.insert(VALUE=1)
3384        else:
3385            tt = db.define_table("tt", Field("value", "integer"))
3386            db.tt.insert(value=1)
3387        row = db(db.tt).select("value").first()
3388        self.assertEqual(row.value, 1)
3389
3390
3391class TestRedefine(unittest.TestCase):
3392    def testRun(self):
3393        db = DAL(DEFAULT_URI, check_reserved=["all"], lazy_tables=True, migrate=False)
3394        db.define_table("t_a", Field("code"))
3395        self.assertTrue("code" in db.t_a)
3396        self.assertTrue("code" in db["t_a"])
3397        db.define_table("t_a", Field("code_a"), redefine=True)
3398        self.assertFalse("code" in db.t_a)
3399        self.assertFalse("code" in db["t_a"])
3400        self.assertTrue("code_a" in db.t_a)
3401        self.assertTrue("code_a" in db["t_a"])
3402        db.close()
3403
3404
3405class TestUpdateInsert(DALtest):
3406    def testRun(self):
3407        db = self.connect()
3408        t0 = db.define_table("t0", Field("name"))
3409        i_id = t0.update_or_insert((t0.id == 1), name="web2py")
3410        u_id = t0.update_or_insert((t0.id == i_id), name="web2py2")
3411        self.assertTrue(i_id != None)
3412        self.assertTrue(u_id == None)
3413        self.assertTrue(db(t0).count() == 1)
3414        self.assertTrue(db(t0.name == "web2py").count() == 0)
3415        self.assertTrue(db(t0.name == "web2py2").count() == 1)
3416
3417
3418class TestBulkInsert(DALtest):
3419    def testRun(self):
3420        db = self.connect()
3421        t0 = db.define_table("t0", Field("name"))
3422        global ctr
3423        ctr = 0
3424
3425        def test_after_insert(i, r):
3426            self.assertIsInstance(i, OpRow)
3427            global ctr
3428            ctr += 1
3429            return True
3430
3431        t0._after_insert.append(test_after_insert)
3432        items = [{"name": "web2py_%s" % pos} for pos in range(0, 10, 1)]
3433        t0.bulk_insert(items)
3434        self.assertTrue(db(t0).count() == len(items))
3435        for pos in range(0, 10, 1):
3436            self.assertTrue(db(t0.name == "web2py_%s" % pos).count() == 1)
3437        self.assertTrue(ctr == len(items))
3438
3439
3440class TestRecordVersioning(DALtest):
3441    def testRun(self):
3442        db = self.connect()
3443        db.define_table(
3444            "t0",
3445            Field("name"),
3446            Field("is_active", writable=False, readable=False, default=True),
3447        )
3448        db.t0._enable_record_versioning(archive_name="t0_archive")
3449        self.assertTrue("t0_archive" in db)
3450        i_id = db.t0.insert(name="web2py1")
3451        db.t0.insert(name="web2py2")
3452        db(db.t0.name == "web2py2").delete()
3453        self.assertEqual(len(db(db.t0).select()), 1)
3454        self.assertEqual(db(db.t0).count(), 1)
3455        db(db.t0.id == i_id).update(name="web2py3")
3456        self.assertEqual(len(db(db.t0).select()), 1)
3457        self.assertEqual(db(db.t0).count(), 1)
3458        self.assertEqual(len(db(db.t0_archive).select()), 2)
3459        self.assertEqual(db(db.t0_archive).count(), 2)
3460
3461
3462@unittest.skipIf(IS_SQLITE or IS_NOSQL, "Skip if sqlite or NOSQL since no pools")
3463class TestConnection(unittest.TestCase):
3464    def testRun(self):
3465        # check connection is no longer active after close
3466        db = DAL(DEFAULT_URI, check_reserved=["all"])
3467        connection = db._adapter.connection
3468        db.close()
3469        if not IS_ORACLE:
3470            # newer Oracle versions no longer play well with explicit .close()
3471            self.assertRaises(Exception, connection.commit)
3472
3473        # check connection are reused with pool_size
3474        connections = set()
3475        for a in range(10):
3476            db2 = DAL(DEFAULT_URI, check_reserved=["all"], pool_size=5)
3477            c = db2._adapter.connection
3478            connections.add(c)
3479            db2.close()
3480        self.assertEqual(len(connections), 1)
3481        c = connections.pop()
3482        c.commit()
3483        c.close()
3484        # check correct use of pool_size
3485        dbs = []
3486        for a in range(10):
3487            db3 = DAL(DEFAULT_URI, check_reserved=["all"], pool_size=5)
3488            # make sure the connection is stablished
3489            db3._adapter.get_connection()
3490            dbs.append(db3)
3491        for db in dbs:
3492            db.close()
3493        self.assertEqual(len(db3._adapter.POOLS[DEFAULT_URI]), 5)
3494        for c in db3._adapter.POOLS[DEFAULT_URI]:
3495            c.close()
3496        db3._adapter.POOLS[DEFAULT_URI] = []
3497        # Clean close if a connection is broken (closed explicity)
3498        if not IS_ORACLE:
3499            for a in range(10):
3500                db4 = DAL(DEFAULT_URI, check_reserved=["all"], pool_size=5)
3501                db4._adapter.connection.close()
3502                db4.close()
3503            self.assertEqual(len(db4._adapter.POOLS[DEFAULT_URI]), 0)
3504
3505
3506class TestSerializers(DALtest):
3507    def testAsJson(self):
3508        db = self.connect()
3509        db.define_table("tt", Field("date_field", "datetime"))
3510        db.tt.insert(date_field=datetime.datetime.now())
3511        rows = db().select(db.tt.ALL)
3512        j = rows.as_json()
3513        import json  # standard library
3514
3515        json.loads(j)
3516
3517    def testSelectIterselect(self):
3518        db = self.connect()
3519        db.define_table("tt", Field("tt"))
3520        db.tt.insert(tt="pydal")
3521        methods = ["as_dict", "as_csv", "as_json", "as_xml", "as_list"]
3522        for method in methods:
3523            rows = db(db.tt).select()
3524            rowsI = db(db.tt).iterselect()
3525            self.assertEqual(
3526                getattr(rows, method)(), getattr(rowsI, method)(), "failed %s" % method
3527            )
3528
3529
3530class TestIterselect(DALtest):
3531    def testRun(self):
3532        db = self.connect()
3533        t0 = db.define_table("t0", Field("name"))
3534        names = ["web2py", "pydal", "Massimo"]
3535        for n in names:
3536            t0.insert(name=n)
3537
3538        rows = db(db.t0).select(orderby=db.t0.id)
3539        for pos, r in enumerate(rows):
3540            self.assertEqual(r.name, names[pos])
3541        # Testing basic iteration
3542        rows = db(db.t0).iterselect(orderby=db.t0.id)
3543        for pos, r in enumerate(rows):
3544            self.assertEqual(r.name, names[pos])
3545        # Testing IterRows.first before basic iteration
3546        rows = db(db.t0).iterselect(orderby=db.t0.id)
3547        self.assertEqual(rows.first().name, names[0])
3548        self.assertEqual(rows.first().name, names[0])
3549
3550        for pos, r in enumerate(rows):
3551            self.assertEqual(r.name, names[pos])
3552        # Testing IterRows.__nonzero__ before basic iteration
3553        rows = db(db.t0).iterselect(orderby=db.t0.id)
3554        if rows:
3555            for pos, r in enumerate(rows):
3556                self.assertEqual(r.name, names[pos])
3557
3558        # Empty IterRows
3559        rows = db(db.t0.name == "IterRows").iterselect(orderby=db.t0.id)
3560        self.assertEqual(bool(rows), False)
3561        for pos, r in enumerate(rows):
3562            self.assertEqual(r.name, names[pos])
3563
3564        # Testing IterRows.__getitem__
3565        rows = db(db.t0).iterselect(orderby=db.t0.id)
3566        self.assertEqual(rows[0].name, names[0])
3567        self.assertEqual(rows[1].name, names[1])
3568        # recall the same item
3569        self.assertEqual(rows[1].name, names[1])
3570        self.assertEqual(rows[2].name, names[2])
3571        self.assertRaises(IndexError, rows.__getitem__, 1)
3572
3573        # Testing IterRows.next()
3574        rows = db(db.t0).iterselect(orderby=db.t0.id)
3575        for n in names:
3576            self.assertEqual(next(rows).name, n)
3577        self.assertRaises(StopIteration, next, rows)
3578
3579        # Testing IterRows.compact
3580        rows = db(db.t0).iterselect(orderby=db.t0.id)
3581        rows.compact = False
3582        for n in names:
3583            self.assertEqual(next(rows).t0.name, n)
3584
3585    @unittest.skipIf(IS_MSSQL, "Skip mssql")
3586    def testMultiSelect(self):
3587        # Iterselect holds the cursors until all elemets have been evaluated
3588        # inner queries use new cursors
3589        db = self.connect()
3590        t0 = db.define_table("t0", Field("name"), Field("name_copy"))
3591        db(db.t0).delete()
3592        db.commit()
3593        names = ["web2py", "pydal", "Massimo"]
3594        for n in names:
3595            t0.insert(name=n)
3596        c = 0
3597        for r in db(db.t0).iterselect():
3598            db.t0.update_or_insert(db.t0.id == r.id, name_copy=r.name)
3599            c += 1
3600
3601        self.assertEqual(c, len(names), "The iterator is not looping over all elements")
3602        self.assertEqual(db(db.t0).count(), len(names))
3603        c = 0
3604        for x in db(db.t0).iterselect(orderby=db.t0.id):
3605            for y in db(db.t0).iterselect(orderby=db.t0.id):
3606                db.t0.update_or_insert(db.t0.id == x.id, name_copy=x.name)
3607                c += 1
3608
3609        self.assertEqual(c, len(names) * len(names))
3610        self.assertEqual(db(db.t0).count(), len(names))
3611        db._adapter.test_connection()
3612
3613    @unittest.skipIf(IS_SQLITE | IS_MSSQL, "Skip sqlite & ms sql")
3614    def testMultiSelectWithCommit(self):
3615        db = self.connect()
3616        t0 = db.define_table("t0", Field("nn", "integer"))
3617        for n in xrange(1, 100, 1):
3618            t0.insert(nn=n)
3619        db.commit()
3620        s = db.t0.nn.sum()
3621        tot = db(db.t0).select(s).first()[s]
3622        c = 0
3623        for r in db(db.t0).iterselect(db.t0.ALL):
3624            db.t0.update_or_insert(db.t0.id == r.id, nn=r.nn * 2)
3625            db.commit()
3626            c += 1
3627
3628        self.assertEqual(c, db(db.t0).count())
3629        self.assertEqual(tot * 2, db(db.t0).select(s).first()[s])
3630
3631        db._adapter.test_connection()
3632
3633
3634if __name__ == "__main__":
3635    unittest.main()
3636    tearDownModule()
Note: See TracBrowser for help on using the repository browser.