source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/scheduler.py

1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3# vim: set ts=4 sw=4 et ai:
4"""
5| This file is part of the web2py Web Framework
6| Copyrighted by Massimo Di Pierro <mdipierro@cs.depaul.edu>
7| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)
8
9Background processes made simple
10---------------------------------
11"""
12
13from __future__ import print_function
14
15import socket
16import os
17import logging
18import types
19from functools import reduce
20import datetime
21import re
22import sys
23from json import loads, dumps
24import tempfile
25import traceback
26import threading
27import multiprocessing
28import time
29import signal
30
31from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET, IS_NOT_IN_DB, IS_EMPTY_OR
32from gluon import IS_INT_IN_RANGE, IS_DATETIME, IS_IN_DB
33from gluon.utils import web2py_uuid
34from gluon._compat import Queue, long, iteritems, PY2, to_bytes, string_types, integer_types
35from gluon.storage import Storage
36
37USAGE = """
38## Example
39
40For any existing application myapp
41
42Create File: myapp/models/scheduler.py ======
43from gluon.scheduler import Scheduler
44
45def demo1(*args, **vars):
46    print('you passed args=%s and vars=%s' % (args, vars))
47    return 'done!'
48
49def demo2():
50    1/0
51
52scheduler = Scheduler(db, dict(demo1=demo1, demo2=demo2))
53## run worker nodes with:
54
55   cd web2py
56   python web2py.py -K myapp
57or
58   python gluon/scheduler.py -u sqlite://storage.sqlite \
59                             -f applications/myapp/databases/ \
60                             -t mytasks.py
61(-h for info)
62python scheduler.py -h
63
64## schedule jobs using
65http://127.0.0.1:8000/myapp/appadmin/insert/db/scheduler_task
66
67## monitor scheduled jobs
68http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_task.id
69
70## view completed jobs
71http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_run.id
72
73## view workers
74http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_worker.id
75
76"""
77
78IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid())
79
80logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER)
81
82QUEUED = 'QUEUED'
83ASSIGNED = 'ASSIGNED'
84RUNNING = 'RUNNING'
85COMPLETED = 'COMPLETED'
86FAILED = 'FAILED'
87TIMEOUT = 'TIMEOUT'
88STOPPED = 'STOPPED'
89ACTIVE = 'ACTIVE'
90TERMINATE = 'TERMINATE'
91DISABLED = 'DISABLED'
92KILL = 'KILL'
93PICK = 'PICK'
94STOP_TASK = 'STOP_TASK'
95EXPIRED = 'EXPIRED'
96SECONDS = 1
97HEARTBEAT = 3 * SECONDS
98MAXHIBERNATION = 10
99CLEAROUT = '!clear!'
100RESULTINFILE = 'result_in_file:'
101
102CALLABLETYPES = (types.LambdaType, types.FunctionType,
103                 types.BuiltinFunctionType,
104                 types.MethodType, types.BuiltinMethodType)
105
106
107class Task(object):
108    """Defines a "task" object that gets passed from the main thread to the
109    executor's one
110    """
111    def __init__(self, app, function, timeout, args='[]', vars='{}', **kwargs):
112        logger.debug(' new task allocated: %s.%s', app, function)
113        self.app = app
114        self.function = function
115        self.timeout = timeout
116        self.args = args  # json
117        self.vars = vars  # json
118        self.__dict__.update(kwargs)
119
120    def __str__(self):
121        return '<Task: %s>' % self.function
122
123
124class TaskReport(object):
125    """Defines a "task report" object that gets passed from the executor's
126    thread to the main one
127    """
128    def __init__(self, status, result=None, output=None, tb=None):
129        logger.debug('    new task report: %s', status)
130        if tb:
131            logger.debug('   traceback: %s', tb)
132        else:
133            logger.debug('   result: %s', result)
134        self.status = status
135        self.result = result
136        self.output = output
137        self.tb = tb
138
139    def __str__(self):
140        return '<TaskReport: %s>' % self.status
141
142
143class JobGraph(object):
144    """Experimental: dependencies amongs tasks."""
145
146    def __init__(self, db, job_name):
147        self.job_name = job_name or 'job_0'
148        self.db = db
149
150    def add_deps(self, task_parent, task_child):
151        """Create a dependency between task_parent and task_child."""
152        self.db.scheduler_task_deps.insert(task_parent=task_parent,
153                                           task_child=task_child,
154                                           job_name=self.job_name)
155
156    def validate(self, job_name=None):
157        """Validate if all tasks job_name can be completed.
158
159        Checks if there are no mutual dependencies among tasks.
160        Commits at the end if successfull, or it rollbacks the entire
161        transaction. Handle with care!
162        """
163        db = self.db
164        sd = db.scheduler_task_deps
165        if job_name:
166            q = sd.job_name == job_name
167        else:
168            q = sd.id
169
170        edges = db(q).select()
171        nested_dict = {}
172        for row in edges:
173            k = row.task_parent
174            if k in nested_dict:
175                nested_dict[k].add(row.task_child)
176            else:
177                nested_dict[k] = set((row.task_child,))
178        try:
179            rtn = []
180            for k, v in nested_dict.items():
181                v.discard(k)  # Ignore self dependencies
182            extra_items_in_deps = reduce(set.union, nested_dict.values()) - set(nested_dict.keys())
183            nested_dict.update(dict((item, set()) for item in extra_items_in_deps))
184            while True:
185                ordered = set(item for item, dep in nested_dict.items() if not dep)
186                if not ordered:
187                    break
188                rtn.append(ordered)
189                nested_dict = dict(
190                    (item, (dep - ordered)) for item, dep in nested_dict.items()
191                    if item not in ordered
192                )
193            assert not nested_dict, "A cyclic dependency exists amongst %r" % nested_dict
194            db.commit()
195            return rtn
196        except Exception:
197            db.rollback()
198            return None
199
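# Example (illustrative sketch, not part of the original module): build a small
# dependency graph between already-queued tasks. `db` and `scheduler` are assumed
# to be a configured DAL and Scheduler; `queue_task` is defined further down in
# this file. Given how update_dependencies() and assign_tasks() use can_visit,
# task_parent appears to be held back until task_child has completed.
#
#     jg = JobGraph(db, 'nightly_job')
#     extract = scheduler.queue_task('extract')
#     load = scheduler.queue_task('load')
#     jg.add_deps(load.id, extract.id)   # load waits for extract (hedged reading)
#     jg.validate('nightly_job')         # ordered layers of task ids, or None on a cycle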
200
201class CronParser(object):
202
203    def __init__(self, cronline, base=None):
204        self.cronline = cronline
205        self.sched = base or datetime.datetime.now()
206        self.task = None
207
208    @staticmethod
209    def _rangetolist(s, period='min'):
210        if s.startswith('*'):
211            if period == 'min':
212                s = s.replace('*', '0-59', 1)
213            elif period == 'hr':
214                s = s.replace('*', '0-23', 1)
215            elif period == 'dom':
216                s = s.replace('*', '1-31', 1)
217            elif period == 'mon':
218                s = s.replace('*', '1-12', 1)
219            elif period == 'dow':
220                s = s.replace('*', '0-6', 1)
221        match = re.match(r'(\d+)-(\d+)/(\d+)', s)
222        if match:
223            max_ = int(match.group(2)) + 1
224            step_ = int(match.group(3))
225        else:
226            match = re.match(r'(\d+)/(\d+)', s)
227            if match:
228                ranges_max = dict(min=59, hr=23, mon=12, dom=31, dow=7)
229                max_ = ranges_max[period] + 1
230                step_ = int(match.group(2))
231        if match:
232            min_ = int(match.group(1))
233            retval = list(range(min_, max_, step_))
234        else:
235            retval = []
236        return retval
237
238    @staticmethod
239    def _sanitycheck(values, period):
240        if period == 'min':
241            check = all(0 <= i <= 59 for i in values)
242        elif period == 'hr':
243            check = all(0 <= i <= 23 for i in values)
244        elif period == 'dom':
245            domrange = list(range(1, 32)) + ['l']
246            check = all(i in domrange for i in values)
247        elif period == 'mon':
248            check = all(1 <= i <= 12 for i in values)
249        elif period == 'dow':
250            check = all(0 <= i <= 7 for i in values)
251        return check
252
253    def _parse(self):
254        line = self.cronline.lower()
255        task = {}
256        if line.startswith('@yearly'):
257            line = line.replace('@yearly', '0 0 1 1 *')
258        elif line.startswith('@annually'):
259            line = line.replace('@annually', '0 0 1 1 *')
260        elif line.startswith('@monthly'):
261            line = line.replace('@monthly', '0 0 1 * *')
262        elif line.startswith('@weekly'):
263            line = line.replace('@weekly', '0 0 * * 0')
264        elif line.startswith('@daily'):
265            line = line.replace('@daily', '0 0 * * *')
266        elif line.startswith('@midnight'):
267            line = line.replace('@midnight', '0 0 * * *')
268        elif line.startswith('@hourly'):
269            line = line.replace('@hourly', '0 * * * *')
270        params = line.strip().split()
271        if len(params) < 5:
272            raise ValueError('Invalid cron line (too short)')
273        elif len(params) > 5:
274            raise ValueError('Invalid cron line (too long)')
275        daysofweek = {'sun': 0, 'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4,
276                      'fri': 5, 'sat': 6}
277        monthsofyear = {'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5,
278                        'jun': 6, 'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10,
279                        'nov': 11, 'dec': 12}
280        for (s, i) in zip(params, ('min', 'hr', 'dom', 'mon', 'dow')):
281            if s != '*':
282                task[i] = []
283                vals = s.split(',')
284                for val in vals:
285                    if i == 'dow':
286                        refdict = daysofweek
287                    elif i == 'mon':
288                        refdict = monthsofyear
289                    if i in ('dow', 'mon') and '-' in val and '/' not in val:
290                        isnum = val.split('-')[0].isdigit()
291                        if isnum:
292                            val = '%s/1' % val
293                        else:
294                            val = '-'.join([str(refdict.get(v, ''))
295                                           for v in val.split('-')])
296                    if '-' in val and '/' not in val:
297                        val = '%s/1' % val
298                    if '/' in val:
299                        task[i] += self._rangetolist(val, i)
300                    elif val.isdigit():
301                        task[i].append(int(val))
302                    elif i in ('dow', 'mon'):
303                        if val in refdict:
304                            task[i].append(refdict[val])
305                    elif i == 'dom' and val == 'l':
306                        task[i].append(val)
307                if not task[i]:
308                    raise ValueError('Invalid cron value (%s)' % s)
309                if not self._sanitycheck(task[i], i):
310                    raise ValueError('Invalid cron value (%s)' % s)
311                task[i] = sorted(task[i])
312        self.task = task
313
314    @staticmethod
315    def _get_next_dow(sched, task):
316        task_dow = [a % 7 for a in task['dow']]
317        while sched.isoweekday() % 7 not in task_dow:
318            sched += datetime.timedelta(days=1)
319        return sched
320
321    @staticmethod
322    def _get_next_dom(sched, task):
323        if task['dom'] == ['l']:
324            # instead of calendar.isleap
325            try:
326                last_feb = 29
327                datetime.date(sched.year, 2, last_feb)
328            except ValueError:
329                last_feb = 28
330            lastdayofmonth = [
331                31, last_feb, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31
332            ]
333            task_dom = [lastdayofmonth[sched.month - 1]]
334        else:
335            task_dom = task['dom']
336        while sched.day not in task_dom:
337            sched += datetime.timedelta(days=1)
338        return sched
339
340    @staticmethod
341    def _get_next_mon(sched, task):
342        while sched.month not in task['mon']:
343            if sched.month < 12:
344                sched = sched.replace(month=sched.month + 1)
345            else:
346                sched = sched.replace(month=1, year=sched.year + 1)
347        return sched
348
349    @staticmethod
350    def _getnext_hhmm(sched, task, add_to=True):
351        if add_to:
352            sched += datetime.timedelta(minutes=1)
353        if 'min' in task:
354            while sched.minute not in task['min']:
355                sched += datetime.timedelta(minutes=1)
356        if 'hr' in task and sched.hour not in task['hr']:
357            while sched.hour not in task['hr']:
358                sched += datetime.timedelta(hours=1)
359        return sched
360
361    def _getnext_date(self, sched, task):
362        if 'dow' in task and 'dom' in task:
363            dow = self._get_next_dow(sched, task)
364            dom = self._get_next_dom(sched, task)
365            sched = min(dow, dom)
366        elif 'dow' in task:
367            sched = self._get_next_dow(sched, task)
368        elif 'dom' in task:
369            sched = self._get_next_dom(sched, task)
370        if 'mon' in task:
371            sched = self._get_next_mon(sched, task)
372        return sched.replace(hour=0, minute=0)
373
374    def next(self):
375        """Get next date according to specs."""
376        if not self.task:
377            self._parse()
378        task = self.task
379        sched = self.sched
380        x = 0
381        while x < 1000:  # avoid potential max recursions
382            x += 1
383            try:
384                next_date = self._getnext_date(sched, task)
385            except (ValueError, OverflowError) as e:
386                raise ValueError('Invalid cron expression (%s)' % e)
387            if next_date.date() > self.sched.date():
388                # we rolled date, check for valid hhmm
389                sched = self._getnext_hhmm(next_date, task, False)
390                break
391            else:
392                # same date, get next hhmm
393                sched_time = self._getnext_hhmm(sched, task, True)
394                if sched_time.date() > sched.date():
395                    # we rolled date again :(
396                    sched = sched_time
397                else:
398                    sched = sched_time
399                    break
400        else:
401            raise ValueError('Potential bug found, please submit your '
402                             'cron expression to the authors')
403        self.sched = sched
404        return sched
405
406    def __iter__(self):
407        """Support iteration."""
408        return self
409
410    __next__ = next
411
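# Example (illustrative): CronParser computes successive run times from a cron
# expression and a base datetime; it is also iterable, so repeated next() calls
# keep advancing self.sched.
#
#     cron = CronParser('30 2 * * mon-fri', datetime.datetime(2021, 1, 1))
#     cron.next()   # -> datetime.datetime(2021, 1, 1, 2, 30)  (Jan 1st 2021 is a Friday)
#     cron.next()   # -> datetime.datetime(2021, 1, 4, 2, 30)  (the following Monday)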
412
413# the two functions below deal with simplejson decoding as unicode,
414# esp for the dict decode and subsequent usage as function Keyword arguments
415# unicode variable names won't work!
416# borrowed from http://stackoverflow.com/questions/956867/
417
418def _decode_list(lst):
419    if not PY2:
420        return lst
421    newlist = []
422    for i in lst:
423        if isinstance(i, string_types):
424            i = to_bytes(i)
425        elif isinstance(i, list):
426            i = _decode_list(i)
427        newlist.append(i)
428    return newlist
429
430def _decode_dict(dct):
431    if not PY2:
432        return dct
433    newdict = {}
434    for k, v in iteritems(dct):
435        k = to_bytes(k)
436        if isinstance(v, string_types):
437            v = to_bytes(v)
438        elif isinstance(v, list):
439            v = _decode_list(v)
440        newdict[k] = v
441    return newdict
442
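# Example (Python 2 only, illustrative): json.loads yields unicode keys, which
# cannot be splatted as **keyword arguments on PY2; the object_hook converts
# them back to byte strings:
#
#     loads('{"verbose": true}', object_hook=_decode_dict)  # keys become str, not unicode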
443
444def executor(retq, task, outq):
445    """The function used to execute tasks in the background process."""
446    logger.debug('    task started')
447
448    class LogOutput(object):
449        """Facility to log output at intervals."""
450
451        def __init__(self, out_queue):
452            self.out_queue = out_queue
453            self.stdout = sys.stdout
454            self.written = False
455            sys.stdout = self
456
457        def close(self):
458            sys.stdout = self.stdout
459            if self.written:
460                # see "Joining processes that use queues" section in
461                # https://docs.python.org/2/library/multiprocessing.html#programming-guidelines
462                # https://docs.python.org/3/library/multiprocessing.html#programming-guidelines
463                self.out_queue.cancel_join_thread()
464
465        def flush(self):
466            pass
467
468        def write(self, data):
469            self.out_queue.put(data)
470            self.written = True
471
472    W2P_TASK = Storage({
473                       'id': task.task_id,
474                       'uuid': task.uuid,
475                       'run_id': task.run_id
476                       })
477    stdout = LogOutput(outq)
478    try:
479        if task.app:
480            from gluon.shell import env, parse_path_info
481            from gluon import current
482            ## FIXME: why temporarily change the log level of the root logger?
483            #level = logging.getLogger().getEffectiveLevel()
484            #logging.getLogger().setLevel(logging.WARN)
485            # support for task.app like 'app/controller'
486            (a, c, f) = parse_path_info(task.app)
487            _env = env(a=a, c=c, import_models=True,
488                       extra_request={'is_scheduler': True})
489            #logging.getLogger().setLevel(level)
490            f = task.function
491            functions = current._scheduler.tasks
492            if functions:
493                _function = functions.get(f)
494            else:
495                # look into env
496                _function = _env.get(f)
497            if not isinstance(_function, CALLABLETYPES):
498                raise NameError(
499                    "name '%s' not found in scheduler's environment" % f)
500            # Inject W2P_TASK into environment
501            _env.update({'W2P_TASK': W2P_TASK})
502            # Inject W2P_TASK into current
503            current.W2P_TASK = W2P_TASK
504            globals().update(_env)
505            args = _decode_list(loads(task.args))
506            vars = loads(task.vars, object_hook=_decode_dict)
507            result = dumps(_function(*args, **vars))
508        else:
509            # for testing purpose only
510            result = eval(task.function)(
511                *loads(task.args, object_hook=_decode_dict),
512                **loads(task.vars, object_hook=_decode_dict))
513        if len(result) >= 1024:
514            fd, temp_path = tempfile.mkstemp(suffix='.w2p_sched')
515            with os.fdopen(fd, 'w') as f:
516                f.write(result)
517            result = RESULTINFILE + temp_path
518        retq.put(TaskReport('COMPLETED', result=result))
519    except:
520        tb = traceback.format_exc()
521        retq.put(TaskReport('FAILED', tb=tb))
522    finally:
523        stdout.close()
524
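# Example (illustrative): inside a scheduled function the running task's
# identifiers are reachable both as the injected global W2P_TASK and via
# gluon.current, e.g. in a hypothetical myapp/models/scheduler.py:
#
#     def my_task():
#         from gluon import current
#         print(current.W2P_TASK.id, current.W2P_TASK.uuid, current.W2P_TASK.run_id)
#         return 'done'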
525
526class IS_CRONLINE(object):
527    """
528    Validates cronline
529    """
530    def __init__(self, error_message=None):
531        self.error_message = error_message
532
533    def __call__(self, value, record_id=None):
534        recur = CronParser(value, datetime.datetime.now())
535        try:
536            recur.next()
537            return (value, None)
538        except ValueError as e:
539            if not self.error_message:
540                return (value, e)
541            return (value, self.error_message)
542
543class TYPE(object):
544    """
545    Validator that checks whether a field contains valid JSON and validates its type.
546    Used for the `args` and `vars` fields of the scheduler_task table
547    """
548
549    def __init__(self, myclass=list, parse=False):
550        self.myclass = myclass
551        self.parse = parse
552
553    def __call__(self, value, record_id=None):
554        from gluon import current
555        try:
556            obj = loads(value)
557        except:
558            return (value, current.T('invalid json'))
559        else:
560            if isinstance(obj, self.myclass):
561                if self.parse:
562                    return (obj, None)
563                else:
564                    return (value, None)
565            else:
566                return (value, current.T('Not of type: %s') % self.myclass)
567
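# Example (illustrative): this is how scheduler_task.args / scheduler_task.vars
# get validated; error messages go through current.T, so a request context is
# assumed for the failure cases.
#
#     TYPE(list)('[1, 2]')                 # -> ('[1, 2]', None)
#     TYPE(dict)('[1, 2]')                 # -> ('[1, 2]', 'Not of type: ...')
#     TYPE(dict, parse=True)('{"a": 1}')   # -> ({'a': 1}, None)  parsed object returned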
568
569TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED, EXPIRED)
570RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
571WORKER_STATUS = (ACTIVE, PICK, DISABLED, TERMINATE, KILL, STOP_TASK)
572
573
574class Scheduler(threading.Thread):
575    """Scheduler object
576
577    Args:
578        db: DAL connection where Scheduler will create its tables
579        tasks(dict): either a dict containing name-->func or None.
580            If None, functions will be searched in the environment
581        migrate(bool): turn migration on/off for the Scheduler's tables
582        worker_name(str): force worker_name to identify each process.
583            Leave it to None to autoassign a name (hostname#pid)
584        group_names(list): process tasks belonging to this group
585            defaults to ['main'] if nothing gets passed
586        heartbeat(int): how many seconds the worker sleeps between one
587            execution and the following one. Indirectly sets how many seconds
588            will pass between checks for new tasks
589        max_empty_runs(int): how many loops are allowed to pass without
590            processing any tasks before exiting the process. 0 keeps the
591            process alive indefinitely
592        discard_results(bool): Scheduler stores execution details in the
593            scheduler_run table. By default, details are kept only when a
594            task returns a result. Setting this to True discards results
595            even for tasks that return something
596        utc_time(bool): do all datetime calculations assuming UTC as the
597            timezone. Remember to pass `start_time` and `stop_time` to tasks
598            accordingly
599        use_spawn(bool): use the "spawn" start method for the subprocess (only usable with Python 3)
600    """
601
602    def __init__(self, db, tasks=None, migrate=True,
603                 worker_name=None, group_names=None, heartbeat=HEARTBEAT,
604                 max_empty_runs=0, discard_results=False, utc_time=False, use_spawn=False):
605
606        threading.Thread.__init__(self)
607        self.setDaemon(True)
608        self.process = None     # the background process
609        self.process_queues = (None, None)
610        self.have_heartbeat = True   # set to False to kill
611        self.empty_runs = 0
612
613        self.db = db
614        self.db_thread = None
615        self.tasks = tasks
616        self.group_names = group_names or ['main']
617        self.heartbeat = heartbeat
618        self.worker_name = worker_name or IDENTIFIER
619        self.max_empty_runs = max_empty_runs
620        self.discard_results = discard_results
621        self.is_a_ticker = False
622        self.do_assign_tasks = False
623        self.greedy = False
624        self.utc_time = utc_time
625        self.w_stats_lock = threading.RLock()
626        self.w_stats = Storage(
627            dict(
628                status=RUNNING,
629                sleep=heartbeat,
630                total=0,
631                errors=0,
632                empty_runs=0,
633                queue=0,
634                distribution=None,
635                workers=0)
636        )  # dict holding statistics
637
638        from gluon import current
639        current._scheduler = self
640
641        self.define_tables(db, migrate=migrate)
642        self.use_spawn = use_spawn
643
644    def execute(self, task):
645        """Start the background process.
646
647        Args:
648            task : a `Task` object
649
650        Returns:
651            a `TaskReport` object
652        """
653        outq = None
654        retq = None
655        if (self.use_spawn and not PY2):
656            ctx = multiprocessing.get_context('spawn')
657            outq = ctx.Queue()
658            retq = ctx.Queue(maxsize=1)
659            self.process = p = ctx.Process(target=executor, args=(retq, task, outq))
660        else:
661            outq = multiprocessing.Queue()
662            retq = multiprocessing.Queue(maxsize=1)
663            self.process = p = \
664                multiprocessing.Process(target=executor, args=(retq, task, outq))
665
666        self.process_queues = (retq, outq)
667
668        logger.debug('   task starting')
669        p.start()
670        start = time.time()
671
672        if task.sync_output > 0:
673            run_timeout = task.sync_output
674        else:
675            run_timeout = task.timeout
676        task_output = tout = ''
677        try:
678            while p.is_alive() and (not task.timeout or
679                                    time.time() - start < task.timeout):
680                # NOTE: try always to empty the out queue before
681                #       the child process is joined,
682                #       see "Joining processes that use queues" section in
683                # https://docs.python.org/2/library/multiprocessing.html#programming-guidelines
684                # https://docs.python.org/3/library/multiprocessing.html#programming-guidelines
685                while True:
686                    try:
687                        tout += outq.get(timeout=2)
688                    except Queue.Empty:
689                        break
690                if tout:
691                    logger.debug(' partial output: "%s"', tout)
692                    if CLEAROUT in tout:
693                        task_output = tout[
694                            tout.rfind(CLEAROUT) + len(CLEAROUT):]
695                    else:
696                        task_output += tout
697                    try:
698                        db = self.db
699                        db(db.scheduler_run.id == task.run_id).update(run_output=task_output)
700                        db.commit()
701                        tout = ''
702                        logger.debug(' partial output saved')
703                    except Exception:
704                        logger.exception(' error while saving partial output')
705                        task_output = task_output[:-len(tout)]
706                p.join(timeout=run_timeout)
707        except:
708            logger.exception('    task stopped by general exception')
709            self.terminate_process()
710            tr = TaskReport(STOPPED)
711        else:
712            if p.is_alive():
713                logger.debug('    task timeout')
714                self.terminate_process(flush_ret=False)
715                try:
716                    # we try to get a traceback here
717                    tr = retq.get(timeout=2) # NOTE: risky after terminate
718                    tr.status = TIMEOUT
719                    tr.output = task_output
720                except Queue.Empty:
721                    tr = TaskReport(TIMEOUT)
722            else:
723                try:
724                    tr = retq.get_nowait()
725                except Queue.Empty:
726                    logger.debug('    task stopped')
727                    tr = TaskReport(STOPPED)
728                else:
729                    logger.debug('  task completed or failed')
730        result = tr.result
731        if result and result.startswith(RESULTINFILE):
732            temp_path = result.replace(RESULTINFILE, '', 1)
733            with open(temp_path) as f:
734                tr.result = f.read()
735            os.unlink(temp_path)
736        tr.output = task_output
737        return tr
738
739    _terminate_process_lock = threading.RLock()
740
741    def terminate_process(self, flush_out=True, flush_ret=True):
742        """Terminate any running tasks (internal use only)"""
743        if self.process is not None:
744            # must synchronize since we are called by main and heartbeat thread
745            with self._terminate_process_lock:
746                if flush_out:
747                    queue = self.process_queues[1]
748                    while not queue.empty():  # NOTE: empty() is not reliable
749                        try:
750                            queue.get_nowait()
751                        except Queue.Empty:
752                            pass
753                if flush_ret:
754                    queue = self.process_queues[0]
755                    while not queue.empty():
756                        try:
757                            queue.get_nowait()
758                        except Queue.Empty:
759                            pass
760                logger.debug('terminating process')
761                try:
762                    # NOTE: terminate should not be called when using shared
763                    #       resources, see "Avoid terminating processes"
764                    #       section in
765                    # https://docs.python.org/2/library/multiprocessing.html#programming-guidelines
766                    # https://docs.python.org/3/library/multiprocessing.html#programming-guidelines
767                    self.process.terminate()
768                    # NOTE: calling join after a terminate is risky,
769                    #       as explained in "Avoid terminating processes"
770                    #       section this can lead to a deadlock
771                    self.process.join()
772                finally:
773                    self.process = None
774
775    def die(self):
776        """Forces termination of the worker process along with any running
777        task"""
778        logger.info('die!')
779        self.have_heartbeat = False
780        self.terminate_process()
781
782    def give_up(self):
783        """Waits for any running task to be executed, then exits the worker
784        process"""
785        logger.info('Giving up as soon as possible!')
786        self.have_heartbeat = False
787
788    def run(self):
789        """This is executed by the heartbeat thread"""
790        counter = 0
791        while self.have_heartbeat:
792            self.send_heartbeat(counter)
793            counter += 1
794
795    def start_heartbeats(self):
796        self.start()
797
798    def __get_migrate(self, tablename, migrate=True):
799        if migrate is False:
800            return False
801        elif migrate is True:
802            return True
803        elif isinstance(migrate, str):
804            return "%s%s.table" % (migrate, tablename)
805        return True
806
807    def now(self):
808        """Shortcut that fetches current time based on UTC preferences."""
809        return self.utc_time and datetime.datetime.utcnow() or datetime.datetime.now()
810
811    def set_requirements(self, scheduler_task):
812        """Called to set defaults for lazy_tables connections."""
813        from gluon import current
814        if hasattr(current, 'request'):
815            scheduler_task.application_name.default = '%s/%s' % (
816                current.request.application, current.request.controller
817            )
818
819    def define_tables(self, db, migrate):
820        """Define Scheduler tables structure."""
821        from pydal.base import DEFAULT
822        logger.debug('defining tables (migrate=%s)', migrate)
823        now = self.now
824        db.define_table(
825            'scheduler_task',
826            Field('application_name', requires=IS_NOT_EMPTY(),
827                  default=None, writable=False),
828            Field('task_name', default=None),
829            Field('group_name', default='main'),
830            Field('status', requires=IS_IN_SET(TASK_STATUS),
831                  default=QUEUED, writable=False),
832            Field('broadcast', 'boolean', default=False),
833            Field('function_name',
834                  requires=IS_IN_SET(sorted(self.tasks.keys()))
835                  if self.tasks else DEFAULT),
836            Field('uuid', length=255,
837                  requires=IS_NOT_IN_DB(db, 'scheduler_task.uuid'),
838                  unique=True, default=web2py_uuid),
839            Field('args', 'text', default='[]', requires=TYPE(list)),
840            Field('vars', 'text', default='{}', requires=TYPE(dict)),
841            Field('enabled', 'boolean', default=True),
842            Field('start_time', 'datetime', default=now,
843                  requires=IS_DATETIME()),
844            Field('next_run_time', 'datetime', default=now),
845            Field('stop_time', 'datetime'),
846            Field('repeats', 'integer', default=1, comment="0=unlimited",
847                  requires=IS_INT_IN_RANGE(0, None)),
848            Field('retry_failed', 'integer', default=0, comment="-1=unlimited",
849                  requires=IS_INT_IN_RANGE(-1, None)),
850            Field('period', 'integer', default=60, comment='seconds',
851                  requires=IS_INT_IN_RANGE(0, None)),
852            Field('prevent_drift', 'boolean', default=False,
853                  comment='Exact start_times between runs'),
854            Field('cronline', default=None,
855                  comment='Discard "period", use this cron expr instead',
856                  requires=IS_EMPTY_OR(IS_CRONLINE())),
857            Field('timeout', 'integer', default=60, comment='seconds',
858                  requires=IS_INT_IN_RANGE(1, None)),
859            Field('sync_output', 'integer', default=0,
860                  comment="update output every n sec: 0=never",
861                  requires=IS_INT_IN_RANGE(0, None)),
862            Field('times_run', 'integer', default=0, writable=False),
863            Field('times_failed', 'integer', default=0, writable=False),
864            Field('last_run_time', 'datetime', writable=False, readable=False),
865            Field('assigned_worker_name', default='', writable=False),
866            on_define=self.set_requirements,
867            migrate=self.__get_migrate('scheduler_task', migrate),
868            format='(%(id)s) %(task_name)s')
869
870        db.define_table(
871            'scheduler_run',
872            Field('task_id', 'reference scheduler_task'),
873            Field('status', requires=IS_IN_SET(RUN_STATUS)),
874            Field('start_time', 'datetime'),
875            Field('stop_time', 'datetime'),
876            Field('run_output', 'text'),
877            Field('run_result', 'text'),
878            Field('traceback', 'text'),
879            Field('worker_name', default=self.worker_name),
880            migrate=self.__get_migrate('scheduler_run', migrate)
881        )
882
883        db.define_table(
884            'scheduler_worker',
885            Field('worker_name', length=255, unique=True),
886            Field('first_heartbeat', 'datetime'),
887            Field('last_heartbeat', 'datetime'),
888            Field('status', requires=IS_IN_SET(WORKER_STATUS)),
889            Field('is_ticker', 'boolean', default=False, writable=False),
890            Field('group_names', 'list:string', default=self.group_names),
891            Field('worker_stats', 'json'),
892            migrate=self.__get_migrate('scheduler_worker', migrate)
893        )
894
895        db.define_table(
896            'scheduler_task_deps',
897            Field('job_name', default='job_0'),
898            Field('task_parent', 'integer',
899                  requires=IS_IN_DB(db, 'scheduler_task.id', '%(task_name)s')
900                  ),
901            Field('task_child', 'reference scheduler_task'),
902            Field('can_visit', 'boolean', default=False),
903            migrate=self.__get_migrate('scheduler_task_deps', migrate)
904        )
905
906        if migrate is not False:
907            db.commit()
908
909    def loop(self, worker_name=None):
910        """Main loop.
911
912        This works basically as a never-ending loop that:
913
914        - checks if the worker is ready to process tasks (is not DISABLED)
915        - pops a task from the queue
916        - if there is a task:
917
918          - spawns the executor background process
919          - waits for the process to be finished
920          - sleeps `heartbeat` seconds
921        - if there is no task:
922
923          - checks for max_empty_runs
924          - sleeps `heartbeat` seconds
925
926        """
927        signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
928        try:
929            self.start_heartbeats()
930            while self.have_heartbeat:
931                with self.w_stats_lock:
932                    is_disabled = self.w_stats.status == DISABLED
933                if is_disabled:
934                    logger.debug('Someone stopped me, sleeping until better'
935                                 ' times come (%s)', self.w_stats.sleep)
936                    self.sleep()
937                    continue
938                logger.debug('looping...')
939                if self.is_a_ticker and self.do_assign_tasks:
940                    # I'm a ticker, and 5 loops passed without
941                    # reassigning tasks, let's do that
942                    self.wrapped_assign_tasks()
943                task = self.wrapped_pop_task()
944                if task:
945                    with self.w_stats_lock:
946                        self.w_stats.empty_runs = 0
947                        self.w_stats.status = RUNNING
948                        self.w_stats.total += 1
949                    self.wrapped_report_task(task, self.execute(task))
950                    with self.w_stats_lock:
951                        if not self.w_stats.status == DISABLED:
952                            self.w_stats.status = ACTIVE
953                else:
954                    with self.w_stats_lock:
955                        self.w_stats.empty_runs += 1
956                        if self.max_empty_runs != 0:
957                            logger.debug('empty runs %s/%s',
958                                         self.w_stats.empty_runs,
959                                         self.max_empty_runs)
960                            if self.w_stats.empty_runs >= self.max_empty_runs:
961                                logger.info(
962                                    'empty runs limit reached, killing myself')
963                                self.die()
964                    if self.is_a_ticker and self.greedy:
965                        # there could be other tasks ready to be assigned
966                        logger.info('TICKER: greedy loop')
967                        self.wrapped_assign_tasks()
968                    logger.debug('sleeping...')
969                    self.sleep()
970        except (KeyboardInterrupt, SystemExit):
971            logger.info('caught')
972            self.die()
973
974    def wrapped_pop_task(self):
975        """Commodity function to call `pop_task` and trap exceptions.
976
977        If an exception is raised, assume it happened because of database
978        contention and retries `pop_task` after 0.5 seconds
979        """
980        db = self.db
981        db.commit()  # for MySQL only; FIXME: Niphlod, still needed? could avoid when not MySQL?
982        for x in range(10):
983            try:
984                return self.pop_task()
985            except Exception:
986                logger.exception('    error popping tasks')
987                self.w_stats.errors += 1
988                db.rollback()
989                time.sleep(0.5)
990
991    def pop_task(self):
992        """Grab a task ready to be executed from the queue."""
993        now = self.now()
994        db = self.db
995        st = db.scheduler_task
996        grabbed = db(
997            (st.assigned_worker_name == self.worker_name) &
998            (st.status == ASSIGNED)
999        )
1000
1001        task = grabbed.select(limitby=(0, 1), orderby=st.next_run_time).first()
1002        if task:
1003            # none will touch my task!
1004            task.update_record(status=RUNNING, last_run_time=now)
1005            db.commit()
1006            logger.debug('   work to do %s', task.id)
1007        else:
1008            logger.info('nothing to do')
1009            return None
1010
1011        if task.cronline:
1012            cron_recur = CronParser(task.cronline,
1013                                    now.replace(second=0, microsecond=0))
1014            next_run_time = cron_recur.next()
1015        elif not task.prevent_drift:
1016            next_run_time = task.last_run_time + datetime.timedelta(
1017                seconds=task.period
1018            )
1019        else:
1020            # calc next_run_time based on available slots
1021            # see #1191
1022            next_run_time = task.start_time
1023            secondspassed = (now - next_run_time).total_seconds()
1024            times = secondspassed // task.period + 1
1025            next_run_time += datetime.timedelta(seconds=task.period * times)
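            # e.g. (illustrative): start_time 10:00, period 3600s, now 13:10
            # -> secondspassed 11400, times 4, next_run_time 14:00 (the next
            #    aligned slot, instead of drifting to roughly now + period)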
1026
1027        times_run = task.times_run + 1
1028        if times_run < task.repeats or task.repeats == 0:
1029            # need to run (repeating task)
1030            run_again = True
1031        else:
1032            # no need to run again
1033            run_again = False
1034        run_id = 0
1035        while not self.discard_results:  # FIXME: forever?
1036            logger.debug('    new scheduler_run record')
1037            try:
1038                run_id = db.scheduler_run.insert(
1039                    task_id=task.id,
1040                    status=RUNNING,
1041                    start_time=now,
1042                    worker_name=self.worker_name)
1043                db.commit()
1044                break
1045            except Exception:
1046                logger.exception('    error inserting scheduler_run')
1047                db.rollback()
1048                time.sleep(0.5)
1049        logger.info('new task %(id)s "%(task_name)s"'
1050                    ' %(application_name)s.%(function_name)s' % task)
1051        return Task(
1052            app=task.application_name,
1053            function=task.function_name,
1054            timeout=task.timeout,
1055            args=task.args,  # in json
1056            vars=task.vars,  # in json
1057            task_id=task.id,
1058            run_id=run_id,
1059            run_again=run_again,
1060            next_run_time=next_run_time,
1061            times_run=times_run,
1062            stop_time=task.stop_time,
1063            retry_failed=task.retry_failed,
1064            times_failed=task.times_failed,
1065            sync_output=task.sync_output,
1066            uuid=task.uuid)
1067
1068    def wrapped_report_task(self, task, task_report):
1069        """Commodity function to call `report_task` and trap exceptions.
1070
1071        If an exception is raised, assume it happened because of database
1072        contention and retries `report_task` after 0.5 seconds
1073        """
1074        db = self.db
1075        while True:  # FIXME: forever?
1076            try:
1077                self.report_task(task, task_report)
1078                db.commit()
1079                break
1080            except Exception:
1081                logger.exception('    error storing result')
1082                db.rollback()
1083                time.sleep(0.5)
1084
1085    def report_task(self, task, task_report):
1086        """Take care of storing the result according to preferences.
1087
1088        Deals with logic for repeating tasks.
1089        """
1090        now = self.now()
1091        db = self.db
1092        st = db.scheduler_task
1093        sr = db.scheduler_run
1094        if not self.discard_results:
1095            if task_report.result != 'null' or task_report.tb:
1096                # result is 'null' as a string if task completed
1097                # if it's stopped it's None as NoneType, so we record
1098                # the STOPPED "run" anyway
1099                logger.debug(' recording task report in db (%s)',
1100                             task_report.status)
1101                db(sr.id == task.run_id).update(
1102                    status=task_report.status,
1103                    stop_time=now,
1104                    run_result=task_report.result,
1105                    run_output=task_report.output,
1106                    traceback=task_report.tb)
1107            else:
1108                logger.debug(' deleting task report in db because of no result')
1109                db(sr.id == task.run_id).delete()
1110        # if there is a stop_time and the following run would exceed it
1111        is_expired = (task.stop_time and
1112                      task.next_run_time > task.stop_time or
1113                      False)
1114        status = (task.run_again and is_expired and EXPIRED or
1115                  task.run_again and not is_expired and QUEUED or
1116                  COMPLETED)
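        # i.e. (illustrative): a repeating task whose next run would pass its
        # stop_time -> EXPIRED; a repeating task still inside its window ->
        # QUEUED; a one-shot (or exhausted) task -> COMPLETED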
1117        if task_report.status == COMPLETED:
1118            d = dict(status=status,
1119                     next_run_time=task.next_run_time,
1120                     times_run=task.times_run,
1121                     times_failed=0
1122                     )
1123            db(st.id == task.task_id).update(**d)
1124            if status == COMPLETED:
1125                self.update_dependencies(task.task_id)
1126        else:
1127            st_mapping = {'FAILED': 'FAILED',
1128                          'TIMEOUT': 'TIMEOUT',
1129                          'STOPPED': 'FAILED'}[task_report.status]
1130            status = (task.retry_failed
1131                      and task.times_failed < task.retry_failed
1132                      and QUEUED or task.retry_failed == -1
1133                      and QUEUED or st_mapping)
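            # i.e. (illustrative): requeue while times_failed < retry_failed,
            # requeue forever when retry_failed == -1, otherwise keep the
            # mapped FAILED/TIMEOUT status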
1134            db(st.id == task.task_id).update(
1135                times_failed=st.times_failed + 1,
1136                next_run_time=task.next_run_time,
1137                status=status
1138            )
1139        logger.info('task completed (%s)', task_report.status)
1140
1141    def update_dependencies(self, task_id):
1142        """Unblock execution paths for Jobs."""
1143        db = self.db
1144        db(db.scheduler_task_deps.task_child == task_id).update(can_visit=True)
1145
1146    def adj_hibernation(self):
1147        """Used to increase the "sleep" interval for DISABLED workers."""
1148        with self.w_stats_lock:
1149            if self.w_stats.status == DISABLED:
1150                wk_st = self.w_stats.sleep
1151                hibernation = wk_st + HEARTBEAT if wk_st < MAXHIBERNATION else MAXHIBERNATION
1152                self.w_stats.sleep = hibernation
1153
1154    def send_heartbeat(self, counter):
1155        """Coordination among available workers.
1156
1157        It:
1158        - sends the heartbeat
1159        - elects a ticker among available workers (the only process that
1160            actually dispatches tasks to workers)
1161        - deals with worker statuses
1162        - does "housecleaning" for dead workers
1163        - triggers task assignment to workers
1164        """
1165        if self.db_thread:
1166            # BKR 20180612 check if connection still works
1167            try:
1168                self.db_thread(self.db_thread.scheduler_worker).count()
1169            except self.db_thread._adapter.connection.OperationalError:
1170                # if not -> throw away self.db_thread and force reconnect
1171                self.db_thread = None
1172
1173        if not self.db_thread:
1174            logger.debug('thread building own DAL object')
1175            self.db_thread = DAL(
1176                self.db._uri, folder=self.db._adapter.folder, decode_credentials=True)
1177            self.define_tables(self.db_thread, migrate=False)
1178
1179        try:
1180            now = self.now()
1181            db = self.db_thread
1182            sw = db.scheduler_worker
1183            st = db.scheduler_task
1184            # record heartbeat
1185            row = db(sw.worker_name == self.worker_name).select().first()
1186            with self.w_stats_lock:
1187                if not row:
1188                    sw.insert(status=ACTIVE, worker_name=self.worker_name,
1189                              first_heartbeat=now, last_heartbeat=now,
1190                              group_names=self.group_names,
1191                              worker_stats=self.w_stats)
1192                    self.w_stats.status = ACTIVE
1193                    self.w_stats.sleep = self.heartbeat
1194                    backed_status = ACTIVE
1195                else:
1196                    backed_status = row.status
1197                    if backed_status == DISABLED:
1198                        # keep sleeping
1199                        self.w_stats.status = DISABLED
1200                        logger.debug('........recording heartbeat (DISABLED)')
1201                        db(sw.worker_name == self.worker_name).update(
1202                            last_heartbeat=now,
1203                            worker_stats=self.w_stats)
1204                    elif backed_status == TERMINATE:
1205                        self.w_stats.status = TERMINATE
1206                        logger.debug("Waiting to terminate the current task")
1207                        self.give_up()
1208                    elif backed_status == KILL:
1209                        self.w_stats.status = KILL
1210                        self.die()
1211                        return
1212                    else:
1213                        if backed_status == STOP_TASK:
1214                            logger.info('Asked to kill the current task')
1215                            self.terminate_process()
1216                        logger.debug('........recording heartbeat (%s)',
1217                                     self.w_stats.status)
1218                        db(sw.worker_name == self.worker_name).update(
1219                            last_heartbeat=now, status=ACTIVE,
1220                            worker_stats=self.w_stats)
1221                        self.w_stats.sleep = self.heartbeat  # re-activating the process
1222                        if self.w_stats.status != RUNNING:
1223                            self.w_stats.status = ACTIVE
1224
1225            self.do_assign_tasks = False
1226            if counter % 5 == 0 or backed_status == PICK:
1227                try:
1228                    # delete dead workers
1229                    expiration = now - datetime.timedelta(
1230                        seconds=self.heartbeat * 3)
1231                    departure = now - datetime.timedelta(
1232                        seconds=self.heartbeat * 3 * 15)
1233                    logger.debug(
1234                        '    freeing workers that have not sent heartbeat')
1235                    dead_workers = db(
1236                        ((sw.last_heartbeat < expiration) & (sw.status == ACTIVE)) |
1237                        ((sw.last_heartbeat < departure) & (sw.status != ACTIVE))
1238                    )
1239                    dead_workers_name = dead_workers._select(sw.worker_name)
1240                    db(
1241                        (st.assigned_worker_name.belongs(dead_workers_name)) &
1242                        (st.status == RUNNING)
1243                    ).update(assigned_worker_name='', status=QUEUED)
1244                    dead_workers.delete()
1245                    try:
1246                        self.is_a_ticker = self.being_a_ticker()
1247                    except:
1248                        logger.exception('Error coordinating TICKER')
1249                    with self.w_stats_lock:
1250                        if self.w_stats.status == ACTIVE:
1251                            self.do_assign_tasks = True
1252                except:
1253                    logger.exception('Error cleaning up')
1254
1255            db.commit()
1256        except:
1257            logger.exception('Error retrieving status')
1258            db.rollback()
1259        self.adj_hibernation()
1260        self.sleep()
1261
1262    def being_a_ticker(self):
1263        """Elect a TICKER process that assigns tasks to available workers.
1264
1265        Does its best to elect a worker that is not busy processing other tasks
1266        to allow a proper distribution of tasks among all active workers ASAP
1267        """
1268        db = self.db_thread
1269        sw = db.scheduler_worker
1270        my_name = self.worker_name
1271        all_active = db(
1272            (sw.worker_name != my_name) & (sw.status == ACTIVE)
1273        ).select(sw.is_ticker, sw.worker_name)
1274        ticker = all_active.find(lambda row: row.is_ticker is True).first()
1275        with self.w_stats_lock:
1276            not_busy = self.w_stats.status == ACTIVE
1277        if not ticker:
1278            # if no other tickers are around
1279            if not_busy:
1280                # only if I'm not busy
1281                db(sw.worker_name == my_name).update(is_ticker=True)
1282                db(sw.worker_name != my_name).update(is_ticker=False)
1283                logger.info("TICKER: I'm a ticker")
1284            else:
1285                # I'm busy
1286                if len(all_active) >= 1:
1287                    # so I'll "downgrade" myself to a "poor worker"
1288                    db(sw.worker_name == my_name).update(is_ticker=False)
1289                else:
1290                    not_busy = True
1291            db.commit()
1292            return not_busy
1293        else:
1294            logger.info(
1295                "%s is a ticker, I'm a poor worker" % ticker.worker_name)
1296            return False
1297
1298    def wrapped_assign_tasks(self):
1299        """Commodity function to call `assign_tasks` and trap exceptions.
1300
1301        If an exception is raised, assume it happened because of database
1302        contention and retries `assign_tasks` after 0.5 seconds
1303        """
1304        logger.debug('Assigning tasks...')
1305        db = self.db
1306        db.commit()  # for MySQL only; FIXME: Niphlod, still needed? could avoid when not MySQL?
1307        for x in range(10):
1308            try:
1309                self.assign_tasks()
1310                db.commit()
1311                logger.debug('Tasks assigned...')
1312                break
1313            except Exception:
1314                logger.exception('TICKER: error assigning tasks')
1315                self.w_stats.errors += 1
1316                db.rollback()
1317                time.sleep(0.5)
1318
1319    def assign_tasks(self):
1320        """Assign task to workers, that can then pop them from the queue.
1321
1322        Deals with group_name(s) logic, in order to assign linearly tasks
1323        to available workers for those groups
1324        """
1325        now = self.now()
1326        db = self.db
1327        sw = db.scheduler_worker
1328        st = db.scheduler_task
1329        sd = db.scheduler_task_deps
1330        all_workers = db(sw.status == ACTIVE).select()
1331        # build workers as dict of groups
1332        wkgroups = {}
1333        for w in all_workers:
1334            if w.worker_stats['status'] == 'RUNNING':
1335                continue
1336            group_names = w.group_names
1337            for gname in group_names:
1338                if gname not in wkgroups:
1339                    wkgroups[gname] = dict(
1340                        workers=[{'name': w.worker_name, 'c': 0}])
1341                else:
1342                    wkgroups[gname]['workers'].append(
1343                        {'name': w.worker_name, 'c': 0})
1344        # expire queued tasks whose stop_time passed between "runs" (i.e., while
1345        # the scheduler was turned off): they were not expired then, but they are now
1346        db(
1347            (st.status.belongs((QUEUED, ASSIGNED))) &
1348            (st.stop_time < now)
1349        ).update(status=EXPIRED)
1350
1351        # calculate dependencies
1352        deps_with_no_deps = db(
1353            (sd.can_visit == False) &
1354            (~sd.task_child.belongs(
1355                db(sd.can_visit == False)._select(sd.task_parent)
1356            )
1357            )
1358        )._select(sd.task_child)
1359        no_deps = db(
1360            (st.status.belongs((QUEUED, ASSIGNED))) &
1361            (
1362                (sd.id == None) | (st.id.belongs(deps_with_no_deps))
1363            )
1364        )._select(st.id, distinct=True, left=sd.on(
1365                 (st.id == sd.task_parent) &
1366                 (sd.can_visit == False)
1367        )
1368        )
1369
1370        all_available = db(
1371            (st.status.belongs((QUEUED, ASSIGNED))) &
1372            (st.next_run_time <= now) &
1373            (st.enabled == True) &
1374            (st.id.belongs(no_deps))
1375        )
1376
1377        limit = len(all_workers) * (50 / (len(wkgroups) or 1))
1378        # if there is a multitude of tasks, let's figure out a maximum of
1379        # tasks per worker. This could be further tuned with some added
1380        # intelligence (like estimating how many tasks a worker will complete
1381        # before the ticker reassigns them), but the gain is quite small.
1382        # 50 is a sweet spot also for fast tasks, with sane heartbeat values
1383        # NB: the ticker reassigns tasks every 5 cycles, so if a worker completes
1384        # its 50 tasks in less than heartbeat*5 seconds,
1385        # it won't pick new tasks until heartbeat*5 seconds pass.
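        # Worked example (illustrative figures only, not taken from a real
        # deployment): with 4 ACTIVE workers and 2 groups,
        # limit = 4 * (50 / 2) = 100, i.e. at most 100 tasks per group are
        # fetched on each assignment round.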
1386
1387        # If a worker is currently busy with a long task, its other tasks need
1388        # to be reassigned to other workers.
1389        # This shuffles things up a bit, in order to give every task an equal
1390        # chance of being executed
1391
1392        # let's freeze it up
1393        db.commit()
1394        tnum = 0
1395        for group in wkgroups.keys():
1396            tasks = all_available(st.group_name == group).select(
1397                limitby=(0, limit), orderby=st.next_run_time)
1398            # let's break up the queue evenly among workers
1399            for task in tasks:
1400                tnum += 1
1401                gname = task.group_name
1402                ws = wkgroups.get(gname)
1403                if ws:
1404                    if task.broadcast:
1405                        for worker in ws['workers']:
1406                            new_task = db.scheduler_task.insert(
1407                                application_name = task.application_name,
1408                                task_name = task.task_name,
1409                                group_name = task.group_name,
1410                                status = ASSIGNED,
1411                                broadcast = False,
1412                                function_name = task.function_name,
1413                                args = task.args,
1414                                start_time = now,
1415                                repeats = 1,
1416                                retry_failed = task.retry_failed,
1417                                sync_output = task.sync_output,
1418                                assigned_worker_name = worker['name'])
1419                        if task.period:
1420                            next_run_time = now+datetime.timedelta(seconds=task.period)
1421                        else:
1422                            # must be cronline
1423                            cron_recur = CronParser(task.cronline,
1424                                    now.replace(second=0, microsecond=0))
1425                            next_run_time = cron_recur.next()
1426                        db(st.id == task.id).update(times_run=task.times_run+1,
1427                                                    next_run_time=next_run_time,
1428                                                    last_run_time=now)
1429                        db.commit()
1430                    else:
1431                        # pick the least-loaded worker of this group
1432                        myw = 0
1433                        for i, w in enumerate(ws['workers']):
1434                            if w['c'] < ws['workers'][myw]['c']:
1435                                myw = i
1437                        assigned_wn = wkgroups[gname]['workers'][myw]['name']
1438                        d = dict(
1439                            status=ASSIGNED,
1440                            assigned_worker_name=assigned_wn
1441                        )
1442                        db(
1443                            (st.id == task.id) &
1444                            (st.status.belongs((QUEUED, ASSIGNED)))
1445                        ).update(**d)
1446                        wkgroups[gname]['workers'][myw]['c'] += 1
1447                db.commit()
1448        # I didn't report tasks but I'm working nonetheless!!!!
1449        with self.w_stats_lock:
1450            if tnum > 0:
1451                self.w_stats.empty_runs = 0
1452            self.w_stats.queue = tnum
1453            self.w_stats.distribution = wkgroups
1454            self.w_stats.workers = len(all_workers)
1455        # I'll be greedy only if tasks assigned are equal to the limit
1456        # (meaning there could be others ready to be assigned)
1457        self.greedy = tnum >= limit
1458        logger.info('TICKER: workers are %s', len(all_workers))
1459        logger.info('TICKER: tasks are %s', tnum)
1460
1461    def sleep(self):
1462        """Sleep for the configured number of seconds."""
1463        time.sleep(self.w_stats.sleep)
1464        # ideally this should only sleep until the next available task
1465
1466    def set_worker_status(self, group_names=None, action=ACTIVE,
1467                          exclude=None, limit=None, worker_name=None):
1468        """Internal function to set worker's status."""
1469        db = self.db
1470        ws = db.scheduler_worker
1471        if not group_names:
1472            group_names = self.group_names
1473        elif isinstance(group_names, str):
1474            group_names = [group_names]
1475        if worker_name:
1476            db(ws.worker_name == worker_name).update(status=action)
1477            return
1478        exclusion = (exclude + [action]) if exclude else [action]
1479        if not limit:
1480            for group in group_names:
1481                db(
1482                    (ws.group_names.contains(group)) &
1483                    (~ws.status.belongs(exclusion))
1484                ).update(status=action)
1485        else:
1486            for group in group_names:
1487                workers = db((ws.group_names.contains(group)) &
1488                             (~ws.status.belongs(exclusion))
1489                             )._select(ws.id, limitby=(0, limit))
1490                db(ws.id.belongs(workers)).update(status=action)
1491
1492    def disable(self, group_names=None, limit=None, worker_name=None):
1493        """Set DISABLED on the workers processing `group_names` tasks.
1494
1495        A DISABLED worker will be kept alive but it won't be able to process
1496        any waiting tasks, essentially putting it to sleep.
1497        By default, all group_names of the Scheduler's instantiation are selected
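
        Example (illustrative sketch; assumes a `scheduler` instance whose
        workers process the default 'main' group)::

            scheduler.disable(group_names='main', limit=1)  # put one worker to sleep
            scheduler.resume(group_names='main')            # wake the group up again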
1498        """
1499        self.set_worker_status(
1500            group_names=group_names,
1501            action=DISABLED,
1502            exclude=[DISABLED, KILL, TERMINATE],
1503            limit=limit, worker_name=worker_name)
1504
1505    def resume(self, group_names=None, limit=None, worker_name=None):
1506        """Wakes a worker up (it will be able to process queued tasks)"""
1507        self.set_worker_status(
1508            group_names=group_names,
1509            action=ACTIVE,
1510            exclude=[KILL, TERMINATE],
1511            limit=limit, worker_name=worker_name)
1512
1513    def terminate(self, group_names=None, limit=None, worker_name=None):
1514        """Sets TERMINATE as worker status. The worker will wait for any
1515        currently running task to finish and then exit gracefully
1516        """
1517        self.set_worker_status(
1518            group_names=group_names,
1519            action=TERMINATE,
1520            exclude=[KILL],
1521            limit=limit, worker_name=worker_name)
1522
1523    def kill(self, group_names=None, limit=None, worker_name=None):
1524        """Sets KILL as worker status. The worker will be killed even if it's
1525        processing a task."""
1526        self.set_worker_status(
1527            group_names=group_names,
1528            action=KILL,
1529            limit=limit, worker_name=worker_name)
1530
1531    def queue_task(self, function, pargs=[], pvars={}, **kwargs):
1532        """
1533        Queue a task. This takes care of validating all of the
1534        parameters
1535
1536        Args:
1537            function: the function (anything callable with a __name__)
1538            pargs: "raw" args to be passed to the function. Automatically
1539                jsonified.
1540            pvars: "raw" kwargs to be passed to the function. Automatically
1541                jsonified
1542            kwargs: all the parameters available (basically, every
1543                `scheduler_task` column). If args and vars are here, they
1544                should be jsonified already, and they will override pargs
1545                and pvars
1546
1547        Returns:
1548            a dict just as a normal validate_and_insert(), plus a uuid key
1549            holding the uuid of the queued task. If validation is not passed
1550            (i.e. some parameters are invalid), both id and uuid will be None,
1551            and you'll get an "errors" dict holding the errors found.
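
        Example (illustrative sketch; `mytask` stands for any function
        registered in the Scheduler's tasks dict, or its name as a string)::

            result = scheduler.queue_task(
                mytask,
                pargs=[1, 2],
                pvars={'a': 3},
                period=600,       # re-run every 10 minutes
                repeats=5,        # at most 5 runs
                immediate=True,   # nudge the ticker to assign it right away
            )
            if result.errors:
                print(result.errors)           # validation failed, uuid is None
            else:
                print(result.id, result.uuid)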
1552        """
1553        if hasattr(function, '__name__'):
1554            function = function.__name__
1555        targs = 'args' in kwargs and kwargs.pop('args') or dumps(pargs)
1556        tvars = 'vars' in kwargs and kwargs.pop('vars') or dumps(pvars)
1557        tuuid = 'uuid' in kwargs and kwargs.pop('uuid') or web2py_uuid()
1558        tname = 'task_name' in kwargs and kwargs.pop('task_name') or function
1559        immediate = 'immediate' in kwargs and kwargs.pop('immediate') or None
1560        cronline = kwargs.get('cronline')
1561        kwargs.update(
1562            function_name=function,
1563            task_name=tname,
1564            args=targs,
1565            vars=tvars,
1566            uuid=tuuid,
1567            )
1568        if cronline:
1569            try:
1570                start_time = kwargs.get('start_time', self.now())
1571                next_run_time = CronParser(cronline, start_time).next()
1572                kwargs.update(start_time=start_time, next_run_time=next_run_time)
1573            except Exception:
1574                pass
1575        if 'start_time' in kwargs and 'next_run_time' not in kwargs:
1576            kwargs.update(next_run_time=kwargs['start_time'])
1577        db = self.db
1578        rtn = db.scheduler_task.validate_and_insert(**kwargs)
1579        if not rtn.errors:
1580            rtn.uuid = tuuid
1581            if immediate:
1582                db(
1583                    (db.scheduler_worker.is_ticker == True)
1584                ).update(status=PICK)
1585        else:
1586            rtn.uuid = None
1587        return rtn
1588
1589    def task_status(self, ref, output=False):
1590        """
1591        Retrieves task status and optionally the result of the task
1592
1593        Args:
1594            ref: can be
1595
1596              - an integer : lookup will be done by scheduler_task.id
1597              - a string : lookup will be done by scheduler_task.uuid
1598              - a `Query` : lookup as you wish, e.g. ::
1599
1600                    db.scheduler_task.task_name == 'test1'
1601
1602            output(bool): if `True`, also fetch the scheduler_run record
1603
1604        Returns:
1605            a single Row object, for the last queued task.
1606            If output == True, returns also the last scheduler_run record.
1607            The scheduler_run record is fetched by a left join, so it can
1608            have all fields == None
1609
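        Example (illustrative sketch; `task_uuid` stands for the uuid returned
        by a previous `queue_task` call)::

            status = scheduler.task_status(task_uuid, output=True)
            if status:
                print(status.scheduler_task.status)  # e.g. QUEUED, COMPLETED, ...
                print(status.result)                 # decoded run_result, or None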
1610        """
1611        from pydal.objects import Query
1612        db = self.db
1613        sr = db.scheduler_run
1614        st = db.scheduler_task
1615        if isinstance(ref, integer_types):
1616            q = st.id == ref
1617        elif isinstance(ref, str):
1618            q = st.uuid == ref
1619        elif isinstance(ref, Query):
1620            q = ref
1621        else:
1622            raise SyntaxError(
1623                "You can retrieve results only by id, uuid or Query")
1624        fields = [st.ALL]
1625        left = False
1626        orderby = ~st.id
1627        if output:
1628            fields = st.ALL, sr.ALL
1629            left = sr.on(sr.task_id == st.id)
1630            orderby = ~st.id | ~sr.id
1631        row = db(q).select(
1632            *fields,
1633            **dict(orderby=orderby,
1634                   left=left,
1635                   limitby=(0, 1))
1636        ).first()
1637        if row and output:
1638            row.result = row.scheduler_run.run_result and \
1639                loads(row.scheduler_run.run_result,
1640                      object_hook=_decode_dict) or None
1641        return row
1642
1643    def stop_task(self, ref):
1644        """Shortcut for task termination.
1645
1646        If the task is RUNNING, it will be terminated, meaning that its
1647        status will be set to FAILED.
1648
1649        If the task is QUEUED, its stop_time will be set to "now",
1650        the enabled flag will be set to False, and the status to STOPPED
1651
1652        Args:
1653            ref: can be
1654
1655              - an integer : lookup will be done by scheduler_task.id
1656              - a string : lookup will be done by scheduler_task.uuid
1657
1658        Returns:
1659            - 1 if task was stopped (meaning an update has been done)
1660            - None if task was not found, or if task was not RUNNING or QUEUED
1661
1662        Note:
1663            Experimental
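
        Example (illustrative sketch; `task_uuid` stands for the uuid of a
        previously queued task)::

            if scheduler.stop_task(task_uuid):
                print('stop request registered')
            else:
                print('task not found, or neither RUNNING nor QUEUED')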
1664        """
1665        db = self.db
1666        st = db.scheduler_task
1667        sw = db.scheduler_worker
1668        if isinstance(ref, integer_types):
1669            q = st.id == ref
1670        elif isinstance(ref, str):
1671            q = st.uuid == ref
1672        else:
1673            raise SyntaxError(
1674                "You can retrieve results only by id or uuid")
1675        task = db(q).select(st.id, st.status, st.assigned_worker_name)
1676        task = task.first()
1677        rtn = None
1678        if not task:
1679            return rtn
1680        if task.status == 'RUNNING':
1681            q = sw.worker_name == task.assigned_worker_name
1682            rtn = db(q).update(status=STOP_TASK)
1683        elif task.status == 'QUEUED':
1684            rtn = db(q).update(
1685                stop_time=self.now(),
1686                enabled=False,
1687                status=STOPPED)
1688        return rtn
1689
1690    def get_workers(self, only_ticker=False):
1691        """Returns a dict holding `worker_name : {**columns}`
1692        representing all "registered" workers.
1693        With only_ticker=True, only the workers currently acting as the
1694        TICKER are returned, if there are any
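
        Example (illustrative sketch)::

            workers = scheduler.get_workers()
            for name, info in workers.items():
                print(name, info.status, info.is_ticker)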
1695        """
1696        db = self.db
1697        if only_ticker:
1698            workers = db(db.scheduler_worker.is_ticker == True).select()
1699        else:
1700            workers = db(db.scheduler_worker.id > 0).select()
1701        all_workers = {}
1702        for row in workers:
1703            all_workers[row.worker_name] = Storage(
1704                status=row.status,
1705                first_heartbeat=row.first_heartbeat,
1706                last_heartbeat=row.last_heartbeat,
1707                group_names=row.group_names,
1708                is_ticker=row.is_ticker,
1709                worker_stats=row.worker_stats
1710            )
1711        return all_workers
1712
1713
1714def main():
1715    """
1716    Allows running a worker without `python web2py.py ...`, by simply doing::
1717
1718        python gluon/scheduler.py
1719
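    A minimal, illustrative tasks file for the -t option (all names here are
    hypothetical) could look like::

        # mytasks.py
        def mytask(*args, **vars):
            return 'done'

        tasks = {'mytask': mytask}
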
1720    """
1721    import optparse
1722    parser = optparse.OptionParser()
1723    parser.add_option(
1724        "-w", "--worker_name", dest="worker_name", default=None,
1725        help="start a worker with name")
1726    parser.add_option(
1727        "-b", "--heartbeat", dest="heartbeat", default=10,
1728        type='int', help="heartbeat time in seconds (default 10)")
1729    parser.add_option(
1730        "-L", "--logger_level", dest="logger_level",
1731        default=30,
1732        type='int',
1733        help="set debug output level (0-100, 0 means all, 100 means none; default is 30)")
1734    parser.add_option("-E", "--empty-runs",
1735                      dest="max_empty_runs",
1736                      type='int',
1737                      default=0,
1738                      help="max loops with no grabbed tasks permitted (0 to never check)")
1739    parser.add_option(
1740        "-g", "--group_names", dest="group_names",
1741        default='main',
1742        help="comma separated list of groups to be picked by the worker")
1743    parser.add_option(
1744        "-f", "--db_folder", dest="db_folder",
1745        default='/Users/mdipierro/web2py/applications/scheduler/databases',
1746        help="location of the dal database folder")
1747    parser.add_option(
1748        "-u", "--db_uri", dest="db_uri",
1749        default='sqlite://storage.sqlite',
1750        help="database URI string (web2py DAL syntax)")
1751    parser.add_option(
1752        "-t", "--tasks", dest="tasks", default=None,
1753        help="file containing the task definitions; must define " +
1754        "tasks = {'task_name':(lambda: 'output')} or a similar set of tasks")
1755    parser.add_option(
1756        "-U", "--utc-time", dest="utc_time", default=False,
1757        help="work with UTC timestamps"
1758    )
1759    (options, args) = parser.parse_args()
1760    if not options.tasks or not options.db_uri:
1761        print(USAGE)
1762    if options.tasks:
1763        path, filename = os.path.split(options.tasks)
1764        if filename.endswith('.py'):
1765            filename = filename[:-3]
1766        sys.path.append(path)
1767        print('importing tasks...')
1768        tasks = __import__(filename, globals(), locals(), [], 0).tasks  # level=0: absolute import, works on Python 2 and 3
1769        print('tasks found: ' + ', '.join(list(tasks.keys())))
1770    else:
1771        tasks = {}
1772    group_names = [x.strip() for x in options.group_names.split(',')]
1773
1774    logging.getLogger().setLevel(options.logger_level)
1775
1776    print('groups for this worker: ' + ', '.join(group_names))
1777    print('connecting to database in folder: ' + (options.db_folder or './'))
1778    print('using URI: ' + options.db_uri)
1779    db = DAL(options.db_uri, folder=options.db_folder, decode_credentials=True)
1780    print('instantiating scheduler...')
1781    scheduler = Scheduler(db=db,
1782                          worker_name=options.worker_name,
1783                          tasks=tasks,
1784                          migrate=True,
1785                          group_names=group_names,
1786                          heartbeat=options.heartbeat,
1787                          max_empty_runs=options.max_empty_runs,
1788                          utc_time=options.utc_time)
1789    signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
1790    print('starting main worker loop...')
1791    scheduler.loop()
1792
1793if __name__ == '__main__':
1794    main()