import os import threading import time from ._compat import unittest from ._adapt import IS_GAE from pydal._compat import to_bytes from pydal.contrib.portalocker import lock, unlock, read_locked, write_locked from pydal.contrib.portalocker import LockedFile, LOCK_EX def tearDownModule(): if os.path.isfile("test.txt"): os.unlink("test.txt") class testPortalocker(unittest.TestCase): def test_LockedFile(self): f = LockedFile("test.txt", mode="wb") f.write(to_bytes("test ok")) f.close() f = LockedFile("test.txt", mode="rb") self.assertEqual(f.read(), to_bytes("test ok")) f.close() @unittest.skipIf(IS_GAE, "GAE has no locks") def test_openmultiple(self): t0 = time.time() def worker1(): start = time.time() f1 = LockedFile("test.txt", mode="ab") time.sleep(2) f1.write(to_bytes("%s\t%s\n" % (start, time.time()))) f1.close() f = LockedFile("test.txt", mode="wb") f.write(to_bytes("")) f.close() th = [] for x in range(10): t1 = threading.Thread(target=worker1) th.append(t1) t1.start() for t in th: t.join() with open("test.txt") as g: content = g.read() results = [line.strip().split("\t") for line in content.split("\n") if line] # all started at more or less the same time starts = [1 for line in results if float(line[0]) - t0 < 1] ends = [line[1] for line in results] self.assertEqual(sum(starts), len(starts)) # end - start is at least 2 for line in results: self.assertTrue(float(line[1]) - float(line[0]) >= 2) # ends are not the same self.assertTrue(len(ends) == len(ends)) @unittest.skipIf(IS_GAE, "GAE has no locks") def test_lock_unlock(self): def worker1(fh): time.sleep(2) unlock(fh) def worker2(fh): time.sleep(2) fh.close() f = open("test.txt", mode="wb") lock(f, LOCK_EX) f.write(to_bytes("test ok")) t1 = threading.Thread(target=worker1, args=(f,)) t1.start() start = int(time.time()) content = read_locked("test.txt") end = int(time.time()) t1.join() f.close() # it took at least 2 seconds to read # although nothing is there until .close() self.assertTrue(end - start >= 2) self.assertEqual(content, to_bytes("")) content = read_locked("test.txt") self.assertEqual(content, to_bytes("test ok")) f = LockedFile("test.txt", mode="wb") f.write(to_bytes("test ok")) t1 = threading.Thread(target=worker2, args=(f,)) t1.start() start = int(time.time()) content = read_locked("test.txt") end = int(time.time()) t1.join() # it took at least 2 seconds to read # content is there because we called close() self.assertTrue(end - start >= 2) self.assertEqual(content, to_bytes("test ok")) @unittest.skipIf(IS_GAE, "GAE has no locks") def test_read_locked(self): def worker(fh): time.sleep(2) fh.close() f = LockedFile("test.txt", mode="wb") f.write(to_bytes("test ok")) t1 = threading.Thread(target=worker, args=(f,)) t1.start() start = int(time.time()) content = read_locked("test.txt") end = int(time.time()) t1.join() # it took at least 2 seconds to read self.assertTrue(end - start >= 2) self.assertEqual(content, to_bytes("test ok")) @unittest.skipIf(IS_GAE, "GAE has no locks") def test_write_locked(self): def worker(fh): time.sleep(2) fh.close() f = open("test.txt", mode="wb") lock(f, LOCK_EX) t1 = threading.Thread(target=worker, args=(f,)) t1.start() start = int(time.time()) write_locked("test.txt", to_bytes("test ok")) end = int(time.time()) t1.join() with open("test.txt") as g: content = g.read() # it took at least 2 seconds to read self.assertTrue(end - start >= 2) self.assertEqual(content, "test ok") def test_exception(self): self.assertRaises(RuntimeError, LockedFile, *["test.txt", "x"]) def test_readline(self): f = LockedFile("test.txt", "wb") f.write(to_bytes("abc\n")) f.write(to_bytes("123\n")) f.close() f = LockedFile("test.txt", "rb") rl = f.readline() self.assertTrue(to_bytes("abc") in rl) rl = f.readline() self.assertTrue(to_bytes("123") in rl) f.close() f = LockedFile("test.txt", "rb") rls = f.readlines() f.close() self.assertEqual(len(rls), 2)