source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/tests/test_scheduler.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 32.7 KB
Line 
1# -*- coding: utf-8 -*-
2
3"""
4    Unit tests for gluon.scheduler
5"""
6import os
7import unittest
8import glob
9import datetime
10import sys
11import shutil
12
13from gluon.storage import Storage
14from gluon.languages import TranslatorFactory
15from gluon.scheduler import JobGraph, Scheduler, CronParser
16from gluon.dal import DAL
17from gluon.fileutils import create_app
18
19
20test_app_name = '_test_scheduler'
21
22class BaseTestScheduler(unittest.TestCase):
23
24    def setUp(self):
25        self.tearDown()
26        appdir = os.path.join('applications', test_app_name)
27        os.mkdir(appdir)
28        create_app(appdir)
29
30        self.db = None
31        from gluon.globals import current
32        s = Storage({'application': test_app_name,
33                     'folder': "applications/%s" % test_app_name,
34                     'controller': 'default'})
35        current.request = s
36        T = TranslatorFactory('', 'en')
37        current.T = T
38        self.db = DAL('sqlite://dummy2.db',
39                      folder="applications/%s/databases" % test_app_name,
40                      check_reserved=['all'])
41
42    def tearDown(self):
43        try:
44            self.inner_teardown()
45        except:
46            pass
47        appdir = os.path.join('applications', test_app_name)
48        if os.path.exists(appdir):
49            shutil.rmtree(appdir)
50
51
52
53class CronParserTest(unittest.TestCase):
54
55    def testMinute(self):
56        # minute asterisk
57        base = datetime.datetime(2010, 1, 23, 12, 18)
58        itr = CronParser('*/1 * * * *', base)
59        n1 = itr.next()    # 19
60        self.assertEqual(base.year, n1.year)
61        self.assertEqual(base.month, n1.month)
62        self.assertEqual(base.day, n1.day)
63        self.assertEqual(base.hour, n1.hour)
64        self.assertEqual(base.minute, n1.minute - 1)
65        for i in range(39):  # ~ 58
66            itr.next()
67        n2 = itr.next()
68        self.assertEqual(n2.minute, 59)
69        n3 = itr.next()
70        self.assertEqual(n3.minute, 0)
71        self.assertEqual(n3.hour, 13)
72
73        itr = CronParser('*/5 * * * *', base)
74        n4 = itr.next()
75        self.assertEqual(n4.minute, 20)
76        for i in range(6):
77            itr.next()
78        n5 = itr.next()
79        self.assertEqual(n5.minute, 55)
80        n6 = itr.next()
81        self.assertEqual(n6.minute, 0)
82        self.assertEqual(n6.hour, 13)
83
84        base = datetime.datetime(2010, 1, 23, 12, 18)
85        itr = CronParser('4/34 * * * *', base)
86        n7 = itr.next()
87        self.assertEqual(n7.minute, 38)
88        self.assertEqual(n7.hour, 12)
89        n8 = itr.next()
90        self.assertEqual(n8.minute, 4)
91        self.assertEqual(n8.hour, 13)
92
93    def testHour(self):
94        base = datetime.datetime(2010, 1, 24, 12, 2)
95        itr = CronParser('0 */3 * * *', base)
96        n1 = itr.next()
97        self.assertEqual(n1.hour, 15)
98        self.assertEqual(n1.minute, 0)
99        for i in range(2):
100            itr.next()
101        n2 = itr.next()
102        self.assertEqual(n2.hour, 0)
103        self.assertEqual(n2.day, 25)
104
105    def testDay(self):
106        base = datetime.datetime(2010, 2, 24, 12, 9)
107        itr = CronParser('0 0 */3 * *', base)
108        n1 = itr.next()
109        # 1 4 7 10 13 16 19 22 25 28
110        self.assertEqual(n1.day, 25)
111        n2 = itr.next()
112        self.assertEqual(n2.day, 28)
113        n3 = itr.next()
114        self.assertEqual(n3.day, 1)
115        self.assertEqual(n3.month, 3)
116
117        # test leap year
118        base = datetime.datetime(1996, 2, 27)
119        itr = CronParser('0 0 * * *', base)
120        n1 = itr.next()
121        self.assertEqual(n1.day, 28)
122        self.assertEqual(n1.month, 2)
123        n2 = itr.next()
124        self.assertEqual(n2.day, 29)
125        self.assertEqual(n2.month, 2)
126
127        base2 = datetime.datetime(2000, 2, 27)
128        itr2 = CronParser('0 0 * * *', base2)
129        n3 = itr2.next()
130        self.assertEqual(n3.day, 28)
131        self.assertEqual(n3.month, 2)
132        n4 = itr2.next()
133        self.assertEqual(n4.day, 29)
134        self.assertEqual(n4.month, 2)
135
136    def testWeekDay(self):
137        base = datetime.datetime(2010, 2, 25)
138        itr = CronParser('0 0 * * sat', base)
139        n1 = itr.next()
140        self.assertEqual(n1.isoweekday(), 6)
141        self.assertEqual(n1.day, 27)
142        n2 = itr.next()
143        self.assertEqual(n2.isoweekday(), 6)
144        self.assertEqual(n2.day, 6)
145        self.assertEqual(n2.month, 3)
146
147        base = datetime.datetime(2010, 1, 25)
148        itr = CronParser('0 0 1 * wed', base)
149        n1 = itr.next()
150        self.assertEqual(n1.month, 1)
151        self.assertEqual(n1.day, 27)
152        self.assertEqual(n1.year, 2010)
153        n2 = itr.next()
154        self.assertEqual(n2.month, 2)
155        self.assertEqual(n2.day, 1)
156        self.assertEqual(n2.year, 2010)
157        n3 = itr.next()
158        self.assertEqual(n3.month, 2)
159        self.assertEqual(n3.day, 3)
160        self.assertEqual(n3.year, 2010)
161
162    def testMonth(self):
163        base = datetime.datetime(2010, 1, 25)
164        itr = CronParser('0 0 1 * *', base)
165        n1 = itr.next()
166        self.assertEqual(n1.month, 2)
167        self.assertEqual(n1.day, 1)
168        n2 = itr.next()
169        self.assertEqual(n2.month, 3)
170        self.assertEqual(n2.day, 1)
171        for i in range(8):
172            itr.next()
173        n3 = itr.next()
174        self.assertEqual(n3.month, 12)
175        self.assertEqual(n3.year, 2010)
176        n4 = itr.next()
177        self.assertEqual(n4.month, 1)
178        self.assertEqual(n4.year, 2011)
179
180        base = datetime.datetime(2010, 1, 25)
181        itr = CronParser('0 0 1 */4 *', base)
182        n1 = itr.next()
183        self.assertEqual(n1.month, 5)
184        self.assertEqual(n1.day, 1)
185
186        base = datetime.datetime(2010, 1, 25)
187        itr = CronParser('0 0 1 1-3 *', base)
188        n1 = itr.next()
189        self.assertEqual(n1.month, 2)
190        self.assertEqual(n1.day, 1)
191        n2 = itr.next()
192        self.assertEqual(n2.month, 3)
193        self.assertEqual(n2.day, 1)
194        n3 = itr.next()
195        self.assertEqual(n3.month, 1)
196        self.assertEqual(n3.day, 1)
197
198    def testSundayToThursdayWithAlphaConversion(self):
199        base = datetime.datetime(2010, 8, 25, 15, 56)
200        itr = CronParser("30 22 * * sun-thu", base)
201        n1 = itr.next()
202        self.assertEqual(base.year, n1.year)
203        self.assertEqual(base.month, n1.month)
204        self.assertEqual(base.day, n1.day)
205        self.assertEqual(22, n1.hour)
206        self.assertEqual(30, n1.minute)
207
208    def testISOWeekday(self):
209        base = datetime.datetime(2010, 2, 25)
210        itr = CronParser('0 0 * * 7', base)
211        n1 = itr.next()
212        self.assertEqual(n1.isoweekday(), 7)
213        self.assertEqual(n1.day, 28)
214        n2 = itr.next()
215        self.assertEqual(n2.isoweekday(), 7)
216        self.assertEqual(n2.day, 7)
217        self.assertEqual(n2.month, 3)
218        base = datetime.datetime(2010, 2, 22)
219        itr = CronParser('0 0 * * */2', base)
220        n1 = itr.next()
221        self.assertEqual(n1.isoweekday(), 2)
222        self.assertEqual(n1.day, 23)
223        n2 = itr.next()
224        self.assertEqual(n2.isoweekday(), 4)
225        self.assertEqual(n2.day, 25)
226
227    def testBug2(self):
228
229        base = datetime.datetime(2012, 1, 1, 0, 0)
230        itr = CronParser('0 * * 3 *', base)
231        n1 = itr.next()
232        self.assertEqual(n1.year, base.year)
233        self.assertEqual(n1.month, 3)
234        self.assertEqual(n1.day, base.day)
235        self.assertEqual(n1.hour, base.hour)
236        self.assertEqual(n1.minute, base.minute)
237
238        n2 = itr.next()
239        self.assertEqual(n2.year, base.year)
240        self.assertEqual(n2.month, 3)
241        self.assertEqual(n2.day, base.day)
242        self.assertEqual(n2.hour, base.hour + 1)
243        self.assertEqual(n2.minute, base.minute)
244
245        n3 = itr.next()
246        self.assertEqual(n3.year, base.year)
247        self.assertEqual(n3.month, 3)
248        self.assertEqual(n3.day, base.day)
249        self.assertEqual(n3.hour, base.hour + 2)
250        self.assertEqual(n3.minute, base.minute)
251
252    def testBug3(self):
253        base = datetime.datetime(2013, 3, 1, 12, 17, 34, 257877)
254        c = CronParser('00 03 16,30 * *', base)
255
256        n1 = c.next()
257        self.assertEqual(n1.month, 3)
258        self.assertEqual(n1.day, 16)
259
260        n2 = c.next()
261        self.assertEqual(n2.month, 3)
262        self.assertEqual(n2.day, 30)
263
264        n3 = c.next()
265        self.assertEqual(n3.month, 4)
266        self.assertEqual(n3.day, 16)
267
268    def test_rangeGenerator(self):
269        base = datetime.datetime(2013, 3, 4, 0, 0)
270        itr = CronParser('1-9/2 0 1 * *', base)
271        n1 = itr.next()
272        n2 = itr.next()
273        n3 = itr.next()
274        n4 = itr.next()
275        n5 = itr.next()
276        self.assertEqual(n1.minute, 1)
277        self.assertEqual(n2.minute, 3)
278        self.assertEqual(n3.minute, 5)
279        self.assertEqual(n4.minute, 7)
280        self.assertEqual(n5.minute, 9)
281
282    def test_iterGenerator(self):
283        base = datetime.datetime(2013, 3, 4, 0, 0)
284        itr = CronParser('1-9/2 0 1 * *', base)
285        x = 0
286        for n in itr:
287            x += 1
288            if x > 4:
289                break
290        self.assertEqual(n.minute, 9)
291
292    def test_invalidcron(self):
293        base = datetime.datetime(2013, 3, 4, 0, 0)
294        itr = CronParser('5 4 31 2 *', base)
295        self.assertRaises(ValueError, itr.next)
296        itr = CronParser('1- * * * *', base)
297        self.assertRaises(ValueError, itr.next)
298        itr = CronParser('-1 * * * *', base)
299        self.assertRaises(ValueError, itr.next)
300        itr = CronParser('* * 5-1 * *', base)
301        self.assertRaises(ValueError, itr.next)
302        itr = CronParser('* * * janu-jun *', base)
303        self.assertRaises(ValueError, itr.next)
304        itr = CronParser('* * * * * *', base)
305        self.assertRaises(ValueError, itr.next)
306        itr = CronParser('* * * *', base)
307        self.assertRaises(ValueError, itr.next)
308
309    def testLastDayOfMonth(self):
310        base = datetime.datetime(2015, 9, 4)
311        itr = CronParser('0 0 L * *', base)
312        n1 = itr.next()
313        self.assertEqual(n1.month, 9)
314        self.assertEqual(n1.day, 30)
315        n2 = itr.next()
316        self.assertEqual(n2.month, 10)
317        self.assertEqual(n2.day, 31)
318        n3 = itr.next()
319        self.assertEqual(n3.month, 11)
320        self.assertEqual(n3.day, 30)
321        n4 = itr.next()
322        self.assertEqual(n4.month, 12)
323        self.assertEqual(n4.day, 31)
324
325        base = datetime.datetime(1996, 2, 27)
326        itr = CronParser('0 0 L * *', base)
327        n1 = itr.next()
328        self.assertEqual(n1.day, 29)
329        self.assertEqual(n1.month, 2)
330        n2 = itr.next()
331        self.assertEqual(n2.day, 31)
332        self.assertEqual(n2.month, 3)
333
334    def testSpecialExpr(self):
335        base = datetime.datetime(2000, 1, 1)
336        itr = CronParser('@yearly', base)
337        n1 = itr.next()
338        self.assertEqual(n1.day, 1)
339        self.assertEqual(n1.month, 1)
340        self.assertEqual(n1.year, base.year + 1)
341        self.assertEqual(n1.hour, 0)
342        self.assertEqual(n1.minute, 0)
343
344        itr = CronParser('@annually', base)
345        n1 = itr.next()
346        self.assertEqual(n1.day, 1)
347        self.assertEqual(n1.month, 1)
348        self.assertEqual(n1.year, base.year + 1)
349        self.assertEqual(n1.hour, 0)
350        self.assertEqual(n1.minute, 0)
351
352        itr = CronParser('@monthly', base)
353        n1 = itr.next()
354        self.assertEqual(n1.day, 1)
355        self.assertEqual(n1.month, base.month + 1)
356        self.assertEqual(n1.year, base.year)
357        self.assertEqual(n1.hour, 0)
358        self.assertEqual(n1.minute, 0)
359
360        itr = CronParser('@weekly', base)
361        n1 = itr.next()
362        self.assertEqual(n1.day, 2)
363        self.assertEqual(n1.month, base.month)
364        self.assertEqual(n1.year, base.year)
365        self.assertEqual(n1.hour, 0)
366        self.assertEqual(n1.minute, 0)
367        n2 = itr.next()
368        self.assertEqual(n2.day, 9)
369        self.assertEqual(n2.month, base.month)
370        self.assertEqual(n2.year, base.year)
371        self.assertEqual(n2.hour, 0)
372        self.assertEqual(n2.minute, 0)
373        n3 = itr.next()
374        self.assertEqual(n3.day, 16)
375        self.assertEqual(n3.month, base.month)
376        self.assertEqual(n3.year, base.year)
377        self.assertEqual(n3.hour, 0)
378        self.assertEqual(n3.minute, 0)
379
380        itr = CronParser('@daily', base)
381        n1 = itr.next()
382        self.assertEqual(n1.day, 2)
383        self.assertEqual(n1.month, base.month)
384        self.assertEqual(n1.year, base.year)
385        self.assertEqual(n1.hour, 0)
386        self.assertEqual(n1.minute, 0)
387
388        itr = CronParser('@midnight', base)
389        n1 = itr.next()
390        self.assertEqual(n1.day, 2)
391        self.assertEqual(n1.month, base.month)
392        self.assertEqual(n1.year, base.year)
393        self.assertEqual(n1.hour, 0)
394        self.assertEqual(n1.minute, 0)
395
396        itr = CronParser('@hourly', base)
397        n1 = itr.next()
398        self.assertEqual(n1.day, 1)
399        self.assertEqual(n1.month, base.month)
400        self.assertEqual(n1.year, base.year)
401        self.assertEqual(n1.hour, 1)
402        self.assertEqual(n1.minute, 0)
403
404
405
406class TestsForJobGraph(BaseTestScheduler):
407
408    def testJobGraph(self):
409        s = Scheduler(self.db)
410        myjob = JobGraph(self.db, 'job_1')
411        fname = 'foo'
412        # We have a few items to wear, and there's an "order" to respect...
413        # Items are: watch, jacket, shirt, tie, pants, undershorts, belt, shoes, socks
414        # Now, we can't put on the tie without wearing the shirt first, etc...
415        watch = s.queue_task(fname, task_name='watch')
416        jacket = s.queue_task(fname, task_name='jacket')
417        shirt = s.queue_task(fname, task_name='shirt')
418        tie = s.queue_task(fname, task_name='tie')
419        pants = s.queue_task(fname, task_name='pants')
420        undershorts = s.queue_task(fname, task_name='undershorts')
421        belt = s.queue_task(fname, task_name='belt')
422        shoes = s.queue_task(fname, task_name='shoes')
423        socks = s.queue_task(fname, task_name='socks')
424        # before the tie, comes the shirt
425        myjob.add_deps(tie.id, shirt.id)
426        # before the belt too comes the shirt
427        myjob.add_deps(belt.id, shirt.id)
428        # before the jacket, comes the tie
429        myjob.add_deps(jacket.id, tie.id)
430        # before the belt, come the pants
431        myjob.add_deps(belt.id, pants.id)
432        # before the shoes, comes the pants
433        myjob.add_deps(shoes.id, pants.id)
434        # before the pants, comes the undershorts
435        myjob.add_deps(pants.id, undershorts.id)
436        # before the shoes, comes the undershorts
437        myjob.add_deps(shoes.id, undershorts.id)
438        # before the jacket, comes the belt
439        myjob.add_deps(jacket.id, belt.id)
440        # before the shoes, comes the socks
441        myjob.add_deps(shoes.id, socks.id)
442
443        ## results in the following topological sort
444        # 9,3,6 --> 4,5 --> 8,7 --> 2
445        # socks, shirt, undershorts
446        # tie, pants
447        # shoes, belt
448        # jacket
449        known_toposort = [
450            set([socks.id, shirt.id, undershorts.id]),
451            set([tie.id, pants.id]),
452            set([shoes.id, belt.id]),
453            set([jacket.id])
454        ]
455        toposort = myjob.validate('job_1')
456        self.assertEqual(toposort, known_toposort)
457        # add a cyclic dependency, jacket to undershorts
458        myjob.add_deps(undershorts.id, jacket.id)
459        # no exceptions raised, but result None
460        self.assertEqual(myjob.validate('job_1'), None)
461
462    def testJobGraphFailing(self):
463        s = Scheduler(self.db)
464        myjob = JobGraph(self.db, 'job_1')
465        fname = 'foo'
466        # We have a few items to wear, and there's an "order" to respect...
467        # Items are: watch, jacket, shirt, tie, pants, undershorts, belt, shoes, socks
468        # Now, we can't put on the tie without wearing the shirt first, etc...
469        watch = s.queue_task(fname, task_name='watch')
470        jacket = s.queue_task(fname, task_name='jacket')
471        shirt = s.queue_task(fname, task_name='shirt')
472        tie = s.queue_task(fname, task_name='tie')
473        pants = s.queue_task(fname, task_name='pants')
474        undershorts = s.queue_task(fname, task_name='undershorts')
475        belt = s.queue_task(fname, task_name='belt')
476        shoes = s.queue_task(fname, task_name='shoes')
477        socks = s.queue_task(fname, task_name='socks')
478        # before the tie, comes the shirt
479        myjob.add_deps(tie.id, shirt.id)
480        # before the belt too comes the shirt
481        myjob.add_deps(belt.id, shirt.id)
482        # before the jacket, comes the tie
483        myjob.add_deps(jacket.id, tie.id)
484        # before the belt, come the pants
485        myjob.add_deps(belt.id, pants.id)
486        # before the shoes, comes the pants
487        myjob.add_deps(shoes.id, pants.id)
488        # before the pants, comes the undershorts
489        myjob.add_deps(pants.id, undershorts.id)
490        # before the shoes, comes the undershorts
491        myjob.add_deps(shoes.id, undershorts.id)
492        # before the jacket, comes the belt
493        myjob.add_deps(jacket.id, belt.id)
494        # before the shoes, comes the socks
495        myjob.add_deps(shoes.id, socks.id)
496        # add a cyclic dependency, jacket to undershorts
497        myjob.add_deps(undershorts.id, jacket.id)
498        # no exceptions raised, but result None
499        self.assertEqual(myjob.validate('job_1'), None)
500        # and no deps added
501        deps_inserted = self.db(self.db.scheduler_task_deps.id>0).count()
502        self.assertEqual(deps_inserted, 0)
503
504    def testJobGraphDifferentJobs(self):
505        s = Scheduler(self.db)
506        myjob1 = JobGraph(self.db, 'job_1')
507        myjob2 = JobGraph(self.db, 'job_2')
508        fname = 'foo'
509        # We have a few items to wear, and there's an "order" to respect...
510        # Items are: watch, jacket, shirt, tie, pants, undershorts, belt, shoes, socks
511        # Now, we can't put on the tie without wearing the shirt first, etc...
512        watch = s.queue_task(fname, task_name='watch')
513        jacket = s.queue_task(fname, task_name='jacket')
514        shirt = s.queue_task(fname, task_name='shirt')
515        tie = s.queue_task(fname, task_name='tie')
516        pants = s.queue_task(fname, task_name='pants')
517        undershorts = s.queue_task(fname, task_name='undershorts')
518        belt = s.queue_task(fname, task_name='belt')
519        shoes = s.queue_task(fname, task_name='shoes')
520        socks = s.queue_task(fname, task_name='socks')
521        # before the tie, comes the shirt
522        myjob1.add_deps(tie.id, shirt.id)
523        # before the belt too comes the shirt
524        myjob1.add_deps(belt.id, shirt.id)
525        # before the jacket, comes the tie
526        myjob1.add_deps(jacket.id, tie.id)
527        # before the belt, come the pants
528        myjob1.add_deps(belt.id, pants.id)
529        # before the shoes, comes the pants
530        myjob2.add_deps(shoes.id, pants.id)
531        # before the pants, comes the undershorts
532        myjob2.add_deps(pants.id, undershorts.id)
533        # before the shoes, comes the undershorts
534        myjob2.add_deps(shoes.id, undershorts.id)
535        # before the jacket, comes the belt
536        myjob2.add_deps(jacket.id, belt.id)
537        # before the shoes, comes the socks
538        myjob2.add_deps(shoes.id, socks.id)
539        # every job by itself can be completed
540        self.assertNotEqual(myjob1.validate('job_1'), None)
541        self.assertNotEqual(myjob1.validate('job_2'), None)
542        # and, implicitly, every queued task can be too
543        self.assertNotEqual(myjob1.validate(), None)
544        # add a cyclic dependency, jacket to undershorts
545        myjob2.add_deps(undershorts.id, jacket.id)
546        # every job can still be completed by itself
547        self.assertNotEqual(myjob1.validate('job_1'), None)
548        self.assertNotEqual(myjob1.validate('job_2'), None)
549        # but trying to see if every task will ever be completed fails
550        self.assertEqual(myjob2.validate(), None)
551
552
553class TestsForSchedulerAPIs(BaseTestScheduler):
554
555    def testQueue_Task(self):
556
557        def isnotqueued(result):
558            self.assertEqual(result.id, None)
559            self.assertEqual(result.uuid, None)
560            self.assertEqual(len(list(result.errors.keys())) > 0, True)
561
562        def isqueued(result):
563            self.assertNotEqual(result.id, None)
564            self.assertNotEqual(result.uuid, None)
565            self.assertEqual(len(list(result.errors.keys())), 0)
566
567        s = Scheduler(self.db)
568        fname = 'foo'
569        watch = s.queue_task(fname, task_name='watch')
570        # queuing a task returns id, errors, uuid
571        self.assertEqual(set(watch.keys()), set(['id', 'uuid', 'errors']))
572        # queueing nothing isn't allowed
573        self.assertRaises(TypeError, s.queue_task, *[])
574        # passing pargs and pvars wrongly
575        # # pargs as dict
576        isnotqueued(s.queue_task(fname, dict(a=1), dict(b=1)))
577        # # pvars as list
578        isnotqueued(s.queue_task(fname, ['foo', 'bar'], ['foo', 'bar']))
579        # two tasks with the same uuid won't be there
580        isqueued(s.queue_task(fname, uuid='a'))
581        isnotqueued(s.queue_task(fname, uuid='a'))
582        # # #FIXME add here every parameter
583
584    def testTask_Status(self):
585        s = Scheduler(self.db)
586        fname = 'foo'
587        watch = s.queue_task(fname, task_name='watch')
588        # fetch status by id
589        by_id = s.task_status(watch.id)
590        # fetch status by uuid
591        by_uuid = s.task_status(watch.uuid)
592        # fetch status by query
593        by_query = s.task_status(self.db.scheduler_task.function_name == 'foo')
594        self.assertEqual(by_id, by_uuid)
595        self.assertEqual(by_id, by_query)
596        # fetch status by anything else throws
597        self.assertRaises(SyntaxError, s.task_status, *[[1, 2]])
598        # adding output returns the joined set, plus "result"
599        rtn = s.task_status(watch.id, output=True)
600        self.assertEqual(set(rtn.keys()), set(['scheduler_run', 'scheduler_task', 'result']))
601
602
603class testForSchedulerRunnerBase(BaseTestScheduler):
604
605    def inner_teardown(self):
606        from gluon import current
607        fdest = os.path.join(current.request.folder, 'models', 'scheduler.py')
608        os.unlink(fdest)
609        additional_files = [
610            os.path.join(current.request.folder, 'private', 'demo8.pholder'),
611            os.path.join(current.request.folder, 'views', 'issue_1485_2.html'),
612        ]
613        for f in additional_files:
614            try:
615                os.unlink(f)
616            except:
617                pass
618
619    def writeview(self, content, dest=None):
620        from gluon import current
621        fdest = os.path.join(current.request.folder, 'views', dest)
622        with open(fdest, 'w') as q:
623            q.write(content)
624
625    def writefunction(self, content, initlines=None):
626        from gluon import current
627        fdest = os.path.join(current.request.folder, 'models', 'scheduler.py')
628        if initlines is None:
629            initlines = """
630import os
631import time
632from gluon.scheduler import Scheduler
633db_dal = os.path.abspath(os.path.join(request.folder, 'databases', 'dummy2.db'))
634sched_dal = DAL('sqlite://%s' % db_dal, folder=os.path.dirname(db_dal))
635sched = Scheduler(sched_dal, max_empty_runs=15, migrate=False, heartbeat=1)
636def termination():
637    sched.terminate()
638            """
639        with open(fdest, 'w') as q:
640            q.write(initlines)
641            q.write(content)
642
643    def exec_sched(self):
644        import subprocess
645        call_args = [sys.executable, 'web2py.py', '--no_banner', '-D', 'INFO','-K', test_app_name]
646        ret = subprocess.call(call_args, env=dict(os.environ))
647        return ret
648
649    def fetch_results(self, sched, task):
650        info = sched.task_status(task.id)
651        task_runs = self.db(self.db.scheduler_run.task_id == task.id).select()
652        return info, task_runs
653
654    def exec_asserts(self, stmts, tag):
655        for stmt in stmts:
656            self.assertEqual(stmt[1], True, msg="%s - %s" % (tag, stmt[0]))
657
658
659class TestsForSchedulerRunner(testForSchedulerRunnerBase):
660
661    def testRepeats_and_Expired_and_Prio(self):
662        s = Scheduler(self.db)
663        repeats = s.queue_task('demo1', ['a', 'b'], dict(c=1, d=2), repeats=2, period=5)
664        a_while_ago = datetime.datetime.now() - datetime.timedelta(seconds=60)
665        expired = s.queue_task('demo4', stop_time=a_while_ago)
666        prio1 = s.queue_task('demo1', ['scheduled_first'])
667        prio2 = s.queue_task('demo1', ['scheduled_second'], next_run_time=a_while_ago)
668        self.db.commit()
669        self.writefunction(r"""
670def demo1(*args,**vars):
671    print('you passed args=%s and vars=%s' % (args, vars))
672    return args[0]
673
674def demo4():
675    time.sleep(15)
676    print("I'm printing something")
677    return dict(a=1, b=2)
678""")
679        ret = self.exec_sched()
680        self.assertEqual(ret, 0)
681        # repeats check
682        task, task_run = self.fetch_results(s, repeats)
683        res = [
684            ("task status completed", task.status == 'COMPLETED'),
685            ("task times_run is 2", task.times_run == 2),
686            ("task ran 2 times only", len(task_run) == 2),
687            ("scheduler_run records are COMPLETED ", (task_run[0].status == task_run[1].status == 'COMPLETED')),
688            ("period is respected", (task_run[1].start_time >= task_run[0].start_time + datetime.timedelta(seconds=task.period)))
689        ]
690        self.exec_asserts(res, 'REPEATS')
691
692        # expired check
693        task, task_run = self.fetch_results(s, expired)
694        res = [
695            ("task status expired", task.status == 'EXPIRED'),
696            ("task times_run is 0", task.times_run == 0),
697            ("task didn't run at all", len(task_run) == 0)
698        ]
699        self.exec_asserts(res, 'EXPIRATION')
700
701        # prio check
702        task1 = s.task_status(prio1.id, output=True)
703        task2 = s.task_status(prio2.id, output=True)
704        res = [
705            ("tasks status completed", task1.scheduler_task.status == task2.scheduler_task.status == 'COMPLETED'),
706            ("priority2 was executed before priority1" , task1.scheduler_run.id > task2.scheduler_run.id)
707        ]
708        self.exec_asserts(res, 'PRIORITY')
709
710    def testNoReturn_and_Timeout_and_Progress(self):
711        s = Scheduler(self.db)
712        noret1 = s.queue_task('demo5')
713        noret2 = s.queue_task('demo3')
714        timeout1 = s.queue_task('demo4', timeout=5)
715        timeout2 = s.queue_task('demo4')
716        progress = s.queue_task('demo6', sync_output=2)
717        termination = s.queue_task('termination')
718        self.db.commit()
719        self.writefunction(r"""
720def demo3():
721    time.sleep(3)
722    print(1/0)
723    return None
724
725def demo4():
726    time.sleep(15)
727    print("I'm printing something")
728    return dict(a=1, b=2)
729
730def demo5():
731    time.sleep(3)
732    print("I'm printing something")
733    rtn = dict(a=1, b=2)
734
735def demo6():
736    time.sleep(5)
737    print('50%')
738    time.sleep(5)
739    print('!clear!100%')
740    return 1
741""")
742        ret = self.exec_sched()
743        self.assertEqual(ret, 0)
744        # noreturn check
745        task1, task_run1 = self.fetch_results(s, noret1)
746        task2, task_run2 = self.fetch_results(s, noret2)
747        res = [
748            ("tasks no_returns1 completed", task1.status == 'COMPLETED'),
749            ("tasks no_returns2 failed", task2.status == 'FAILED'),
750            ("no_returns1 doesn't have a scheduler_run record", len(task_run1) == 0),
751            ("no_returns2 has a scheduler_run record FAILED", (len(task_run2) == 1 and task_run2[0].status == 'FAILED')),
752        ]
753        self.exec_asserts(res, 'NO_RETURN')
754
755        # timeout check
756        task1 = s.task_status(timeout1.id, output=True)
757        task2 = s.task_status(timeout2.id, output=True)
758        res = [
759            ("tasks timeouts1 timeoutted", task1.scheduler_task.status == 'TIMEOUT'),
760            ("tasks timeouts2 completed", task2.scheduler_task.status == 'COMPLETED')
761        ]
762        self.exec_asserts(res, 'TIMEOUT')
763
764        # progress check
765        task1 = s.task_status(progress.id, output=True)
766        res = [
767            ("tasks percentages completed", task1.scheduler_task.status == 'COMPLETED'),
768            ("output contains only 100%", task1.scheduler_run.run_output.strip() == "100%")
769        ]
770        self.exec_asserts(res, 'PROGRESS')
771
772    def testDrift_and_env_and_immediate(self):
773        s = Scheduler(self.db)
774        immediate = s.queue_task('demo1', ['a', 'b'], dict(c=1, d=2), immediate=True)
775        env = s.queue_task('demo7')
776        drift = s.queue_task('demo1', ['a', 'b'], dict(c=1, d=2), period=93, prevent_drift=True)
777        termination = s.queue_task('termination')
778        self.db.commit()
779        self.writefunction(r"""
780def demo1(*args,**vars):
781    print('you passed args=%s and vars=%s' % (args, vars))
782    return args[0]
783import random
784def demo7():
785    time.sleep(random.randint(1,5))
786    print(W2P_TASK, request.now)
787    return W2P_TASK.id, W2P_TASK.uuid, W2P_TASK.run_id
788""")
789        ret = self.exec_sched()
790        self.assertEqual(ret, 0)
791        # immediate check, can only check that nothing breaks
792        task1 = s.task_status(immediate.id)
793        res = [
794            ("tasks status completed", task1.status == 'COMPLETED'),
795        ]
796        self.exec_asserts(res, 'IMMEDIATE')
797
798        # drift check
799        task, task_run = self.fetch_results(s, drift)
800        res = [
801            ("task status completed", task.status == 'COMPLETED'),
802            ("next_run_time is exactly start_time + period", (task.next_run_time == task.start_time + datetime.timedelta(seconds=task.period)))
803        ]
804        self.exec_asserts(res, 'DRIFT')
805
806        # env check
807        task1 = s.task_status(env.id, output=True)
808        res = [
809            ("task %s returned W2P_TASK correctly" % (task1.scheduler_task.id),  task1.result == [task1.scheduler_task.id, task1.scheduler_task.uuid, task1.scheduler_run.id]),
810        ]
811        self.exec_asserts(res, 'ENV')
812
813
814    def testRetryFailed(self):
815        s = Scheduler(self.db)
816        failed = s.queue_task('demo2', retry_failed=1, period=1)
817        failed_consecutive = s.queue_task('demo8', retry_failed=2, repeats=2, period=1)
818        self.db.commit()
819        self.writefunction(r"""
820def demo2():
821    1/0
822
823def demo8():
824    placeholder = os.path.join(request.folder, 'private', 'demo8.pholder')
825    with open(placeholder, 'a') as g:
826        g.write('\nplaceholder for demo8 created')
827    num_of_lines = 0
828    with open(placeholder) as f:
829        num_of_lines = len([a for a in f.read().split('\n') if a])
830    print('number of lines', num_of_lines)
831    if num_of_lines <= 2:
832       1/0
833    else:
834        os.unlink(placeholder)
835    return 1
836""")
837        ret = self.exec_sched()
838        # process finished just fine
839        self.assertEqual(ret, 0)
840        # failed - checks
841        task, task_run = self.fetch_results(s, failed)
842        res = [
843            ("task status failed", task.status == 'FAILED'),
844            ("task times_run is 0", task.times_run == 0),
845            ("task times_failed is 2", task.times_failed == 2),
846            ("task ran 2 times only", len(task_run) == 2),
847            ("scheduler_run records are FAILED", (task_run[0].status == task_run[1].status == 'FAILED')),
848            ("period is respected", (task_run[1].start_time >= task_run[0].start_time + datetime.timedelta(seconds=task.period)))
849        ]
850        self.exec_asserts(res, 'FAILED')
851
852        # failed consecutive - checks
853        task, task_run = self.fetch_results(s, failed_consecutive)
854        res = [
855            ("task status completed", task.status == 'COMPLETED'),
856            ("task times_run is 2", task.times_run == 2),
857            ("task times_failed is 0", task.times_failed == 0),
858            ("task ran 6 times", len(task_run) == 6),
859            ("scheduler_run records for COMPLETED is 2", len([run.status for run in task_run if run.status == 'COMPLETED']) == 2),
860            ("scheduler_run records for FAILED is 4", len([run.status for run in task_run if run.status == 'FAILED']) == 4),
861        ]
862        self.exec_asserts(res, 'FAILED_CONSECUTIVE')
863
864    def testRegressions(self):
865        s = Scheduler(self.db)
866        huge_result = s.queue_task('demo10', retry_failed=1, period=1)
867        issue_1485 = s.queue_task('issue_1485')
868        termination = s.queue_task('termination')
869        self.db.commit()
870        self.writefunction(r"""
871def demo10():
872    res = 'a' * 99999
873    return dict(res=res)
874
875def issue_1485():
876    return response.render('issue_1485.html', dict(variable='abc'))
877""")
878        self.writeview(r"""<span>{{=variable}}</span>""", 'issue_1485.html')
879        ret = self.exec_sched()
880        # process finished just fine
881        self.assertEqual(ret, 0)
882        # huge_result - checks
883        task_huge = s.task_status(huge_result.id, output=True)
884        res = [
885            ("task status completed", task_huge.scheduler_task.status == 'COMPLETED'),
886            ("task times_run is 1", task_huge.scheduler_task.times_run == 1),
887            ("result is the correct one", task_huge.result == dict(res='a' * 99999))
888        ]
889        self.exec_asserts(res, 'HUGE_RESULT')
890
891        task_issue_1485 = s.task_status(issue_1485.id, output=True)
892        res = [
893            ("task status completed", task_issue_1485.scheduler_task.status == 'COMPLETED'),
894            ("task times_run is 1", task_issue_1485.scheduler_task.times_run == 1),
895            ("result is the correct one", task_issue_1485.result == '<span>abc</span>')
896        ]
897        self.exec_asserts(res, 'issue_1485')
898
899
900if __name__ == '__main__':
901    unittest.main()
Note: See TracBrowser for help on using the repository browser.