source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/tests/test_cron.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 2.6 KB
Line 
1# -*- coding: utf-8 -*-
2# vim: set ts=4 sw=4 et ai:
3"""
4    Unit tests for cron
5"""
6
7import unittest
8import os
9import shutil
10import time
11import sys
12
13from gluon.newcron import (Token, crondance, subprocess_count,
14    SimplePool, stopcron, reset)
15from gluon.fileutils import create_app, write_file
16
17
# Name of the throwaway application these tests run against.
test_app_name = '_test_cron'
# Its directory, relative to the current working directory; created by
# setUpModule() and removed again by tearDownModule().
appdir = os.path.join('applications', test_app_name)
20
def setUpModule():
    """Create the scratch application directory unless it already exists."""
    if os.path.exists(appdir):
        # Already present (e.g. left over from an earlier run): reuse as-is.
        return
    os.mkdir(appdir)
    # Hand the fresh, empty directory to gluon.fileutils.create_app().
    create_app(appdir)
25
def tearDownModule():
    """Delete the scratch application directory tree, if it is present."""
    if not os.path.exists(appdir):
        return
    shutil.rmtree(appdir)
29
30
# Crontab written by test_2_crondance: a single @reboot entry that points at
# the test application's cron/test.py.
# NOTE(review): "peppe" sits in the user column and the "**" prefix is
# presumably web2py's marker for running the script inside the web2py
# environment -- confirm against gluon.newcron / the web2py cron docs.
TEST_CRONTAB = """@reboot peppe **applications/%s/cron/test.py
""" %  test_app_name
# Script body used by test_2_crondance: writes str(request) to
# <request.folder>/private/cron_req.
TEST_SCRIPT1 = """
from os.path import join as pjoin

with open(pjoin(request.folder, 'private', 'cron_req'), 'w') as f:
    f.write(str(request))
"""
# File whose existence test_2_crondance asserts after the cron run
# (presumably the file TEST_SCRIPT1 writes, assuming request.folder is the
# test application's directory -- confirm).
TARGET = os.path.join(appdir, 'private', 'cron_req')
# Script body used by test_3_SimplePool: does nothing but sleep 13 seconds.
TEST_SCRIPT2 = """
import time

time.sleep(13)
"""
45
class TestCron(unittest.TestCase):
    """Tests for gluon.newcron, run against the scratch test application.

    The numeric prefixes in the method names fix the execution order
    (unittest runs test methods sorted by name): test_3 overwrites the
    cron/test.py script that test_2 installed.
    """

    # Upper bound, in seconds, on how long test_2_crondance waits for the
    # cron subprocess to finish; without it a stuck job hangs the whole suite.
    CRON_WAIT_TIMEOUT = 60

    @classmethod
    def tearDownClass(cls):
        """Remove any ``cron.master`` file left in the app dir or the cwd."""
        for master in (os.path.join(appdir, 'cron.master'), 'cron.master'):
            if os.path.exists(master):
                os.unlink(master)

    @staticmethod
    def _shutdown_cron():
        """Run the stopcron() / pause / reset() sequence used to end a test.

        NOTE(review): reset() presumably re-arms newcron after stopcron() so
        later tests start from a clean state -- confirm in gluon.newcron.
        """
        stopcron()
        time.sleep(1)
        reset()

    def test_1_Token(self):
        """Check the values returned by two acquire/release cycles of a Token."""
        app_path = os.path.join(os.getcwd(), 'applications', test_app_name)
        t = Token(path=app_path)
        # First cycle: acquire() hands back the token's own timestamp and
        # release() returns a false value.
        self.assertEqual(t.acquire(), t.now)
        self.assertFalse(t.release())
        # Second cycle on the same token: acquire() yields None and release()
        # returns a true value.
        # NOTE(review): presumably because the token was just used -- confirm
        # against gluon.newcron.Token.
        self.assertIsNone(t.acquire())
        self.assertTrue(t.release())

    def test_2_crondance(self):
        """Run crondance() once and check that the cron script produced TARGET.

        Fails, instead of blocking forever, if the spawned cron subprocess
        is still alive after CRON_WAIT_TIMEOUT seconds.
        """
        base = os.path.join(appdir, 'cron')
        write_file(os.path.join(base, 'crontab'), TEST_CRONTAB)
        write_file(os.path.join(base, 'test.py'), TEST_SCRIPT1)
        # Start from a clean slate so a stale file cannot satisfy the assert.
        if os.path.exists(TARGET):
            os.unlink(TARGET)
        crondance(os.getcwd(), 'hard', startup=True, apps=[test_app_name])
        # The job runs asynchronously: give it a moment to be spawned, then
        # poll until no cron subprocess is left (still timing dependent).
        time.sleep(1)
        deadline = time.time() + self.CRON_WAIT_TIMEOUT
        while subprocess_count():
            if time.time() > deadline:
                # Do not leave the stuck job behind for the following tests.
                self._shutdown_cron()
                self.fail('cron subprocess still running after %d seconds'
                          % self.CRON_WAIT_TIMEOUT)
            time.sleep(1)
        # Extra grace period before looking for the output file.
        time.sleep(1)
        self.assertTrue(os.path.exists(TARGET))

    def test_3_SimplePool(self):
        """Check that a SimplePool of size 1 accepts one job, refuses the next.

        Cron is stopped and reset even when an assertion fails, so the
        launched web2py process is not left running.
        """
        base = os.path.join(appdir, 'cron')
        # Long-sleeping script keeps the single pool slot busy during the test.
        write_file(os.path.join(base, 'test.py'), TEST_SCRIPT2)
        w2p_path = os.path.join(os.getcwd(), 'web2py.py')
        self.assertTrue(os.path.exists(w2p_path))
        launcher = SimplePool(1)
        cmd1 = [sys.executable, w2p_path,
            '--cron_job', '--no_banner', '--no_gui', '--plain',
            '-S', test_app_name, '-R', "applications/%s/cron/test.py" % test_app_name]
        try:
            self.assertTrue(launcher(cmd1))
            # The only slot is taken, so a second submission is refused.
            self.assertFalse(launcher(None))
            time.sleep(1)
        finally:
            self._shutdown_cron()
Note: See TracBrowser for help on using the repository browser.