source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/contrib/redis_scheduler.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
| This file is part of the web2py Web Framework
| Created by niphlod@gmail.com
| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)

Scheduler with redis backend
---------------------------------
"""
from __future__ import print_function

import os
import time
import socket
import datetime
import logging
from json import loads, dumps
from gluon.utils import web2py_uuid
from gluon.storage import Storage
from gluon.scheduler import *
from gluon.scheduler import _decode_dict
from gluon.contrib.redis_utils import RWatchError
USAGE = """
## Example

For any existing app

Create File: app/models/scheduler.py ======
from gluon.contrib.redis_utils import RConn
from gluon.contrib.redis_scheduler import RScheduler

def demo1(*args, **vars):
    print('you passed args=%s and vars=%s' % (args, vars))
    return 'done!'

def demo2():
    1/0

rconn = RConn()
mysched = RScheduler(db, dict(demo1=demo1, demo2=demo2), ...., redis_conn=rconn)

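## queue a task from your app code, e.g. (a sketch reusing demo1 above):

mysched.queue_task(demo1, pargs=[1, 2], pvars=dict(a=3))
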
## run worker nodes with:

   cd web2py
   python web2py.py -K app

"""


path = os.getcwd()

if 'WEB2PY_PATH' not in os.environ:
    os.environ['WEB2PY_PATH'] = path

IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid())

logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER)

POLLING = 'POLLING'


class RScheduler(Scheduler):

    def __init__(self, db, tasks=None, migrate=True,
                 worker_name=None, group_names=None, heartbeat=HEARTBEAT,
                 max_empty_runs=0, discard_results=False, utc_time=False,
                 redis_conn=None, mode=1):

        """
        Highly-experimental coordination with redis.
        Takes all args from Scheduler except redis_conn, which
        must be something akin to a StrictRedis instance.

        My only regret - and the reason why I kept this under the hood for a
        while - is that it's hard to hook up in web2py to something happening
        right after the commit to a table, which would enable this version of
        the scheduler to process "immediate" tasks right away instead of
        waiting a few seconds (see FIXME in queue_task())

        mode is reserved for future usage patterns.
        Right now it moves the coordination (which is the most intensive
        routine in the scheduler in matters of IPC) of workers to redis.
        I'd like to have incrementally redis-backed modes of operation,
        such as e.g.:
            - 1: IPC through redis (the current implementation)
            - 2: Store task results in redis (which will relieve further
                 pressure from the db, leaving the scheduler_run table empty,
                 and possibly keep things smooth as task results can be set
                 to expire after a bit of time)
            - 3: Move all the logic for storing and queueing tasks to redis
                 itself - which means no scheduler_task usage either - and use
                 the database only as a historical record (e.g. for reporting)

        As usual, I'm eager to see your comments.
        """

        Scheduler.__init__(self, db, tasks=tasks, migrate=migrate,
                           worker_name=worker_name, group_names=group_names,
                           heartbeat=heartbeat, max_empty_runs=max_empty_runs,
                           discard_results=discard_results, utc_time=utc_time)

        self.r_server = redis_conn
        from gluon import current
        self._application = current.request.application or 'appname'

    def _nkey(self, key):
        """Helper to restrict all keys to a namespace and track them."""
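        # For example (with a hypothetical app named 'welcome'):
        #   _nkey('queued:main') -> 'w2p:rsched:welcome:queued:main'
        # and every generated key is also recorded in the
        # 'w2p:rsched:welcome:allkeys' set, so prune_all() can delete them.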
        prefix = 'w2p:rsched:%s' % self._application
        allkeys = '%s:allkeys' % prefix
        newkey = "%s:%s" % (prefix, key)
        self.r_server.sadd(allkeys, newkey)
        return newkey

    def prune_all(self):
        """Global housekeeping."""
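        # Optimistic locking sketch: pipe.watch(key) monitors a key, and
        # pipe.execute() fails (caught here as RWatchError) if that key was
        # modified concurrently; the transaction is then retried after a
        # short sleep. The same WATCH/retry pattern is used throughout
        # this class.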
        all_keys = self._nkey('allkeys')
        with self.r_server.pipeline() as pipe:
            while True:
                try:
                    pipe.watch('PRUNE_ALL')
                    while True:
                        k = pipe.spop(all_keys)
                        if k is None:
                            break
                        pipe.delete(k)
                    pipe.execute()
                    break
                except RWatchError:
                    time.sleep(0.1)
                    continue

    def dt2str(self, value):
        return value.strftime('%Y-%m-%d %H:%M:%S')

    def str2date(self, value):
        return datetime.datetime.strptime(value, '%Y-%m-%d %H:%M:%S')

    def send_heartbeat(self, counter):
        """
        Workers coordination in redis.
        It has evolved into something that is not that easy.
        Here we try to do what we need in a single transaction,
        and retry that transaction if something goes wrong.
        """
        with self.r_server.pipeline() as pipe:
            while True:
                try:
                    pipe.watch('SEND_HEARTBEAT')
                    self.inner_send_heartbeat(counter, pipe)
                    pipe.execute()
                    self.adj_hibernation()
                    self.sleep()
                    break
                except RWatchError:
                    time.sleep(0.1)
                    continue

    def inner_send_heartbeat(self, counter, pipe):
        """
        Do a few things in the "maintenance" thread.

        Specifically:
        - registers the workers
        - accepts commands sent to workers (KILL, TERMINATE, PICK, DISABLED, etc)
        - adjusts sleep
        - saves stats
        - elects master
        - does "housecleaning" for dead workers
        - triggers tasks assignment
        """
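        # Worker state layout: one redis hash per worker (status, heartbeats,
        # group_names, is_ticker, worker_stats) plus a set holding all the
        # worker hash keys; commands such as KILL or TERMINATE are delivered
        # by writing the desired status into the worker's hash.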
        r_server = pipe
        status_keyset = self._nkey('worker_statuses')
        status_key = self._nkey('worker_status:%s' % (self.worker_name))
        now = self.now()
        mybackedstatus = r_server.hgetall(status_key)
        if not mybackedstatus:
            r_server.hmset(
                status_key,
                dict(
                    status=ACTIVE, worker_name=self.worker_name,
                    first_heartbeat=self.dt2str(now),
                    last_heartbeat=self.dt2str(now),
                    group_names=dumps(self.group_names), is_ticker=False,
                    worker_stats=dumps(self.w_stats))
            )
            r_server.sadd(status_keyset, status_key)
            if self.w_stats.status != POLLING:
                self.w_stats.status = ACTIVE
                self.w_stats.sleep = self.heartbeat
                mybackedstatus = ACTIVE
        else:
            mybackedstatus = mybackedstatus['status']
            if mybackedstatus == DISABLED:
                # keep sleeping
                self.w_stats.status = DISABLED
                r_server.hmset(
                    status_key,
                    dict(last_heartbeat=self.dt2str(now),
                         worker_stats=dumps(self.w_stats))
                )
            elif mybackedstatus == TERMINATE:
                self.w_stats.status = TERMINATE
                logger.debug("Waiting to terminate the current task")
                self.give_up()
            elif mybackedstatus == KILL:
                self.w_stats.status = KILL
                self.die()
            else:
                if mybackedstatus == STOP_TASK:
                    logger.info('Asked to kill the current task')
                    self.terminate_process()
                logger.info('........recording heartbeat (%s)',
                            self.w_stats.status)
                r_server.hmset(
                    status_key,
                    dict(
                        last_heartbeat=self.dt2str(now), status=ACTIVE,
                        worker_stats=dumps(self.w_stats)
                    )
                )
                # let the status key expire so a worker that dies without
                # cleaning up after itself disappears on its own
                r_server.expire(status_key, self.heartbeat * 3 * 15)
                self.w_stats.sleep = self.heartbeat  # re-activating the process
                if self.w_stats.status not in (RUNNING, POLLING):
                    self.w_stats.status = ACTIVE

            self.do_assign_tasks = False
            if counter % 5 == 0 or mybackedstatus == PICK:
                try:
                    logger.info(
                        '    freeing workers that have not sent heartbeat')
                    registered_workers = r_server.smembers(status_keyset)
                    allkeys = self._nkey('allkeys')
                    for worker in registered_workers:
                        w = r_server.hgetall(worker)
                        w = Storage(w)
                        if not w:
                            r_server.srem(status_keyset, worker)
                            logger.info('removing %s from %s', worker, allkeys)
                            r_server.srem(allkeys, worker)
                            continue
                    try:
                        self.is_a_ticker = self.being_a_ticker(pipe)
                    except:
                        pass
                    if self.w_stats.status in (ACTIVE, POLLING):
                        self.do_assign_tasks = True
                    if self.is_a_ticker and self.do_assign_tasks:
                        # I'm a ticker, and 5 loops passed without reassigning tasks,
                        # let's do that and loop again
                        if not self.db_thread:
                            logger.debug('thread building own DAL object')
                            self.db_thread = DAL(
                                self.db._uri, folder=self.db._adapter.folder)
                            self.define_tables(self.db_thread, migrate=False)
                        db = self.db_thread
                        self.wrapped_assign_tasks(db)
                        return None
                except:
                    logger.error('Error assigning tasks')

    def being_a_ticker(self, pipe):
        """
        Elects a ticker.

        This is slightly more convoluted than the original,
        but is far more efficient.
        """
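        # Election sketch: the first other ACTIVE worker already flagged
        # with is_ticker wins; if nobody is flagged and this worker isn't
        # busy, it flags itself as the ticker and demotes everybody else.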
        r_server = pipe
        status_keyset = self._nkey('worker_statuses')
        registered_workers = r_server.smembers(status_keyset)
        ticker = None
        all_active = []
        all_workers = []
        for worker in registered_workers:
            w = r_server.hgetall(worker)
            if w['worker_name'] != self.worker_name and w['status'] == ACTIVE:
                all_active.append(w)
                if w['is_ticker'] == 'True' and ticker is None:
                    ticker = w
            all_workers.append(w)
        not_busy = self.w_stats.status in (ACTIVE, POLLING)
        if not ticker:
            if not_busy:
                # only if this worker isn't busy, otherwise wait for a free one
                for worker in all_workers:
                    key = self._nkey('worker_status:%s' % worker['worker_name'])
                    if worker['worker_name'] == self.worker_name:
                        r_server.hset(key, 'is_ticker', True)
                    else:
                        r_server.hset(key, 'is_ticker', False)
                logger.info("TICKER: I'm a ticker")
            else:
                # giving up, only if I'm not alone
                if len(all_active) > 1:
                    key = self._nkey('worker_status:%s' % (self.worker_name))
                    r_server.hset(key, 'is_ticker', False)
                else:
                    not_busy = True
            return not_busy
        else:
            logger.info(
                "%s is a ticker, I'm a poor worker" % ticker['worker_name'])
            return False

    def assign_tasks(self, db):
        """
        The real beauty.

        We don't need to ASSIGN tasks, we just put
        them into the relevant queue
        """
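        # Per-group redis structures (all namespaced through _nkey):
        #   queued:<group>       list of task ids ready to be popped
        #   queued_set:<group>   set mirroring the list, for membership tests
        #   running:<group>      list of task ids currently being processed
        #   running_dict:<group> hash mapping task id -> worker name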
        st, sd = db.scheduler_task, db.scheduler_task_deps
        r_server = self.r_server
        now = self.now()
        status_keyset = self._nkey('worker_statuses')
        with r_server.pipeline() as pipe:
            while True:
                try:
                    # making sure we're the only one doing the job
                    pipe.watch('ASSIGN_TASKS')
                    registered_workers = pipe.smembers(status_keyset)
                    all_workers = []
                    for worker in registered_workers:
                        w = pipe.hgetall(worker)
                        if w['status'] == ACTIVE:
                            all_workers.append(Storage(w))
                    pipe.execute()
                    break
                except RWatchError:
                    time.sleep(0.1)
                    continue

        # build workers as dict of groups
        wkgroups = {}
        for w in all_workers:
            group_names = loads(w.group_names)
            for gname in group_names:
                if gname not in wkgroups:
                    wkgroups[gname] = dict(
                        workers=[{'name': w.worker_name, 'c': 0}])
                else:
                    wkgroups[gname]['workers'].append(
                        {'name': w.worker_name, 'c': 0})
        # set queued tasks that expired between "runs" (i.e., you turned off
        # the scheduler): then it wasn't expired, but now it is
        db(
            (st.status.belongs((QUEUED, ASSIGNED))) &
            (st.stop_time < now)
        ).update(status=EXPIRED)

        # calculate dependencies
        deps_with_no_deps = db(
            (sd.can_visit == False) &
            (~sd.task_child.belongs(
                db(sd.can_visit == False)._select(sd.task_parent)
            ))
        )._select(sd.task_child)
        no_deps = db(
            (st.status.belongs((QUEUED, ASSIGNED))) &
            ((sd.id == None) | (st.id.belongs(deps_with_no_deps)))
        )._select(
            st.id, distinct=True,
            left=sd.on(
                (st.id == sd.task_parent) &
                (sd.can_visit == False)
            )
        )

        all_available = db(
            (st.status.belongs((QUEUED, ASSIGNED))) &
            (st.next_run_time <= now) &
            (st.enabled == True) &
            (st.id.belongs(no_deps))
        )

        limit = len(all_workers) * (50 // (len(wkgroups) or 1))

        # let's freeze it up
        db.commit()
        x = 0
        r_server = self.r_server
        for group in wkgroups.keys():
            queued_list = self._nkey('queued:%s' % group)
            queued_set = self._nkey('queued_set:%s' % group)
            # tasks left in the running list are moved back to the queue
            # (and marked as queued) so they don't get assigned twice
            running_list = self._nkey('running:%s' % group)
            while True:
                # the joys of rpoplpush: atomically pop from one list
                # and push onto the other
                t = r_server.rpoplpush(running_list, queued_list)
                if not t:
                    # no more
                    break
                r_server.sadd(queued_set, t)

            tasks = all_available(st.group_name == group).select(
                limitby=(0, limit), orderby=st.next_run_time)

            # put tasks in the processing list

            for task in tasks:
                x += 1
                gname = task.group_name

                if r_server.sismember(queued_set, task.id):
                    # already queued, don't put it on the list again
                    continue
                r_server.sadd(queued_set, task.id)
                r_server.lpush(queued_list, task.id)
                d = dict(status=QUEUED)
                if not task.task_name:
                    d['task_name'] = task.function_name
                db(
                    (st.id == task.id) &
                    (st.status.belongs((QUEUED, ASSIGNED)))
                ).update(**d)
            db.commit()
        # I didn't report tasks but I'm working nonetheless!!!!
        if x > 0:
            self.w_stats.empty_runs = 0
        self.w_stats.queue = x
        self.w_stats.distribution = wkgroups
        self.w_stats.workers = len(all_workers)
        # I'll be greedy only if tasks queued are equal to the limit
        # (meaning there could be others ready to be queued)
        self.greedy = x >= limit
        logger.info('TICKER: workers are %s', len(all_workers))
        logger.info('TICKER: tasks are %s', x)

    def pop_task(self, db):
        """Lift a task off a queue."""
        r_server = self.r_server
        st = self.db.scheduler_task
        task = None
        # ready to process something
        for group in self.group_names:
            queued_set = self._nkey('queued_set:%s' % group)
            queued_list = self._nkey('queued:%s' % group)
            running_list = self._nkey('running:%s' % group)
            running_dict = self._nkey('running_dict:%s' % group)
            self.w_stats.status = POLLING
            # the timeout is split across groups, so polling takes
            # 1 minute in total no matter how many groups there are
            logger.debug('    polling on %s', group)
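            # brpoplpush atomically moves a task id from the queued list to
            # the running list, blocking up to the timeout if the queue
            # is empty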
            task_id = r_server.brpoplpush(queued_list, running_list,
                                          timeout=60 // len(self.group_names))
            logger.debug('    finished polling')
            self.w_stats.status = ACTIVE
            if task_id:
                r_server.hset(running_dict, task_id, self.worker_name)
                r_server.srem(queued_set, task_id)
                task = db(
                    (st.id == task_id) &
                    (st.status == QUEUED)
                ).select().first()
                if not task:
                    r_server.lrem(running_list, 0, task_id)
                    r_server.hdel(running_dict, task_id)
                    r_server.lrem(queued_list, 0, task_id)
                    logger.error("we received a task that isn't there (%s)",
                                 task_id)
                    return None
                break
        now = self.now()
        if task:
            task.update_record(status=RUNNING, last_run_time=now)
            # no one will touch my task!
            db.commit()
            logger.debug('   work to do %s', task.id)
        else:
            logger.info('nothing to do')
            return None
        times_run = task.times_run + 1
        if task.cronline:
            cron_recur = CronParser(task.cronline, now.replace(second=0))
            next_run_time = cron_recur.get_next()
        elif not task.prevent_drift:
            next_run_time = task.last_run_time + datetime.timedelta(
                seconds=task.period
            )
        else:
            # calc next_run_time based on available slots
            # see #1191
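            # e.g. start_time 130s in the past with period=60:
            # steps = 130 // 60 + 1 = 3, so the next run lands 180s after
            # start_time, staying aligned to the original schedule instead
            # of drifting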
            next_run_time = task.start_time
            secondspassed = (now - next_run_time).total_seconds()
            steps = secondspassed // task.period + 1
            next_run_time += datetime.timedelta(seconds=task.period * steps)

        if times_run < task.repeats or task.repeats == 0:
            # need to run (repeating task)
            run_again = True
        else:
            # no need to run again
            run_again = False
        run_id = 0
        while not self.discard_results:
            logger.debug('    new scheduler_run record')
            try:
                run_id = db.scheduler_run.insert(
                    task_id=task.id,
                    status=RUNNING,
                    start_time=now,
                    worker_name=self.worker_name)
                db.commit()
                break
            except:
                time.sleep(0.5)
                db.rollback()
        logger.info('new task %(id)s "%(task_name)s"'
                    ' %(application_name)s.%(function_name)s' % task)
        return Task(
            app=task.application_name,
            function=task.function_name,
            timeout=task.timeout,
            args=task.args,  # in json
            vars=task.vars,  # in json
            task_id=task.id,
            run_id=run_id,
            run_again=run_again,
            next_run_time=next_run_time,
            times_run=times_run,
            stop_time=task.stop_time,
            retry_failed=task.retry_failed,
            times_failed=task.times_failed,
            sync_output=task.sync_output,
            uuid=task.uuid,
            group_name=task.group_name)

    def report_task(self, task, task_report):
        """
        Override.

        Needed only because we have to pop the task
        off the redis running structures.
        """
        r_server = self.r_server
        db = self.db
        now = self.now()
        st = db.scheduler_task
        sr = db.scheduler_run
        if not self.discard_results:
            if task_report.result != 'null' or task_report.tb:
                # result is 'null' as a string if task completed
                # if it's stopped it's None as NoneType, so we record
                # the STOPPED "run" anyway
                logger.debug(' recording task report in db (%s)',
                             task_report.status)
                db(sr.id == task.run_id).update(
                    status=task_report.status,
                    stop_time=now,
                    run_result=task_report.result,
                    run_output=task_report.output,
                    traceback=task_report.tb)
            else:
                logger.debug(' deleting task report in db because of no result')
                db(sr.id == task.run_id).delete()
        # if there is a stop_time and the following run would exceed it
        is_expired = bool(task.stop_time and
                          task.next_run_time > task.stop_time)
        if task.run_again:
            status = EXPIRED if is_expired else QUEUED
        else:
            status = COMPLETED
        if task_report.status == COMPLETED:
            # assigned calculations
            d = dict(status=status,
                     next_run_time=task.next_run_time,
                     times_run=task.times_run,
                     times_failed=0,
                     assigned_worker_name=self.worker_name
                     )
            db(st.id == task.task_id).update(**d)
            if status == COMPLETED:
                self.update_dependencies(db, task.task_id)
        else:
            st_mapping = {'FAILED': 'FAILED',
                          'TIMEOUT': 'TIMEOUT',
                          'STOPPED': 'FAILED'}[task_report.status]
            if task.retry_failed and task.times_failed < task.retry_failed:
                status = QUEUED
            elif task.retry_failed == -1:
                status = QUEUED
            else:
                status = st_mapping
            db(st.id == task.task_id).update(
                times_failed=st.times_failed + 1,
                next_run_time=task.next_run_time,
                status=status,
                assigned_worker_name=self.worker_name
            )
        logger.info('task completed (%s)', task_report.status)
        running_list = self._nkey('running:%s' % task.group_name)
        running_dict = self._nkey('running_dict:%s' % task.group_name)
        r_server.lrem(running_list, 0, task.task_id)
        r_server.hdel(running_dict, task.task_id)

    def wrapped_pop_task(self):
        """Commodity function to call `pop_task` and trap exceptions.
        If an exception is raised, assume it happened because of database
        contention and retry `pop_task` after 0.5 seconds, up to 10 times.
        """
        db = self.db
        db.commit()  # another nifty db.commit() only for Mysql
        x = 0
        while x < 10:
            try:
                return self.pop_task(db)
            # this is here to "interrupt" any brpoplpush op easily
            except KeyboardInterrupt:
                self.give_up()
                break
            except:
                self.w_stats.errors += 1
                db.rollback()
                logger.error('    error popping tasks')
                x += 1
                time.sleep(0.5)

    def get_workers(self, only_ticker=False):
        """Return a dict holding worker_name : {**columns}
        representing all "registered" workers.
        With only_ticker=True, return only the worker acting
        as TICKER, if there is any.
        """
        r_server = self.r_server
        status_keyset = self._nkey('worker_statuses')
        registered_workers = r_server.smembers(status_keyset)
        all_workers = {}
        for worker in registered_workers:
            w = r_server.hgetall(worker)
            w = Storage(w)
            if not w:
                continue
            all_workers[w.worker_name] = Storage(
                status=w.status,
                first_heartbeat=self.str2date(w.first_heartbeat),
                last_heartbeat=self.str2date(w.last_heartbeat),
                group_names=loads(w.group_names, object_hook=_decode_dict),
                is_ticker=w.is_ticker == 'True',
                worker_stats=loads(w.worker_stats, object_hook=_decode_dict)
            )
        if only_ticker:
            for k, v in all_workers.items():
                if v['is_ticker']:
                    return {k: v}
            return {}
        return all_workers

    def set_worker_status(self, group_names=None, action=ACTIVE,
                          exclude=None, limit=None, worker_name=None):
        """Internal function to set worker's status"""
        r_server = self.r_server
        all_workers = self.get_workers()
        if not group_names:
            group_names = self.group_names
        elif isinstance(group_names, str):
            group_names = [group_names]
        # workers already in one of the excluded statuses (or in the
        # target status itself) are skipped
        exclusion = (exclude or []) + [action]
        workers = []
        if worker_name is not None:
            if worker_name in all_workers.keys():
                workers = [worker_name]
        else:
            for k, v in all_workers.items():
                if v.status not in exclusion and set(group_names) & set(v.group_names):
                    workers.append(k)
        if limit and worker_name is None:
            workers = workers[:limit]
        if workers:
            with r_server.pipeline() as pipe:
                while True:
                    try:
                        pipe.watch('SET_WORKER_STATUS')
                        for w in workers:
                            worker_key = self._nkey('worker_status:%s' % w)
                            pipe.hset(worker_key, 'status', action)
                        pipe.execute()
                        break
                    except RWatchError:
                        time.sleep(0.1)
                        continue

    def queue_task(self, function, pargs=[], pvars={}, **kwargs):
        """
        Queue tasks. This takes care of handling the validation of all
        parameters.

        FIXME: immediate should put the item in the queue directly. The hard
        part is that currently there are no hooks happening at post-commit
        time.

        Args:
            function: the function (anything callable with a __name__)
            pargs: "raw" args to be passed to the function. Automatically
                jsonified.
            pvars: "raw" kwargs to be passed to the function. Automatically
                jsonified
            kwargs: all the parameters available (basically, every
                `scheduler_task` column). If args and vars are here, they should
                be jsonified already, and they will override pargs and pvars

        Returns:
            a dict just as a normal validate_and_insert(), plus a uuid key
            holding the uuid of the queued task. If validation is not passed
            (i.e. some parameters are invalid) both id and uuid will be None,
            and you'll get an "error" dict holding the errors found.
        """
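        # e.g. (a sketch; 'demo1' as in the USAGE example above):
        #   queue_task('demo1', pvars=dict(a=1), period=3600, repeats=0,
        #              immediate=True)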
        if hasattr(function, '__name__'):
            function = function.__name__
        targs = 'args' in kwargs and kwargs.pop('args') or dumps(pargs)
        tvars = 'vars' in kwargs and kwargs.pop('vars') or dumps(pvars)
        tuuid = 'uuid' in kwargs and kwargs.pop('uuid') or web2py_uuid()
        tname = 'task_name' in kwargs and kwargs.pop('task_name') or function
        immediate = 'immediate' in kwargs and kwargs.pop('immediate') or None
        cronline = kwargs.get('cronline')
        kwargs.update(function_name=function,
                      task_name=tname,
                      args=targs,
                      vars=tvars,
                      uuid=tuuid)
        if cronline:
            try:
                start_time = kwargs.get('start_time', self.now())
                next_run_time = CronParser(cronline, start_time).get_next()
                kwargs.update(start_time=start_time, next_run_time=next_run_time)
            except:
                pass
        rtn = self.db.scheduler_task.validate_and_insert(**kwargs)
        if not rtn.errors:
            rtn.uuid = tuuid
            if immediate:
                r_server = self.r_server
                ticker = self.get_workers(only_ticker=True)
                if ticker:
                    ticker = list(ticker.keys())[0]
                    with r_server.pipeline() as pipe:
                        while True:
                            try:
                                pipe.watch('SET_WORKER_STATUS')
                                worker_key = self._nkey('worker_status:%s' % ticker)
                                pipe.hset(worker_key, 'status', 'PICK')
                                pipe.execute()
                                break
                            except RWatchError:
                                time.sleep(0.1)
                                continue
        else:
            rtn.uuid = None
        return rtn

    def stop_task(self, ref):
        """Shortcut for task termination.

        If the task is RUNNING it will terminate it, meaning that status
        will be set as FAILED.

        If the task is QUEUED, its stop_time will be set to "now",
            the enabled flag will be set to False, and the status to STOPPED

        Args:
            ref: can be

              - an integer : lookup will be done by scheduler_task.id
              - a string : lookup will be done by scheduler_task.uuid

        Returns:
            - 1 if task was stopped (meaning an update has been done)
            - None if task was not found, or if task was not RUNNING or QUEUED

        Note:
            Experimental
        """
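        # usage sketch (hypothetical id and uuid):
        #   mysched.stop_task(42)
        #   mysched.stop_task('aaaabbbb-cccc-dddd-eeee-ffff00001111')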
        r_server = self.r_server
        st = self.db.scheduler_task
        if isinstance(ref, int):
            q = st.id == ref
        elif isinstance(ref, str):
            q = st.uuid == ref
        else:
            raise SyntaxError(
                "You can retrieve results only by id or uuid")
        task = self.db(q).select(st.id, st.status, st.group_name)
        task = task.first()
        rtn = None
        if not task:
            return rtn
        running_dict = self._nkey('running_dict:%s' % task.group_name)
        if task.status == 'RUNNING':
            worker_key = r_server.hget(running_dict, task.id)
            worker_key = self._nkey('worker_status:%s' % (worker_key))
            r_server.hset(worker_key, 'status', STOP_TASK)
        elif task.status == 'QUEUED':
            rtn = self.db(q).update(
                stop_time=self.now(),
                enabled=False,
                status=STOPPED)
        return rtn