source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/packages/dal/tests/contribs.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 4.8 KB
Line 
1import os
2import threading
3import time
4from ._compat import unittest
5from ._adapt import IS_GAE
6from pydal._compat import to_bytes
7from pydal.contrib.portalocker import lock, unlock, read_locked, write_locked
8from pydal.contrib.portalocker import LockedFile, LOCK_EX
9
10
def tearDownModule():
    """Remove the scratch file ``test.txt`` once the module's tests are done.

    ``unittest`` treats a module-level function with this name as a
    module fixture and calls it after the tests in the module have run.
    The file is looked up relative to the current working directory and
    is only removed when it exists as a regular file.
    """
    if os.path.isfile("test.txt"):
        os.unlink("test.txt")
14
15
class testPortalocker(unittest.TestCase):
    """Tests for the portalocker helpers, run against a scratch ``test.txt``.

    The tests use ``LockedFile``, ``lock``/``unlock``, ``read_locked`` and
    ``write_locked`` on a file in the current working directory.  Several of
    them take a lock in the main thread, let a helper thread release it after
    a two-second sleep, and check that a second access from the main thread
    does not complete before those two seconds have passed.
    """

    def test_LockedFile(self):
        """Bytes written through one LockedFile are read back through another."""
        f = LockedFile("test.txt", mode="wb")
        f.write(to_bytes("test ok"))
        f.close()
        f = LockedFile("test.txt", mode="rb")
        content = f.read()
        # Close before asserting so a failed assertion does not leave the
        # file open for the tests that follow.
        f.close()
        self.assertEqual(content, to_bytes("test ok"))

    @unittest.skipIf(IS_GAE, "GAE has no locks")
    def test_openmultiple(self):
        """Ten threads append a ``start<TAB>end`` line each via LockedFile."""
        t0 = time.time()

        def worker1():
            # Note the thread's start time, open the file through
            # LockedFile, wait two seconds, then log start and end times.
            start = time.time()
            f1 = LockedFile("test.txt", mode="ab")
            time.sleep(2)
            f1.write(to_bytes("%s\t%s\n" % (start, time.time())))
            f1.close()

        # Begin with an empty file so only the workers' lines are present.
        f = LockedFile("test.txt", mode="wb")
        f.write(to_bytes(""))
        f.close()
        th = []
        for x in range(10):
            t1 = threading.Thread(target=worker1)
            th.append(t1)
            t1.start()
        for t in th:
            t.join()
        with open("test.txt") as g:
            content = g.read()

        results = [line.strip().split("\t") for line in content.split("\n") if line]
        # all started at more or less the same time
        for line in results:
            self.assertTrue(float(line[0]) - t0 < 1)
        # end - start is at least 2
        for line in results:
            self.assertTrue(float(line[1]) - float(line[0]) >= 2)
        # ends are not the same
        ends = [line[1] for line in results]
        self.assertEqual(len(set(ends)), len(ends))

    @unittest.skipIf(IS_GAE, "GAE has no locks")
    def test_lock_unlock(self):
        """read_locked waits until an exclusive lock is unlocked or closed."""

        def worker1(fh):
            time.sleep(2)
            unlock(fh)

        def worker2(fh):
            time.sleep(2)
            fh.close()

        f = open("test.txt", mode="wb")
        lock(f, LOCK_EX)
        f.write(to_bytes("test ok"))
        t1 = threading.Thread(target=worker1, args=(f,))
        # Take the timestamp before the worker starts sleeping: with
        # whole-second truncation, a timestamp taken afterwards can lose a
        # second when a second boundary falls in between.
        start = int(time.time())
        t1.start()
        content = read_locked("test.txt")
        end = int(time.time())
        t1.join()
        f.close()
        # it took at least 2 seconds to read
        # although nothing is there until .close()
        self.assertTrue(end - start >= 2)
        self.assertEqual(content, to_bytes(""))
        content = read_locked("test.txt")
        self.assertEqual(content, to_bytes("test ok"))

        f = LockedFile("test.txt", mode="wb")
        f.write(to_bytes("test ok"))
        t1 = threading.Thread(target=worker2, args=(f,))
        start = int(time.time())
        t1.start()
        content = read_locked("test.txt")
        end = int(time.time())
        t1.join()
        # it took at least 2 seconds to read
        # content is there because we called close()
        self.assertTrue(end - start >= 2)
        self.assertEqual(content, to_bytes("test ok"))

    @unittest.skipIf(IS_GAE, "GAE has no locks")
    def test_read_locked(self):
        """read_locked returns the content once the writing LockedFile is closed."""

        def worker(fh):
            time.sleep(2)
            fh.close()

        f = LockedFile("test.txt", mode="wb")
        f.write(to_bytes("test ok"))
        t1 = threading.Thread(target=worker, args=(f,))
        # Timestamp first, then start the worker (see test_lock_unlock).
        start = int(time.time())
        t1.start()
        content = read_locked("test.txt")
        end = int(time.time())
        t1.join()
        # it took at least 2 seconds to read
        self.assertTrue(end - start >= 2)
        self.assertEqual(content, to_bytes("test ok"))

    @unittest.skipIf(IS_GAE, "GAE has no locks")
    def test_write_locked(self):
        """write_locked waits until the exclusively locked handle is closed."""

        def worker(fh):
            time.sleep(2)
            fh.close()

        f = open("test.txt", mode="wb")
        lock(f, LOCK_EX)
        t1 = threading.Thread(target=worker, args=(f,))
        # Timestamp first, then start the worker, so truncating to whole
        # seconds cannot make a two-second wait look shorter.
        start = int(time.time())
        t1.start()
        write_locked("test.txt", to_bytes("test ok"))
        end = int(time.time())
        t1.join()
        with open("test.txt") as g:
            content = g.read()
        # it took at least 2 seconds to write
        self.assertTrue(end - start >= 2)
        self.assertEqual(content, "test ok")

    def test_exception(self):
        """LockedFile raises RuntimeError when given the mode string "x"."""
        self.assertRaises(RuntimeError, LockedFile, "test.txt", "x")

    def test_readline(self):
        """readline returns lines in order; readlines returns both lines."""
        f = LockedFile("test.txt", "wb")
        f.write(to_bytes("abc\n"))
        f.write(to_bytes("123\n"))
        f.close()
        f = LockedFile("test.txt", "rb")
        first = f.readline()
        second = f.readline()
        f.close()
        self.assertIn(to_bytes("abc"), first)
        self.assertIn(to_bytes("123"), second)
        f = LockedFile("test.txt", "rb")
        rls = f.readlines()
        f.close()
        self.assertEqual(len(rls), 2)
Note: See TracBrowser for help on using the repository browser.