1 | import re |
---|
2 | from ._compat import unittest |
---|
3 | from ._adapt import DEFAULT_URI, IS_NOSQL, IS_IMAP, drop |
---|
4 | from pydal._compat import integer_types |
---|
5 | from pydal import DAL, Field |
---|
6 | |
---|
# Last entry of pydal's ``integer_types`` tuple, used below for isinstance
# checks on record ids (presumably ``long`` on Python 2 and ``int`` on
# Python 3 -- confirm against pydal._compat).
long = integer_types[-1]
---|
8 | |
---|
# Textual form of an integer: an optional leading sign followed by one or
# more digits. Note that ``$`` also matches just before a trailing newline.
regex_isint = re.compile(r"^[+-]?\d+$")
---|
10 | |
---|
11 | |
---|
def range_error_message(error_message, what_to_enter, minimum, maximum):
    """Build the error message for the number range validators.

    Args:
        error_message: Caller-supplied message template, or None to have a
            default "Enter <what_to_enter> ..." template generated from
            whichever bounds are set.
        what_to_enter: Phrase naming the expected input (e.g. "an integer");
            only used when the default template is generated.
        minimum: Lower bound, or None when there is none.
        maximum: Upper bound, or None when there is none.

    Returns:
        The template %-interpolated with a mapping that provides ``min`` and
        ``max``. A caller-supplied template is interpolated as well, so it
        may use ``%(min)g`` / ``%(max)g`` placeholders.
    """
    if error_message is None:
        # Keyed by (has lower bound, has upper bound).
        suffix_by_bounds = {
            (True, True): " between %(min)g and %(max)g",
            (True, False): " greater than or equal to %(min)g",
            (False, True): " less than or equal to %(max)g",
            (False, False): "",
        }
        bounds_key = (minimum is not None, maximum is not None)
        error_message = "Enter " + what_to_enter + suffix_by_bounds[bounds_key]

    # IS_INT_IN_RANGE treats an integer maximum as exclusive, so the message
    # shows the largest accepted value instead. The check is an exact type
    # match: values whose type is not listed in integer_types are left as is.
    if type(maximum) in integer_types:
        maximum = maximum - 1

    return error_message % {"min": minimum, "max": maximum}
---|
25 | |
---|
26 | |
---|
class IS_INT_IN_RANGE(object):
    """Validator accepting integers in the range ``minimum <= v < maximum``.

    Either bound may be None, which leaves that side of the range open.
    """

    def __init__(self, minimum=None, maximum=None, error_message=None):
        """Store the bounds (coerced with ``int``) and build the error message."""
        self.minimum = None if minimum is None else int(minimum)
        self.maximum = None if maximum is None else int(maximum)
        self.error_message = range_error_message(
            error_message, "an integer", self.minimum, self.maximum
        )

    def __call__(self, value, record_id=None):
        """Validate ``value``.

        Returns ``(int_value, None)`` when the text form of ``value`` is an
        integer inside the range, otherwise ``(value, error_message)`` with
        the input passed back unchanged. ``record_id`` is accepted but not
        used here.
        """
        if regex_isint.match(str(value)) is not None:
            number = int(value)
            too_small = self.minimum is not None and number < self.minimum
            # The upper bound is exclusive.
            too_large = self.maximum is not None and number >= self.maximum
            if not (too_small or too_large):
                return (number, None)
        return (value, self.error_message)
---|
43 | |
---|
44 | |
---|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
class TestValidateAndInsert(unittest.TestCase):
    """Check ``validate_and_insert`` against a range-validated integer field."""

    def testRun(self):
        db = DAL(DEFAULT_URI, check_reserved=["all"])
        db.define_table(
            "val_and_insert",
            Field("aa"),
            Field("bb", "integer", requires=IS_INT_IN_RANGE(1, 5)),
        )
        # Registered immediately so the table is dropped even when one of
        # the assertions below fails.
        self.addCleanup(drop, db.val_and_insert)

        rtn = db.val_and_insert.validate_and_insert(aa="test1", bb=2)
        if IS_NOSQL:
            self.assertIsInstance(rtn.id, long)
        else:
            self.assertEqual(rtn.id, 1)
        # errors should be empty
        self.assertEqual(len(rtn.errors.keys()), 0)

        # this insert won't pass: "a" is not an integer
        rtn = db.val_and_insert.validate_and_insert(bb="a")
        # the returned id should be None
        self.assertIsNone(rtn.id)
        # an error message should be in rtn.errors.bb
        self.assertIsNotNone(rtn.errors.bb)
---|
69 | |
---|
70 | |
---|
@unittest.skipIf(IS_IMAP, "TODO: IMAP test")
class TestValidateUpdateInsert(unittest.TestCase):
    """Check ``validate_and_update_or_insert`` on a range-validated field."""

    def testRun(self):
        db = DAL(DEFAULT_URI, check_reserved=["all"])
        t1 = db.define_table(
            "t1", Field("int_level", "integer", requires=IS_INT_IN_RANGE(1, 5))
        )
        # Registered immediately so the table is dropped even when one of
        # the assertions below fails.
        self.addCleanup(drop, db.t1)

        # All three calls use the same key query (int_level == 1); the final
        # counts below pin the expected outcome of each call.
        # Expected: nothing matches yet, so a row is stored.
        i_response = t1.validate_and_update_or_insert((t1.int_level == 1), int_level=1)
        # Expected: the stored row is changed to 2.
        u_response = t1.validate_and_update_or_insert((t1.int_level == 1), int_level=2)
        # Expected: 6 is outside the validator's range and is rejected.
        e_response = t1.validate_and_update_or_insert((t1.int_level == 1), int_level=6)

        self.assertIsNotNone(i_response.id)
        self.assertIsNotNone(u_response.id)
        # Asserted separately so a failure shows which condition broke.
        self.assertIsNone(e_response.id)
        self.assertNotEqual(len(e_response.errors.keys()), 0)

        # Exactly one row remains, and it carries the updated value.
        self.assertEqual(len(db(t1).select()), 1)
        self.assertEqual(db(t1).count(), 1)
        self.assertEqual(db(t1.int_level == 1).count(), 0)
        self.assertEqual(db(t1.int_level == 6).count(), 0)
        self.assertEqual(db(t1.int_level == 2).count(), 1)
---|