#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
| This file is part of the web2py Web Framework
| Created by niphlod@gmail.com
| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)

Scheduler with redis backend
---------------------------------
"""
from __future__ import print_function

import os
import time
import socket
import datetime
import logging
from json import loads, dumps
from gluon.utils import web2py_uuid
from gluon.storage import Storage
from gluon.scheduler import *
from gluon.scheduler import _decode_dict
from gluon.contrib.redis_utils import RWatchError

USAGE = """
## Example

For any existing app

Create File: app/models/scheduler.py ======
from gluon.contrib.redis_utils import RConn
from gluon.contrib.redis_scheduler import RScheduler

def demo1(*args,**vars):
    print('you passed args=%s and vars=%s' % (args, vars))
    return 'done!'

def demo2():
    1/0

rconn = RConn()
mysched = RScheduler(db, dict(demo1=demo1,demo2=demo2), ...., redis_conn=rconn)

## run worker nodes with:

    cd web2py
    python web2py.py -K app

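## Once mysched is defined, tasks can be queued from any model or controller;
## the values below are illustrative examples:

mysched.queue_task(demo1, pargs=[1, 2], pvars=dict(a=3))
mysched.queue_task(demo2, period=60, repeats=10)
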
49 | """ |
---|
50 | |
---|
51 | |
---|
52 | path = os.getcwd() |
---|
53 | |
---|
54 | if 'WEB2PY_PATH' not in os.environ: |
---|
55 | os.environ['WEB2PY_PATH'] = path |
---|
56 | |
---|
57 | IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid()) |
---|
58 | |
---|
59 | logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER) |
---|
60 | |
---|
61 | POLLING = 'POLLING' |
---|
62 | |
---|
63 | |
---|
64 | class RScheduler(Scheduler): |
---|
65 | |
---|
66 | def __init__(self, db, tasks=None, migrate=True, |
---|
67 | worker_name=None, group_names=None, heartbeat=HEARTBEAT, |
---|
68 | max_empty_runs=0, discard_results=False, utc_time=False, |
---|
69 | redis_conn=None, mode=1): |
---|
70 | |
---|
71 | """ |
---|
72 | Highly-experimental coordination with redis |
---|
73 | Takes all args from Scheduler except redis_conn which |
---|
74 | must be something closer to a StrictRedis instance. |
---|
75 | |
---|
76 | My only regret - and the reason why I kept this under the hood for a |
---|
77 | while - is that it's hard to hook up in web2py to something happening |
---|
78 | right after the commit to a table, which will enable this version of the |
---|
79 | scheduler to process "immediate" tasks right away instead of waiting a |
---|
80 | few seconds (see FIXME in queue_task()) |
---|
81 | |
---|
82 | mode is reserved for future usage patterns. |
---|
83 | Right now it moves the coordination (which is the most intensive |
---|
84 | routine in the scheduler in matters of IPC) of workers to redis. |
---|
85 | I'd like to have incrementally redis-backed modes of operations, |
---|
86 | such as e.g.: |
---|
87 | - 1: IPC through redis (which is the current implementation) |
---|
88 | - 2: Store task results in redis (which will relieve further pressure |
---|
89 | from the db leaving the scheduler_run table empty and possibly |
---|
90 | keep things smooth as tasks results can be set to expire |
---|
91 | after a bit of time) |
---|
92 | - 3: Move all the logic for storing and queueing tasks to redis |
---|
93 | itself - which means no scheduler_task usage too - and use |
---|
94 | the database only as an historical record-bookkeeping |
---|
95 | (e.g. for reporting) |
---|
96 | |
---|
97 | As usual, I'm eager to see your comments. |
---|
98 | """ |
---|
99 | |
---|
100 | Scheduler.__init__(self, db, tasks=tasks, migrate=migrate, |
---|
101 | worker_name=worker_name, group_names=group_names, |
---|
102 | heartbeat=heartbeat, max_empty_runs=max_empty_runs, |
---|
103 | discard_results=discard_results, utc_time=utc_time) |
---|
104 | |
---|
105 | self.r_server = redis_conn |
---|
106 | from gluon import current |
---|
107 | self._application = current.request.application or 'appname' |
---|
108 | |
---|
109 | def _nkey(self, key): |
---|
110 | """Helper to restrict all keys to a namespace and track them.""" |
---|
111 | prefix = 'w2p:rsched:%s' % self._application |
---|
112 | allkeys = '%s:allkeys' % prefix |
---|
113 | newkey = "%s:%s" % (prefix, key) |
---|
114 | self.r_server.sadd(allkeys, newkey) |
---|
115 | return newkey |
---|
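
    # For illustration, assuming an application named "myapp":
    #   self._nkey('queued:main') -> 'w2p:rsched:myapp:queued:main'
    # and the returned key is also added to 'w2p:rsched:myapp:allkeys',
    # the set that prune_all() below walks to delete every tracked key.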

    def prune_all(self):
        """Global housekeeping."""
        all_keys = self._nkey('allkeys')
        with self.r_server.pipeline() as pipe:
            while True:
                try:
                    pipe.watch('PRUNE_ALL')
                    while True:
                        k = pipe.spop(all_keys)
                        if k is None:
                            break
                        pipe.delete(k)
                    pipe.execute()
                    break
                except RWatchError:
                    time.sleep(0.1)
                    continue

    def dt2str(self, value):
        return value.strftime('%Y-%m-%d %H:%M:%S')

    def str2date(self, value):
        return datetime.datetime.strptime(value, '%Y-%m-%d %H:%M:%S')

    def send_heartbeat(self, counter):
        """
        Worker coordination in redis.

        It has evolved into something that is not that easy.
        Here we try to do everything we need in a single transaction,
        and retry that transaction if something goes wrong.
        """
        with self.r_server.pipeline() as pipe:
            while True:
                try:
                    pipe.watch('SEND_HEARTBEAT')
                    self.inner_send_heartbeat(counter, pipe)
                    pipe.execute()
                    self.adj_hibernation()
                    self.sleep()
                    break
                except RWatchError:
                    time.sleep(0.1)
                    continue
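
    # A note on the pipe.watch(...) / RWatchError pattern used above and in
    # several methods below: the named key is WATCHed so that, if another
    # client touches it before pipe.execute(), the redis_utils connection
    # wrapper raises RWatchError and the whole block is simply retried after
    # a short sleep - plain optimistic locking on redis.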

    def inner_send_heartbeat(self, counter, pipe):
        """
        Do a few things in the "maintenance" thread.

        Specifically:
        - registers the workers
        - accepts commands sent to workers (KILL, TERMINATE, PICK, DISABLED, etc)
        - adjusts sleep
        - saves stats
        - elects master
        - does "housecleaning" for dead workers
        - triggers task assignment
        """
        r_server = pipe
        status_keyset = self._nkey('worker_statuses')
        status_key = self._nkey('worker_status:%s' % (self.worker_name))
        now = self.now()
        mybackedstatus = r_server.hgetall(status_key)
        if not mybackedstatus:
            r_server.hmset(
                status_key,
                dict(
                    status=ACTIVE, worker_name=self.worker_name,
                    first_heartbeat=self.dt2str(now),
                    last_heartbeat=self.dt2str(now),
                    group_names=dumps(self.group_names), is_ticker=False,
                    worker_stats=dumps(self.w_stats))
            )
            r_server.sadd(status_keyset, status_key)
            if not self.w_stats.status == POLLING:
                self.w_stats.status = ACTIVE
            self.w_stats.sleep = self.heartbeat
            mybackedstatus = ACTIVE
        else:
            mybackedstatus = mybackedstatus['status']
            if mybackedstatus == DISABLED:
                # keep sleeping
                self.w_stats.status = DISABLED
                r_server.hmset(
                    status_key,
                    dict(last_heartbeat=self.dt2str(now),
                         worker_stats=dumps(self.w_stats))
                )
            elif mybackedstatus == TERMINATE:
                self.w_stats.status = TERMINATE
                logger.debug("Waiting to terminate the current task")
                self.give_up()
            elif mybackedstatus == KILL:
                self.w_stats.status = KILL
                self.die()
            else:
                if mybackedstatus == STOP_TASK:
                    logger.info('Asked to kill the current task')
                    self.terminate_process()
                logger.info('........recording heartbeat (%s)',
                            self.w_stats.status)
                r_server.hmset(
                    status_key,
                    dict(
                        last_heartbeat=self.dt2str(now), status=ACTIVE,
                        worker_stats=dumps(self.w_stats)
                    )
                )
                # newroutine
                r_server.expire(status_key, self.heartbeat * 3 * 15)
                self.w_stats.sleep = self.heartbeat  # re-activating the process
                if self.w_stats.status not in (RUNNING, POLLING):
                    self.w_stats.status = ACTIVE

        self.do_assign_tasks = False
        if counter % 5 == 0 or mybackedstatus == PICK:
            try:
                logger.info(
                    ' freeing workers that have not sent heartbeat')
                registered_workers = r_server.smembers(status_keyset)
                allkeys = self._nkey('allkeys')
                for worker in registered_workers:
                    w = r_server.hgetall(worker)
                    w = Storage(w)
                    if not w:
                        r_server.srem(status_keyset, worker)
                        logger.info('removing %s from %s', worker, allkeys)
                        r_server.srem(allkeys, worker)
                        continue
                try:
                    self.is_a_ticker = self.being_a_ticker(pipe)
                except:
                    pass
                if self.w_stats.status in (ACTIVE, POLLING):
                    self.do_assign_tasks = True
                if self.is_a_ticker and self.do_assign_tasks:
                    # I'm a ticker, and 5 loops passed without reassigning tasks,
                    # let's do that and loop again
                    if not self.db_thread:
                        logger.debug('thread building own DAL object')
                        self.db_thread = DAL(
                            self.db._uri, folder=self.db._adapter.folder)
                        self.define_tables(self.db_thread, migrate=False)
                    db = self.db_thread
                    self.wrapped_assign_tasks(db)
                    return None
            except:
                logger.error('Error assigning tasks')

    def being_a_ticker(self, pipe):
        """
        Elects a ticker.

        This is slightly more convoluted than the original
        but is far more efficient.
        """
        r_server = pipe
        status_keyset = self._nkey('worker_statuses')
        registered_workers = r_server.smembers(status_keyset)
        ticker = None
        all_active = []
        all_workers = []
        for worker in registered_workers:
            w = r_server.hgetall(worker)
            if w['worker_name'] != self.worker_name and w['status'] == ACTIVE:
                all_active.append(w)
            if w['is_ticker'] == 'True' and ticker is None:
                ticker = w
            all_workers.append(w)
        not_busy = self.w_stats.status in (ACTIVE, POLLING)
        if not ticker:
            if not_busy:
                # only if this worker isn't busy, otherwise wait for a free one
                for worker in all_workers:
                    key = self._nkey('worker_status:%s' % worker['worker_name'])
                    if worker['worker_name'] == self.worker_name:
                        r_server.hset(key, 'is_ticker', True)
                    else:
                        r_server.hset(key, 'is_ticker', False)
                logger.info("TICKER: I'm a ticker")
            else:
                # giving up, only if I'm not alone
                if len(all_active) > 1:
                    key = self._nkey('worker_status:%s' % (self.worker_name))
                    r_server.hset(key, 'is_ticker', False)
                else:
                    not_busy = True
            return not_busy
        else:
            logger.info(
                "%s is a ticker, I'm a poor worker" % ticker['worker_name'])
            return False

    def assign_tasks(self, db):
        """
        The real beauty.

        We don't need to ASSIGN tasks, we just put
        them into the relevant queue.
        """
        st, sd = db.scheduler_task, db.scheduler_task_deps
        r_server = self.r_server
        now = self.now()
        status_keyset = self._nkey('worker_statuses')
        with r_server.pipeline() as pipe:
            while True:
                try:
                    # making sure we're the only one doing the job
                    pipe.watch('ASSIGN_TASKS')
                    registered_workers = pipe.smembers(status_keyset)
                    all_workers = []
                    for worker in registered_workers:
                        w = pipe.hgetall(worker)
                        if w['status'] == ACTIVE:
                            all_workers.append(Storage(w))
                    pipe.execute()
                    break
                except RWatchError:
                    time.sleep(0.1)
                    continue

        # build workers as dict of groups
        wkgroups = {}
        for w in all_workers:
            group_names = loads(w.group_names)
            for gname in group_names:
                if gname not in wkgroups:
                    wkgroups[gname] = dict(
                        workers=[{'name': w.worker_name, 'c': 0}])
                else:
                    wkgroups[gname]['workers'].append(
                        {'name': w.worker_name, 'c': 0})
        # mark as expired the queued tasks whose stop_time passed between
        # "runs" (i.e. while the scheduler was turned off): they weren't
        # expired then, but they are now
        db(
            (st.status.belongs((QUEUED, ASSIGNED))) &
            (st.stop_time < now)
        ).update(status=EXPIRED)

        # calculate dependencies
        deps_with_no_deps = db(
            (sd.can_visit == False) &
            (~sd.task_child.belongs(
                db(sd.can_visit == False)._select(sd.task_parent)
                )
             )
        )._select(sd.task_child)
        no_deps = db(
            (st.status.belongs((QUEUED, ASSIGNED))) &
            (
                (sd.id == None) | (st.id.belongs(deps_with_no_deps))
            )
        )._select(st.id, distinct=True, left=sd.on(
            (st.id == sd.task_parent) &
            (sd.can_visit == False)
            )
        )

        all_available = db(
            (st.status.belongs((QUEUED, ASSIGNED))) &
            (st.next_run_time <= now) &
            (st.enabled == True) &
            (st.id.belongs(no_deps))
        )

        limit = len(all_workers) * (50 // (len(wkgroups) or 1))

        # let's freeze it up
        db.commit()
        x = 0
        r_server = self.r_server
        for group in wkgroups.keys():
            queued_list = self._nkey('queued:%s' % group)
            queued_set = self._nkey('queued_set:%s' % group)
            # if tasks are already running, don't assign them again
            running_list = self._nkey('running:%s' % group)
            while True:
                # the joys of rpoplpush!
                t = r_server.rpoplpush(running_list, queued_list)
                if not t:
                    # no more
                    break
                r_server.sadd(queued_set, t)

            tasks = all_available(st.group_name == group).select(
                limitby=(0, limit), orderby=st.next_run_time)

            # put tasks in the processing list

            for task in tasks:
                x += 1
                gname = task.group_name

                if r_server.sismember(queued_set, task.id):
                    # already queued, we don't put it on the list again
                    continue
                r_server.sadd(queued_set, task.id)
                r_server.lpush(queued_list, task.id)
                d = dict(status=QUEUED)
                if not task.task_name:
                    d['task_name'] = task.function_name
                db(
                    (st.id == task.id) &
                    (st.status.belongs((QUEUED, ASSIGNED)))
                ).update(**d)
            db.commit()
        # I didn't report tasks but I'm working nonetheless!!!!
        if x > 0:
            self.w_stats.empty_runs = 0
        self.w_stats.queue = x
        self.w_stats.distribution = wkgroups
        self.w_stats.workers = len(all_workers)
        # I'll be greedy only if tasks queued are equal to the limit
        # (meaning there could be others ready to be queued)
        self.greedy = x >= limit
        logger.info('TICKER: workers are %s', len(all_workers))
        logger.info('TICKER: tasks are %s', x)
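
    # Redis layout used by assign_tasks()/pop_task()/report_task(), per group
    # (these are the suffixes passed to _nkey()):
    #   queued:<group>       - list of task ids waiting to be picked up
    #   queued_set:<group>   - set mirroring the list, used to avoid queueing
    #                          a task twice
    #   running:<group>      - list of task ids currently being processed
    #   running_dict:<group> - hash mapping a task id to the worker processing it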

    def pop_task(self, db):
        """Lift a task off a queue."""
        r_server = self.r_server
        st = self.db.scheduler_task
        task = None
        # ready to process something
        for group in self.group_names:
            queued_set = self._nkey('queued_set:%s' % group)
            queued_list = self._nkey('queued:%s' % group)
            running_list = self._nkey('running:%s' % group)
            running_dict = self._nkey('running_dict:%s' % group)
            self.w_stats.status = POLLING
            # polling lasts 1 minute in total: with more than one group,
            # the timeout is split evenly among the groups
            logger.debug(' polling on %s', group)
            task_id = r_server.brpoplpush(queued_list, running_list,
                                          timeout=60 // len(self.group_names))
            logger.debug(' finished polling')
            self.w_stats.status = ACTIVE
            if task_id:
                r_server.hset(running_dict, task_id, self.worker_name)
                r_server.srem(queued_set, task_id)
                task = db(
                    (st.id == task_id) &
                    (st.status == QUEUED)
                ).select().first()
                if not task:
                    r_server.lrem(running_list, 0, task_id)
                    r_server.hdel(running_dict, task_id)
                    r_server.lrem(queued_list, 0, task_id)
                    logger.error("we received a task that isn't there (%s)",
                                 task_id)
                    return None
                break
        now = self.now()
        if task:
            task.update_record(status=RUNNING, last_run_time=now)
            # no one will touch my task!
            db.commit()
            logger.debug(' work to do %s', task.id)
        else:
            logger.info('nothing to do')
            return None
        times_run = task.times_run + 1
        if task.cronline:
            cron_recur = CronParser(task.cronline, now.replace(second=0))
            next_run_time = cron_recur.get_next()
        elif not task.prevent_drift:
            next_run_time = task.last_run_time + datetime.timedelta(
                seconds=task.period
            )
        else:
            # calc next_run_time based on available slots
            # see #1191
            next_run_time = task.start_time
            secondspassed = (now - next_run_time).total_seconds()
            steps = secondspassed // task.period + 1
            next_run_time += datetime.timedelta(seconds=task.period * steps)

        if times_run < task.repeats or task.repeats == 0:
            # need to run (repeating task)
            run_again = True
        else:
            # no need to run again
            run_again = False
        run_id = 0
        while not self.discard_results:
            logger.debug(' new scheduler_run record')
            try:
                run_id = db.scheduler_run.insert(
                    task_id=task.id,
                    status=RUNNING,
                    start_time=now,
                    worker_name=self.worker_name)
                db.commit()
                break
            except:
                time.sleep(0.5)
                db.rollback()
        logger.info('new task %(id)s "%(task_name)s"'
                    ' %(application_name)s.%(function_name)s' % task)
        return Task(
            app=task.application_name,
            function=task.function_name,
            timeout=task.timeout,
            args=task.args,  # in json
            vars=task.vars,  # in json
            task_id=task.id,
            run_id=run_id,
            run_again=run_again,
            next_run_time=next_run_time,
            times_run=times_run,
            stop_time=task.stop_time,
            retry_failed=task.retry_failed,
            times_failed=task.times_failed,
            sync_output=task.sync_output,
            uuid=task.uuid,
            group_name=task.group_name)

    def report_task(self, task, task_report):
        """
        Override.

        Needed only because we need to pop the task
        off the running queue.
        """
        r_server = self.r_server
        db = self.db
        now = self.now()
        st = db.scheduler_task
        sr = db.scheduler_run
        if not self.discard_results:
            if task_report.result != 'null' or task_report.tb:
                # result is 'null' as a string if task completed
                # if it's stopped it's None as NoneType, so we record
                # the STOPPED "run" anyway
                logger.debug(' recording task report in db (%s)',
                             task_report.status)
                db(sr.id == task.run_id).update(
                    status=task_report.status,
                    stop_time=now,
                    run_result=task_report.result,
                    run_output=task_report.output,
                    traceback=task_report.tb)
            else:
                logger.debug(' deleting task report in db because of no result')
                db(sr.id == task.run_id).delete()
        # if there is a stop_time and the following run would exceed it
        is_expired = (task.stop_time and
                      task.next_run_time > task.stop_time and
                      True or False)
        status = (task.run_again and is_expired and EXPIRED or
                  task.run_again and not is_expired and
                  QUEUED or COMPLETED)
        if task_report.status == COMPLETED:
            # assigned calculations
            d = dict(status=status,
                     next_run_time=task.next_run_time,
                     times_run=task.times_run,
                     times_failed=0,
                     assigned_worker_name=self.worker_name
                     )
            db(st.id == task.task_id).update(**d)
            if status == COMPLETED:
                self.update_dependencies(db, task.task_id)
        else:
            st_mapping = {'FAILED': 'FAILED',
                          'TIMEOUT': 'TIMEOUT',
                          'STOPPED': 'FAILED'}[task_report.status]
            status = (task.retry_failed and
                      task.times_failed < task.retry_failed and
                      QUEUED or task.retry_failed == -1 and
                      QUEUED or st_mapping)
            db(st.id == task.task_id).update(
                times_failed=st.times_failed + 1,
                next_run_time=task.next_run_time,
                status=status,
                assigned_worker_name=self.worker_name
            )
        logger.info('task completed (%s)', task_report.status)
        running_list = self._nkey('running:%s' % task.group_name)
        running_dict = self._nkey('running_dict:%s' % task.group_name)
        r_server.lrem(running_list, 0, task.task_id)
        r_server.hdel(running_dict, task.task_id)

    def wrapped_pop_task(self):
        """Convenience function to call `pop_task` and trap exceptions.

        If an exception is raised, assume it happened because of database
        contention and retry `pop_task` after 0.5 seconds.
        """
        db = self.db
        db.commit()  # another nifty db.commit() only for Mysql
        x = 0
        while x < 10:
            try:
                rtn = self.pop_task(db)
                return rtn
                break
            # this is here to "interrupt" any brpoplpush op easily
            except KeyboardInterrupt:
                self.give_up()
                break
            except:
                self.w_stats.errors += 1
                db.rollback()
                logger.error(' error popping tasks')
                x += 1
                time.sleep(0.5)

    def get_workers(self, only_ticker=False):
        """Return a dict holding worker_name : {**columns}
        representing all "registered" workers.
        only_ticker returns only the worker running as a TICKER,
        if there is any.
        """
        r_server = self.r_server
        status_keyset = self._nkey('worker_statuses')
        registered_workers = r_server.smembers(status_keyset)
        all_workers = {}
        for worker in registered_workers:
            w = r_server.hgetall(worker)
            w = Storage(w)
            if not w:
                continue
            all_workers[w.worker_name] = Storage(
                status=w.status,
                first_heartbeat=self.str2date(w.first_heartbeat),
                last_heartbeat=self.str2date(w.last_heartbeat),
                group_names=loads(w.group_names, object_hook=_decode_dict),
                is_ticker=w.is_ticker == 'True' and True or False,
                worker_stats=loads(w.worker_stats, object_hook=_decode_dict)
            )
        if only_ticker:
            for k, v in all_workers.items():
                if v['is_ticker']:
                    return {k: v}
            return {}
        return all_workers
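
    # Illustrative shape of the value returned by get_workers()
    # (worker names and values below are examples only):
    #   {'myhost#1234': Storage(status='ACTIVE',
    #                           first_heartbeat=datetime.datetime(...),
    #                           last_heartbeat=datetime.datetime(...),
    #                           group_names=['main'],
    #                           is_ticker=True,
    #                           worker_stats={...})}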

    def set_worker_status(self, group_names=None, action=ACTIVE,
                          exclude=None, limit=None, worker_name=None):
        """Internal function to set worker's status."""
        r_server = self.r_server
        all_workers = self.get_workers()
        if not group_names:
            group_names = self.group_names
        elif isinstance(group_names, str):
            group_names = [group_names]
        exclusion = exclude and exclude.append(action) or [action]
        workers = []
        if worker_name is not None:
            if worker_name in all_workers.keys():
                workers = [worker_name]
        else:
            for k, v in all_workers.items():
                if v.status not in exclusion and set(group_names) & set(v.group_names):
                    workers.append(k)
        if limit and worker_name is None:
            workers = workers[:limit]
        if workers:
            with r_server.pipeline() as pipe:
                while True:
                    try:
                        pipe.watch('SET_WORKER_STATUS')
                        for w in workers:
                            worker_key = self._nkey('worker_status:%s' % w)
                            pipe.hset(worker_key, 'status', action)
                        pipe.execute()
                        break
                    except RWatchError:
                        time.sleep(0.1)
                        continue
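
    # Illustrative direct call (this helper normally runs behind the parent
    # Scheduler's worker-management methods rather than being called directly):
    #   self.set_worker_status(action=DISABLED, group_names='main', limit=1)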

    def queue_task(self, function, pargs=[], pvars={}, **kwargs):
        """
        FIXME: immediate should put item in queue. The hard part is
        that currently there are no hooks happening at post-commit time.

        Queue tasks. This takes care of handling the validation of all
        parameters.

        Args:
            function: the function (anything callable with a __name__)
            pargs: "raw" args to be passed to the function. Automatically
                jsonified.
            pvars: "raw" kwargs to be passed to the function. Automatically
                jsonified.
            kwargs: all the parameters available (basically, every
                `scheduler_task` column). If args and vars are here, they
                should be jsonified already, and they will override pargs
                and pvars.

        Returns:
            a dict just as a normal validate_and_insert(), plus a uuid key
            holding the uuid of the queued task. If validation is not passed
            (i.e. some parameters are invalid) both id and uuid will be None,
            and you'll get an "error" dict holding the errors found.
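
        Example (illustrative; demo1 stands for any function registered in
        the tasks dict passed to the scheduler)::

            mysched.queue_task(demo1, pargs=[1, 2], pvars=dict(a=3),
                               period=60, repeats=10, immediate=True)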
710 | """ |
---|
711 | if hasattr(function, '__name__'): |
---|
712 | function = function.__name__ |
---|
713 | targs = 'args' in kwargs and kwargs.pop('args') or dumps(pargs) |
---|
714 | tvars = 'vars' in kwargs and kwargs.pop('vars') or dumps(pvars) |
---|
715 | tuuid = 'uuid' in kwargs and kwargs.pop('uuid') or web2py_uuid() |
---|
716 | tname = 'task_name' in kwargs and kwargs.pop('task_name') or function |
---|
717 | immediate = 'immediate' in kwargs and kwargs.pop('immediate') or None |
---|
718 | cronline = kwargs.get('cronline') |
---|
719 | kwargs.update(function_name=function, |
---|
720 | task_name=tname, |
---|
721 | args=targs, |
---|
722 | vars=tvars, |
---|
723 | uuid=tuuid) |
---|
724 | if cronline: |
---|
725 | try: |
---|
726 | start_time = kwargs.get('start_time', self.now) |
---|
727 | next_run_time = CronParser(cronline, start_time).get_next() |
---|
728 | kwargs.update(start_time=start_time, next_run_time=next_run_time) |
---|
729 | except: |
---|
730 | pass |
---|
731 | rtn = self.db.scheduler_task.validate_and_insert(**kwargs) |
---|
732 | if not rtn.errors: |
---|
733 | rtn.uuid = tuuid |
---|
734 | if immediate: |
---|
735 | r_server = self.r_server |
---|
736 | ticker = self.get_workers(only_ticker=True) |
---|
737 | if ticker.keys(): |
---|
738 | ticker = ticker.keys()[0] |
---|
739 | with r_server.pipeline() as pipe: |
---|
740 | while True: |
---|
741 | try: |
---|
742 | pipe.watch('SET_WORKER_STATUS') |
---|
743 | worker_key = self._nkey('worker_status:%s' % ticker) |
---|
744 | pipe.hset(worker_key, 'status', 'PICK') |
---|
745 | pipe.execute() |
---|
746 | break |
---|
747 | except RWatchError: |
---|
748 | time.sleep(0.1) |
---|
749 | continue |
---|
750 | else: |
---|
751 | rtn.uuid = None |
---|
752 | return rtn |
---|
753 | |
---|
754 | def stop_task(self, ref): |
---|
755 | """Shortcut for task termination. |
---|
756 | |
---|
757 | If the task is RUNNING it will terminate it, meaning that status |
---|
758 | will be set as FAILED. |
---|
759 | |
---|
760 | If the task is QUEUED, its stop_time will be set as to "now", |
---|
761 | the enabled flag will be set to False, and the status to STOPPED |
---|
762 | |
---|
763 | Args: |
---|
764 | ref: can be |
---|
765 | |
---|
766 | - an integer : lookup will be done by scheduler_task.id |
---|
767 | - a string : lookup will be done by scheduler_task.uuid |
---|
768 | |
---|
769 | Returns: |
---|
770 | - 1 if task was stopped (meaning an update has been done) |
---|
771 | - None if task was not found, or if task was not RUNNING or QUEUED |
---|
772 | |
---|
773 | Note: |
---|
774 | Experimental |
---|
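
        Example (illustrative references)::

            mysched.stop_task(42)          # lookup by scheduler_task.id
            mysched.stop_task(task_uuid)   # lookup by scheduler_task.uuid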
775 | """ |
---|
776 | r_server = self.r_server |
---|
777 | st = self.db.scheduler_task |
---|
778 | if isinstance(ref, int): |
---|
779 | q = st.id == ref |
---|
780 | elif isinstance(ref, str): |
---|
781 | q = st.uuid == ref |
---|
782 | else: |
---|
783 | raise SyntaxError( |
---|
784 | "You can retrieve results only by id or uuid") |
---|
785 | task = self.db(q).select(st.id, st.status, st.group_name) |
---|
786 | task = task.first() |
---|
787 | rtn = None |
---|
788 | if not task: |
---|
789 | return rtn |
---|
790 | running_dict = self._nkey('running_dict:%s' % task.group_name) |
---|
791 | if task.status == 'RUNNING': |
---|
792 | worker_key = r_server.hget(running_dict, task.id) |
---|
793 | worker_key = self._nkey('worker_status:%s' % (worker_key)) |
---|
794 | r_server.hset(worker_key, 'status', STOP_TASK) |
---|
795 | elif task.status == 'QUEUED': |
---|
796 | rtn = self.db(q).update( |
---|
797 | stop_time=self.now(), |
---|
798 | enabled=False, |
---|
799 | status=STOPPED) |
---|
800 | return rtn |
---|