source: OpenRLabs-Git/web2py/applications/rlabs/unitTest/disabledTest/test_threading.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100644
File size: 2.1 KB
Line 
1#!/usr/bin/python3
2from fileinput import filename
3
4
# Prefer the unittest2 backport when it is installed (needed on Python <= 2.6)
# and fall back to the standard library otherwise. Only ImportError is caught
# so that KeyboardInterrupt/SystemExit are not silently swallowed.
try:
    import unittest2 as unittest
except ImportError:
    import unittest
9
10import os
11import sys
# Make the directory that holds this script (the unitTest directory)
# importable. The path is made absolute first: when the script is launched by
# bare file name, dirname() of the relative path is '' and appending '/' to it
# would put the filesystem root on sys.path instead.
sys.path.append(os.path.dirname(os.path.abspath(sys.argv[0])))
13from gluon import *
14
15from baseTest import BaseTest
16
# Application name, taken from the first command-line argument; it selects
# which application's default controller is loaded below.
APP = sys.argv[1]

# Execute the application's default controller so that everything it defines
# is available in this module's global namespace (Python 3 replacement for
# the old execfile() call).
# NOTE(review): exec() runs the controller source with this process's full
# privileges; APP comes straight from argv, so only run this against a
# trusted applications/ tree.
filename = "applications/" + APP + "/controllers/default.py"

# Read through a context manager so the file handle is closed deterministically.
with open(filename, "rb") as _controller_file:
    _controller_source = _controller_file.read()
exec(compile(_controller_source, filename, 'exec'), globals())
25
26import concurrent.futures
27import threading
28from dummy import DummySingleton, Dummy
29import dummy_module
30
class TestThreading(BaseTest):
    """Exercise the dummy singleton, class and module helpers from several threads.

    Only ``test_threads`` is collected by unittest; the ``d_``-prefixed
    methods do not match the ``test`` method prefix and therefore are not
    run automatically (apparently disabled variants that use raw
    ``threading.Thread`` objects instead of a thread pool).
    """

    def do_singleton(self, var):
        """Store *var* through each of the three dummy helpers.

        Prints the current thread, then calls ``set_var(var)`` on a
        ``DummySingleton`` instance, on a ``Dummy`` instance and on
        ``dummy_module``.
        NOTE(review): presumably meant to compare how singleton, per-instance
        and module-level state behave across threads -- confirm against the
        ``dummy`` / ``dummy_module`` sources.
        """
        print(threading.current_thread())
        ds = DummySingleton()
        ds.set_var(var)
        d = Dummy()
        d.set_var(var)
        dummy_module.set_var(var)

    def new_threads(self, var):
        """Fan out ``do_singleton`` over a nested thread pool.

        ``var == 1`` dispatches the values 3 and 4; any other value
        dispatches 5 and 6.
        """
        print(threading.current_thread())
        values = [3, 4] if var == 1 else [5, 6]

        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            # Consume the lazy map() result so that an exception raised in a
            # worker is re-raised here instead of being dropped.
            list(executor.map(self.do_singleton, values))

    def test_threads(self):
        """Run ``new_threads`` for the values 1 and 2 on a thread pool."""
        values = [1, 2]
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            # Consuming the results propagates worker exceptions to the test.
            list(executor.map(self.new_threads, values))

    def d_do_new_threads(self):
        """Start one raw thread per value in ``range(2, 4)`` running ``do_singleton``.

        All started threads are joined before returning so that none of them
        outlives the caller.
        """
        print(threading.current_thread())
        threads = []
        for i in range(2, 4):
            t = threading.Thread(target=self.do_singleton, args=(i,))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()

    def d_test_threads(self):
        """Start two raw threads running ``d_do_new_threads`` and wait for both.

        Not collected by unittest because of the ``d_`` prefix.
        """
        threads = []
        for _ in range(2):
            # The target must be the existing d_do_new_threads method; the
            # class defines no attribute called do_new_threads.
            t = threading.Thread(target=self.d_do_new_threads)
            threads.append(t)
            t.start()
        # Join every thread, not only the last one that was started.
        for t in threads:
            t.join()
74
# Build the suite from TestThreading and run it immediately at module level
# with verbose output. loadTestsFromTestCase() is used instead of
# unittest.makeSuite(), which is deprecated since Python 3.11 and removed in
# Python 3.13.
suite = unittest.TestSuite()
suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestThreading))
unittest.TextTestRunner(verbosity=2).run(suite)
Note: See TracBrowser for help on using the repository browser.