#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim: set ts=4 sw=4 et ai:
"""
| This file is part of the web2py Web Framework
| Copyrighted by Massimo Di Pierro <mdipierro@cs.depaul.edu>
| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)

Background processes made simple
---------------------------------
"""

from __future__ import print_function

import socket
import os
import logging
import types
from functools import reduce
import datetime
import re
import sys
from json import loads, dumps
import tempfile
import traceback
import threading
import multiprocessing
import time
import signal

from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET, IS_NOT_IN_DB, IS_EMPTY_OR
from gluon import IS_INT_IN_RANGE, IS_DATETIME, IS_IN_DB
from gluon.utils import web2py_uuid
from gluon._compat import Queue, long, iteritems, PY2, to_bytes, string_types, integer_types
from gluon.storage import Storage

USAGE = """
## Example

For any existing application myapp

Create File: myapp/models/scheduler.py ======
from gluon.scheduler import Scheduler

def demo1(*args, **vars):
    print('you passed args=%s and vars=%s' % (args, vars))
    return 'done!'

def demo2():
    1/0

scheduler = Scheduler(db, dict(demo1=demo1, demo2=demo2))
## run worker nodes with:

   cd web2py
   python web2py.py -K myapp
or
   python gluon/scheduler.py -u sqlite://storage.sqlite \
                             -f applications/myapp/databases/ \
                             -t mytasks.py
(-h for info)
python scheduler.py -h

## schedule jobs using
http://127.0.0.1:8000/myapp/appadmin/insert/db/scheduler_task

## monitor scheduled jobs
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_task.id

## view completed jobs
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_run.id

## view workers
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_worker.id

"""

IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid())

logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER)

QUEUED = 'QUEUED'
ASSIGNED = 'ASSIGNED'
RUNNING = 'RUNNING'
COMPLETED = 'COMPLETED'
FAILED = 'FAILED'
TIMEOUT = 'TIMEOUT'
STOPPED = 'STOPPED'
ACTIVE = 'ACTIVE'
TERMINATE = 'TERMINATE'
DISABLED = 'DISABLED'
KILL = 'KILL'
PICK = 'PICK'
STOP_TASK = 'STOP_TASK'
EXPIRED = 'EXPIRED'
SECONDS = 1
HEARTBEAT = 3 * SECONDS
MAXHIBERNATION = 10
CLEAROUT = '!clear!'
RESULTINFILE = 'result_in_file:'

CALLABLETYPES = (types.LambdaType, types.FunctionType,
                 types.BuiltinFunctionType,
                 types.MethodType, types.BuiltinMethodType)


class Task(object):
    """Defines a "task" object that gets passed from the main thread to the
    executor process
    """
    def __init__(self, app, function, timeout, args='[]', vars='{}', **kwargs):
        logger.debug(' new task allocated: %s.%s', app, function)
        self.app = app
        self.function = function
        self.timeout = timeout
        self.args = args  # json
        self.vars = vars  # json
        self.__dict__.update(kwargs)

    def __str__(self):
        return '<Task: %s>' % self.function


class TaskReport(object):
    """Defines a "task report" object that gets passed from the executor
    process back to the main thread
    """
    def __init__(self, status, result=None, output=None, tb=None):
        logger.debug(' new task report: %s', status)
        if tb:
            logger.debug(' traceback: %s', tb)
        else:
            logger.debug(' result: %s', result)
        self.status = status
        self.result = result
        self.output = output
        self.tb = tb

    def __str__(self):
        return '<TaskReport: %s>' % self.status


class JobGraph(object):
    """Experimental: dependencies among tasks."""

    def __init__(self, db, job_name):
        self.job_name = job_name or 'job_0'
        self.db = db

    def add_deps(self, task_parent, task_child):
        """Create a dependency between task_parent and task_child."""
        self.db.scheduler_task_deps.insert(task_parent=task_parent,
                                           task_child=task_child,
                                           job_name=self.job_name)

    def validate(self, job_name=None):
        """Validate that all tasks of job_name can be completed.

        Checks that there are no mutual dependencies among tasks.
        Commits at the end if successful, otherwise rolls back the entire
        transaction. Handle with care!
        """
        db = self.db
        sd = db.scheduler_task_deps
        if job_name:
            q = sd.job_name == job_name
        else:
            q = sd.id

        edges = db(q).select()
        nested_dict = {}
        for row in edges:
            k = row.task_parent
            if k in nested_dict:
                nested_dict[k].add(row.task_child)
            else:
                nested_dict[k] = set((row.task_child,))
        try:
            rtn = []
            for k, v in nested_dict.items():
                v.discard(k)  # Ignore self dependencies
            extra_items_in_deps = reduce(set.union, nested_dict.values()) - set(nested_dict.keys())
            nested_dict.update(dict((item, set()) for item in extra_items_in_deps))
            while True:
                ordered = set(item for item, dep in nested_dict.items() if not dep)
                if not ordered:
                    break
                rtn.append(ordered)
                nested_dict = dict(
                    (item, (dep - ordered)) for item, dep in nested_dict.items()
                    if item not in ordered
                )
            assert not nested_dict, "A cyclic dependency exists amongst %r" % nested_dict
            db.commit()
            return rtn
        except Exception:
            db.rollback()
            return None
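
# A minimal sketch of how JobGraph is meant to be used (t1 and t2 stand in
# for ids of already-queued scheduler_task records):
#
#     graph = JobGraph(db, 'job_1')
#     graph.add_deps(t1, t2)   # holds t1 back until t2 has completed
#     graph.validate('job_1')  # execution levels as sets, or None on a cycle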


class CronParser(object):

    def __init__(self, cronline, base=None):
        self.cronline = cronline
        self.sched = base or datetime.datetime.now()
        self.task = None

    @staticmethod
    def _rangetolist(s, period='min'):
        if s.startswith('*'):
            if period == 'min':
                s = s.replace('*', '0-59', 1)
            elif period == 'hr':
                s = s.replace('*', '0-23', 1)
            elif period == 'dom':
                s = s.replace('*', '1-31', 1)
            elif period == 'mon':
                s = s.replace('*', '1-12', 1)
            elif period == 'dow':
                s = s.replace('*', '0-6', 1)
        match = re.match(r'(\d+)-(\d+)/(\d+)', s)
        if match:
            max_ = int(match.group(2)) + 1
            step_ = int(match.group(3))
        else:
            match = re.match(r'(\d+)/(\d+)', s)
            if match:
                ranges_max = dict(min=59, hr=23, mon=12, dom=31, dow=7)
                max_ = ranges_max[period] + 1
                step_ = int(match.group(2))
        if match:
            min_ = int(match.group(1))
            retval = list(range(min_, max_, step_))
        else:
            retval = []
        return retval

    @staticmethod
    def _sanitycheck(values, period):
        if period == 'min':
            check = all(0 <= i <= 59 for i in values)
        elif period == 'hr':
            check = all(0 <= i <= 23 for i in values)
        elif period == 'dom':
            domrange = list(range(1, 32)) + ['l']
            check = all(i in domrange for i in values)
        elif period == 'mon':
            check = all(1 <= i <= 12 for i in values)
        elif period == 'dow':
            check = all(0 <= i <= 7 for i in values)
        return check

    def _parse(self):
        line = self.cronline.lower()
        task = {}
        if line.startswith('@yearly'):
            line = line.replace('@yearly', '0 0 1 1 *')
        elif line.startswith('@annually'):
            line = line.replace('@annually', '0 0 1 1 *')
        elif line.startswith('@monthly'):
            line = line.replace('@monthly', '0 0 1 * *')
        elif line.startswith('@weekly'):
            line = line.replace('@weekly', '0 0 * * 0')
        elif line.startswith('@daily'):
            line = line.replace('@daily', '0 0 * * *')
        elif line.startswith('@midnight'):
            line = line.replace('@midnight', '0 0 * * *')
        elif line.startswith('@hourly'):
            line = line.replace('@hourly', '0 * * * *')
        params = line.strip().split()
        if len(params) < 5:
            raise ValueError('Invalid cron line (too short)')
        elif len(params) > 5:
            raise ValueError('Invalid cron line (too long)')
        daysofweek = {'sun': 0, 'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4,
                      'fri': 5, 'sat': 6}
        monthsofyear = {'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5,
                        'jun': 6, 'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10,
                        'nov': 11, 'dec': 12}
        for (s, i) in zip(params, ('min', 'hr', 'dom', 'mon', 'dow')):
            if s != '*':
                task[i] = []
                vals = s.split(',')
                for val in vals:
                    if i == 'dow':
                        refdict = daysofweek
                    elif i == 'mon':
                        refdict = monthsofyear
                    if i in ('dow', 'mon') and '-' in val and '/' not in val:
                        isnum = val.split('-')[0].isdigit()
                        if isnum:
                            val = '%s/1' % val
                        else:
                            val = '-'.join([str(refdict.get(v, ''))
                                            for v in val.split('-')])
                    if '-' in val and '/' not in val:
                        val = '%s/1' % val
                    if '/' in val:
                        task[i] += self._rangetolist(val, i)
                    elif val.isdigit():
                        task[i].append(int(val))
                    elif i in ('dow', 'mon'):
                        if val in refdict:
                            task[i].append(refdict[val])
                    elif i == 'dom' and val == 'l':
                        task[i].append(val)
                if not task[i]:
                    raise ValueError('Invalid cron value (%s)' % s)
                if not self._sanitycheck(task[i], i):
                    raise ValueError('Invalid cron value (%s)' % s)
                task[i] = sorted(task[i])
        self.task = task

    @staticmethod
    def _get_next_dow(sched, task):
        task_dow = [a % 7 for a in task['dow']]
        while sched.isoweekday() % 7 not in task_dow:
            sched += datetime.timedelta(days=1)
        return sched

    @staticmethod
    def _get_next_dom(sched, task):
        if task['dom'] == ['l']:
            # instead of calendar.isleap
            try:
                last_feb = 29
                datetime.date(sched.year, 2, last_feb)
            except ValueError:
                last_feb = 28
            lastdayofmonth = [
                31, last_feb, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31
            ]
            task_dom = [lastdayofmonth[sched.month - 1]]
        else:
            task_dom = task['dom']
        while sched.day not in task_dom:
            sched += datetime.timedelta(days=1)
        return sched

    @staticmethod
    def _get_next_mon(sched, task):
        while sched.month not in task['mon']:
            if sched.month < 12:
                sched = sched.replace(month=sched.month + 1)
            else:
                sched = sched.replace(month=1, year=sched.year + 1)
        return sched

    @staticmethod
    def _getnext_hhmm(sched, task, add_to=True):
        if add_to:
            sched += datetime.timedelta(minutes=1)
        if 'min' in task:
            while sched.minute not in task['min']:
                sched += datetime.timedelta(minutes=1)
        if 'hr' in task and sched.hour not in task['hr']:
            while sched.hour not in task['hr']:
                sched += datetime.timedelta(hours=1)
        return sched

    def _getnext_date(self, sched, task):
        if 'dow' in task and 'dom' in task:
            dow = self._get_next_dow(sched, task)
            dom = self._get_next_dom(sched, task)
            sched = min(dow, dom)
        elif 'dow' in task:
            sched = self._get_next_dow(sched, task)
        elif 'dom' in task:
            sched = self._get_next_dom(sched, task)
        if 'mon' in task:
            sched = self._get_next_mon(sched, task)
        return sched.replace(hour=0, minute=0)

    def next(self):
        """Get next date according to specs."""
        if not self.task:
            self._parse()
        task = self.task
        sched = self.sched
        x = 0
        while x < 1000:  # safety cap: avoid looping forever on odd specs
            x += 1
            try:
                next_date = self._getnext_date(sched, task)
            except (ValueError, OverflowError) as e:
                raise ValueError('Invalid cron expression (%s)' % e)
            if next_date.date() > self.sched.date():
                # we rolled date, check for valid hhmm
                sched = self._getnext_hhmm(next_date, task, False)
                break
            else:
                # same date, get next hhmm
                sched_time = self._getnext_hhmm(sched, task, True)
                rolled_again = sched_time.date() > sched.date()
                sched = sched_time
                if not rolled_again:
                    break
                # we rolled the date again :( loop to re-check dow/dom/mon
        else:
            raise ValueError('Potential bug found, please submit your '
                             'cron expression to the authors')
        self.sched = sched
        return sched

    def __iter__(self):
        """Support iteration."""
        return self

    __next__ = next
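
# An illustrative run of the parser (a sketch; the dates are picked just for
# this example):
#
#     cron = CronParser('0 12 * * mon-fri', datetime.datetime(2015, 1, 1))
#     cron.next()  # datetime.datetime(2015, 1, 1, 12, 0) -- a Thursday
#     cron.next()  # datetime.datetime(2015, 1, 2, 12, 0) -- Friday
#     cron.next()  # datetime.datetime(2015, 1, 5, 12, 0) -- skips to Monday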


# The two functions below deal with simplejson decoding strings as unicode.
# This matters especially for the dict decode, whose keys get used as
# function keyword arguments: on Python 2, unicode keyword names won't work!
# Borrowed from http://stackoverflow.com/questions/956867/

def _decode_list(lst):
    if not PY2:
        return lst
    newlist = []
    for i in lst:
        if isinstance(i, string_types):
            i = to_bytes(i)
        elif isinstance(i, list):
            i = _decode_list(i)
        newlist.append(i)
    return newlist

def _decode_dict(dct):
    if not PY2:
        return dct
    newdict = {}
    for k, v in iteritems(dct):
        k = to_bytes(k)
        if isinstance(v, string_types):
            v = to_bytes(v)
        elif isinstance(v, list):
            v = _decode_list(v)
        newdict[k] = v
    return newdict
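
# For instance, on Python 2 loads('{"a": 1}') yields {u'a': 1}, and passing
# that straight to a function as f(**vars) can fail with "keywords must be
# strings"; the byte-keyed copy returned by _decode_dict is safe to splat.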


def executor(retq, task, outq):
    """The function used to execute tasks in the background process."""
    logger.debug(' task started')

    class LogOutput(object):
        """Facility to log output at intervals."""

        def __init__(self, out_queue):
            self.out_queue = out_queue
            self.stdout = sys.stdout
            self.written = False
            sys.stdout = self

        def close(self):
            sys.stdout = self.stdout
            if self.written:
                # see "Joining processes that use queues" section in
                # https://docs.python.org/2/library/multiprocessing.html#programming-guidelines
                # https://docs.python.org/3/library/multiprocessing.html#programming-guidelines
                self.out_queue.cancel_join_thread()

        def flush(self):
            pass

        def write(self, data):
            self.out_queue.put(data)
            self.written = True

    W2P_TASK = Storage({
        'id': task.task_id,
        'uuid': task.uuid,
        'run_id': task.run_id
    })
    stdout = LogOutput(outq)
    try:
        if task.app:
            from gluon.shell import env, parse_path_info
            from gluon import current
            ## FIXME: why temporarily change the log level of the root logger?
            #level = logging.getLogger().getEffectiveLevel()
            #logging.getLogger().setLevel(logging.WARN)
            # support for task.app like 'app/controller'
            (a, c, f) = parse_path_info(task.app)
            _env = env(a=a, c=c, import_models=True,
                       extra_request={'is_scheduler': True})
            #logging.getLogger().setLevel(level)
            f = task.function
            functions = current._scheduler.tasks
            if functions:
                _function = functions.get(f)
            else:
                # look into env
                _function = _env.get(f)
            if not isinstance(_function, CALLABLETYPES):
                raise NameError(
                    "name '%s' not found in scheduler's environment" % f)
            # Inject W2P_TASK into environment
            _env.update({'W2P_TASK': W2P_TASK})
            # Inject W2P_TASK into current
            current.W2P_TASK = W2P_TASK
            globals().update(_env)
            args = _decode_list(loads(task.args))
            vars = loads(task.vars, object_hook=_decode_dict)
            result = dumps(_function(*args, **vars))
        else:
            # for testing purpose only
            result = eval(task.function)(
                *loads(task.args, object_hook=_decode_dict),
                **loads(task.vars, object_hook=_decode_dict))
        if len(result) >= 1024:
            fd, temp_path = tempfile.mkstemp(suffix='.w2p_sched')
            with os.fdopen(fd, 'w') as f:
                f.write(result)
            result = RESULTINFILE + temp_path
        retq.put(TaskReport('COMPLETED', result=result))
    except:
        tb = traceback.format_exc()
        retq.put(TaskReport('FAILED', tb=tb))
    finally:
        stdout.close()
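
# Note on the RESULTINFILE marker above: results of 1024+ characters are not
# pushed through the multiprocessing queue; the executor spills them to a
# temp file and sends back 'result_in_file:<path>', which Scheduler.execute()
# then reads back and unlinks.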


class IS_CRONLINE(object):
    """
    Validates cronline
    """
    def __init__(self, error_message=None):
        self.error_message = error_message

    def __call__(self, value, record_id=None):
        recur = CronParser(value, datetime.datetime.now())
        try:
            recur.next()
            return (value, None)
        except ValueError as e:
            if not self.error_message:
                return (value, e)
            return (value, self.error_message)


class TYPE(object):
    """
    Validator that checks whether field is valid json and validates its type.
    Used for `args` and `vars` of the scheduler_task table
    """

    def __init__(self, myclass=list, parse=False):
        self.myclass = myclass
        self.parse = parse

    def __call__(self, value, record_id=None):
        from gluon import current
        try:
            obj = loads(value)
        except:
            return (value, current.T('invalid json'))
        else:
            if isinstance(obj, self.myclass):
                if self.parse:
                    return (obj, None)
                else:
                    return (value, None)
            else:
                return (value, current.T('Not of type: %s') % self.myclass)
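
# How the two validators behave, sketched with illustrative values:
#
#     TYPE(list)('[1, 2]')          # ('[1, 2]', None)
#     TYPE(dict)('[1, 2]')          # ('[1, 2]', <'Not of type' message>)
#     TYPE(dict, parse=True)('{}')  # ({}, None) -- returns the parsed object
#     IS_CRONLINE()('0 * * * *')    # ('0 * * * *', None)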


TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED, EXPIRED)
RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
WORKER_STATUS = (ACTIVE, PICK, DISABLED, TERMINATE, KILL, STOP_TASK)


class Scheduler(threading.Thread):
    """Scheduler object

    Args:
        db: DAL connection where Scheduler will create its tables
        tasks(dict): either a dict containing name-->func or None.
            If None, functions will be searched in the environment
        migrate(bool): turn migration on/off for the Scheduler's tables
        worker_name(str): force worker_name to identify each process.
            Leave it to None to autoassign a name (hostname#pid)
        group_names(list): process tasks belonging to this group;
            defaults to ['main'] if nothing gets passed
        heartbeat(int): how many seconds the worker sleeps between one
            execution and the following one. Indirectly sets how many
            seconds pass between checks for new tasks
        max_empty_runs(int): how many loops are allowed to pass without
            processing any tasks before the process exits. 0 keeps the
            process alive indefinitely
        discard_results(bool): Scheduler stores execution details in the
            scheduler_run table. By default the details are kept only when
            there is a result. Turning this to True discards results even
            for tasks that return something
        utc_time(bool): do all datetime calculations assuming UTC as the
            timezone. Remember to pass `start_time` and `stop_time` to tasks
            accordingly
        use_spawn(bool): use the 'spawn' start method for the background
            process (usable only with Python 3)
    """

    def __init__(self, db, tasks=None, migrate=True,
                 worker_name=None, group_names=None, heartbeat=HEARTBEAT,
                 max_empty_runs=0, discard_results=False, utc_time=False, use_spawn=False):

        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.process = None  # the background process
        self.process_queues = (None, None)
        self.have_heartbeat = True  # set to False to kill
        self.empty_runs = 0

        self.db = db
        self.db_thread = None
        self.tasks = tasks
        self.group_names = group_names or ['main']
        self.heartbeat = heartbeat
        self.worker_name = worker_name or IDENTIFIER
        self.max_empty_runs = max_empty_runs
        self.discard_results = discard_results
        self.is_a_ticker = False
        self.do_assign_tasks = False
        self.greedy = False
        self.utc_time = utc_time
        self.w_stats_lock = threading.RLock()
        self.w_stats = Storage(
            dict(
                status=RUNNING,
                sleep=heartbeat,
                total=0,
                errors=0,
                empty_runs=0,
                queue=0,
                distribution=None,
                workers=0)
        )  # dict holding statistics

        from gluon import current
        current._scheduler = self

        self.define_tables(db, migrate=migrate)
        self.use_spawn = use_spawn
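
    # A minimal sketch of wiring a worker up from a model file (it mirrors
    # the USAGE example at the top; `db` and `mytask` are the application's
    # own objects, named here only for illustration):
    #
    #     scheduler = Scheduler(db, dict(mytask=mytask),
    #                           group_names=['main'], heartbeat=3)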

    def execute(self, task):
        """Start the background process.

        Args:
            task : a `Task` object

        Returns:
            a `TaskReport` object
        """
        outq = None
        retq = None
        if (self.use_spawn and not PY2):
            ctx = multiprocessing.get_context('spawn')
            outq = ctx.Queue()
            retq = ctx.Queue(maxsize=1)
            self.process = p = ctx.Process(target=executor, args=(retq, task, outq))
        else:
            outq = multiprocessing.Queue()
            retq = multiprocessing.Queue(maxsize=1)
            self.process = p = \
                multiprocessing.Process(target=executor, args=(retq, task, outq))

        self.process_queues = (retq, outq)

        logger.debug(' task starting')
        p.start()
        start = time.time()

        if task.sync_output > 0:
            run_timeout = task.sync_output
        else:
            run_timeout = task.timeout
        task_output = tout = ''
        try:
            while p.is_alive() and (not task.timeout or
                                    time.time() - start < task.timeout):
                # NOTE: try always to empty the out queue before
                #       the child process is joined,
                #       see "Joining processes that use queues" section in
                #       https://docs.python.org/2/library/multiprocessing.html#programming-guidelines
                #       https://docs.python.org/3/library/multiprocessing.html#programming-guidelines
                while True:
                    try:
                        tout += outq.get(timeout=2)
                    except Queue.Empty:
                        break
                if tout:
                    logger.debug(' partial output: "%s"', tout)
                    if CLEAROUT in tout:
                        task_output = tout[
                            tout.rfind(CLEAROUT) + len(CLEAROUT):]
                    else:
                        task_output += tout
                    try:
                        db = self.db
                        db(db.scheduler_run.id == task.run_id).update(run_output=task_output)
                        db.commit()
                        tout = ''
                        logger.debug(' partial output saved')
                    except Exception:
                        logger.exception(' error while saving partial output')
                        task_output = task_output[:-len(tout)]
                p.join(timeout=run_timeout)
        except:
            logger.exception(' task stopped by general exception')
            self.terminate_process()
            tr = TaskReport(STOPPED)
        else:
            if p.is_alive():
                logger.debug(' task timeout')
                self.terminate_process(flush_ret=False)
                try:
                    # we try to get a traceback here
                    tr = retq.get(timeout=2)  # NOTE: risky after terminate
                    tr.status = TIMEOUT
                    tr.output = task_output
                except Queue.Empty:
                    tr = TaskReport(TIMEOUT)
            else:
                try:
                    tr = retq.get_nowait()
                except Queue.Empty:
                    logger.debug(' task stopped')
                    tr = TaskReport(STOPPED)
                else:
                    logger.debug(' task completed or failed')
                    result = tr.result
                    if result and result.startswith(RESULTINFILE):
                        temp_path = result.replace(RESULTINFILE, '', 1)
                        with open(temp_path) as f:
                            tr.result = f.read()
                        os.unlink(temp_path)
        tr.output = task_output
        return tr

    _terminate_process_lock = threading.RLock()

    def terminate_process(self, flush_out=True, flush_ret=True):
        """Terminate any running tasks (internal use only)"""
        if self.process is not None:
            # must synchronize since we are called by main and heartbeat thread
            with self._terminate_process_lock:
                if flush_out:
                    queue = self.process_queues[1]
                    while not queue.empty():  # NOTE: empty() is not reliable
                        try:
                            queue.get_nowait()
                        except Queue.Empty:
                            pass
                if flush_ret:
                    queue = self.process_queues[0]
                    while not queue.empty():
                        try:
                            queue.get_nowait()
                        except Queue.Empty:
                            pass
                logger.debug('terminating process')
                try:
                    # NOTE: terminate should not be called when using shared
                    #       resources, see "Avoid terminating processes"
                    #       section in
                    #       https://docs.python.org/2/library/multiprocessing.html#programming-guidelines
                    #       https://docs.python.org/3/library/multiprocessing.html#programming-guidelines
                    self.process.terminate()
                    # NOTE: calling join after a terminate is risky,
                    #       as explained in "Avoid terminating processes"
                    #       section this can lead to a deadlock
                    self.process.join()
                finally:
                    self.process = None

    def die(self):
        """Forces termination of the worker process along with any running
        task"""
        logger.info('die!')
        self.have_heartbeat = False
        self.terminate_process()

    def give_up(self):
        """Waits for any running task to finish, then exits the worker
        process"""
        logger.info('Giving up as soon as possible!')
        self.have_heartbeat = False

    def run(self):
        """This is executed by the heartbeat thread"""
        counter = 0
        while self.have_heartbeat:
            self.send_heartbeat(counter)
            counter += 1

    def start_heartbeats(self):
        self.start()

    def __get_migrate(self, tablename, migrate=True):
        if migrate is False:
            return False
        elif migrate is True:
            return True
        elif isinstance(migrate, str):
            return "%s%s.table" % (migrate, tablename)
        return True

    def now(self):
        """Shortcut that fetches current time based on UTC preferences."""
        return datetime.datetime.utcnow() if self.utc_time else datetime.datetime.now()

    def set_requirements(self, scheduler_task):
        """Called to set defaults for lazy_tables connections."""
        from gluon import current
        if hasattr(current, 'request'):
            scheduler_task.application_name.default = '%s/%s' % (
                current.request.application, current.request.controller
            )

    def define_tables(self, db, migrate):
        """Define Scheduler tables structure."""
        from pydal.base import DEFAULT
        logger.debug('defining tables (migrate=%s)', migrate)
        now = self.now
        db.define_table(
            'scheduler_task',
            Field('application_name', requires=IS_NOT_EMPTY(),
                  default=None, writable=False),
            Field('task_name', default=None),
            Field('group_name', default='main'),
            Field('status', requires=IS_IN_SET(TASK_STATUS),
                  default=QUEUED, writable=False),
            Field('broadcast', 'boolean', default=False),
            Field('function_name',
                  requires=IS_IN_SET(sorted(self.tasks.keys()))
                  if self.tasks else DEFAULT),
            Field('uuid', length=255,
                  requires=IS_NOT_IN_DB(db, 'scheduler_task.uuid'),
                  unique=True, default=web2py_uuid),
            Field('args', 'text', default='[]', requires=TYPE(list)),
            Field('vars', 'text', default='{}', requires=TYPE(dict)),
            Field('enabled', 'boolean', default=True),
            Field('start_time', 'datetime', default=now,
                  requires=IS_DATETIME()),
            Field('next_run_time', 'datetime', default=now),
            Field('stop_time', 'datetime'),
            Field('repeats', 'integer', default=1, comment="0=unlimited",
                  requires=IS_INT_IN_RANGE(0, None)),
            Field('retry_failed', 'integer', default=0, comment="-1=unlimited",
                  requires=IS_INT_IN_RANGE(-1, None)),
            Field('period', 'integer', default=60, comment='seconds',
                  requires=IS_INT_IN_RANGE(0, None)),
            Field('prevent_drift', 'boolean', default=False,
                  comment='Exact start_times between runs'),
            Field('cronline', default=None,
                  comment='Discard "period", use this cron expr instead',
                  requires=IS_EMPTY_OR(IS_CRONLINE())),
            Field('timeout', 'integer', default=60, comment='seconds',
                  requires=IS_INT_IN_RANGE(1, None)),
            Field('sync_output', 'integer', default=0,
                  comment="update output every n sec: 0=never",
                  requires=IS_INT_IN_RANGE(0, None)),
            Field('times_run', 'integer', default=0, writable=False),
            Field('times_failed', 'integer', default=0, writable=False),
            Field('last_run_time', 'datetime', writable=False, readable=False),
            Field('assigned_worker_name', default='', writable=False),
            on_define=self.set_requirements,
            migrate=self.__get_migrate('scheduler_task', migrate),
            format='(%(id)s) %(task_name)s')

        db.define_table(
            'scheduler_run',
            Field('task_id', 'reference scheduler_task'),
            Field('status', requires=IS_IN_SET(RUN_STATUS)),
            Field('start_time', 'datetime'),
            Field('stop_time', 'datetime'),
            Field('run_output', 'text'),
            Field('run_result', 'text'),
            Field('traceback', 'text'),
            Field('worker_name', default=self.worker_name),
            migrate=self.__get_migrate('scheduler_run', migrate)
        )

        db.define_table(
            'scheduler_worker',
            Field('worker_name', length=255, unique=True),
            Field('first_heartbeat', 'datetime'),
            Field('last_heartbeat', 'datetime'),
            Field('status', requires=IS_IN_SET(WORKER_STATUS)),
            Field('is_ticker', 'boolean', default=False, writable=False),
            Field('group_names', 'list:string', default=self.group_names),
            Field('worker_stats', 'json'),
            migrate=self.__get_migrate('scheduler_worker', migrate)
        )

        db.define_table(
            'scheduler_task_deps',
            Field('job_name', default='job_0'),
            Field('task_parent', 'integer',
                  requires=IS_IN_DB(db, 'scheduler_task.id', '%(task_name)s')
                  ),
            Field('task_child', 'reference scheduler_task'),
            Field('can_visit', 'boolean', default=False),
            migrate=self.__get_migrate('scheduler_task_deps', migrate)
        )

        if migrate is not False:
            db.commit()
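
    # How the four tables cooperate: scheduler_task is the queue,
    # scheduler_run the per-execution log, scheduler_worker the heartbeat
    # and ticker-election board, and scheduler_task_deps the (experimental)
    # dependency graph driven by JobGraph above.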

    def loop(self, worker_name=None):
        """Main loop.

        This works basically as a never-ending loop that:

        - checks if the worker is ready to process tasks (is not DISABLED)
        - pops a task from the queue
        - if there is a task:

            - spawns the executor background process
            - waits for the process to be finished
            - sleeps `heartbeat` seconds
        - if there is not a task:

            - checks for max_empty_runs
            - sleeps `heartbeat` seconds

        """
        signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
        try:
            self.start_heartbeats()
            while self.have_heartbeat:
                with self.w_stats_lock:
                    is_disabled = self.w_stats.status == DISABLED
                if is_disabled:
                    logger.debug('Someone stopped me, sleeping until better'
                                 ' times come (%s)', self.w_stats.sleep)
                    self.sleep()
                    continue
                logger.debug('looping...')
                if self.is_a_ticker and self.do_assign_tasks:
                    # I'm a ticker, and 5 loops passed without
                    # reassigning tasks, let's do that
                    self.wrapped_assign_tasks()
                task = self.wrapped_pop_task()
                if task:
                    with self.w_stats_lock:
                        self.w_stats.empty_runs = 0
                        self.w_stats.status = RUNNING
                        self.w_stats.total += 1
                    self.wrapped_report_task(task, self.execute(task))
                    with self.w_stats_lock:
                        if not self.w_stats.status == DISABLED:
                            self.w_stats.status = ACTIVE
                else:
                    with self.w_stats_lock:
                        self.w_stats.empty_runs += 1
                        if self.max_empty_runs != 0:
                            logger.debug('empty runs %s/%s',
                                         self.w_stats.empty_runs,
                                         self.max_empty_runs)
                            if self.w_stats.empty_runs >= self.max_empty_runs:
                                logger.info(
                                    'empty runs limit reached, killing myself')
                                self.die()
                if self.is_a_ticker and self.greedy:
                    # there could be other tasks ready to be assigned
                    logger.info('TICKER: greedy loop')
                    self.wrapped_assign_tasks()
                logger.debug('sleeping...')
                self.sleep()
        except (KeyboardInterrupt, SystemExit):
            logger.info('caught')
            self.die()

    def wrapped_pop_task(self):
        """Commodity function to call `pop_task` and trap exceptions.

        If an exception is raised, assume it happened because of database
        contention and retries `pop_task` after 0.5 seconds
        """
        db = self.db
        db.commit()  # for MySQL only; FIXME: Niphlod, still needed? could avoid when not MySQL?
        for x in range(10):
            try:
                return self.pop_task()
            except Exception:
                logger.exception(' error popping tasks')
                self.w_stats.errors += 1
                db.rollback()
                time.sleep(0.5)

    def pop_task(self):
        """Grab a task ready to be executed from the queue."""
        now = self.now()
        db = self.db
        st = db.scheduler_task
        grabbed = db(
            (st.assigned_worker_name == self.worker_name) &
            (st.status == ASSIGNED)
        )

        task = grabbed.select(limitby=(0, 1), orderby=st.next_run_time).first()
        if task:
            # none will touch my task!
            task.update_record(status=RUNNING, last_run_time=now)
            db.commit()
            logger.debug(' work to do %s', task.id)
        else:
            logger.info('nothing to do')
            return None

        if task.cronline:
            cron_recur = CronParser(task.cronline,
                                    now.replace(second=0, microsecond=0))
            next_run_time = cron_recur.next()
        elif not task.prevent_drift:
            next_run_time = task.last_run_time + datetime.timedelta(
                seconds=task.period
            )
        else:
            # calc next_run_time based on available slots
            # see #1191
            next_run_time = task.start_time
            secondspassed = (now - next_run_time).total_seconds()
            times = secondspassed // task.period + 1
            next_run_time += datetime.timedelta(seconds=task.period * times)

        times_run = task.times_run + 1
        if times_run < task.repeats or task.repeats == 0:
            # need to run (repeating task)
            run_again = True
        else:
            # no need to run again
            run_again = False
        run_id = 0
        while not self.discard_results:  # FIXME: forever?
            logger.debug(' new scheduler_run record')
            try:
                run_id = db.scheduler_run.insert(
                    task_id=task.id,
                    status=RUNNING,
                    start_time=now,
                    worker_name=self.worker_name)
                db.commit()
                break
            except Exception:
                logger.exception(' error inserting scheduler_run')
                db.rollback()
                time.sleep(0.5)
        logger.info('new task %(id)s "%(task_name)s"'
                    ' %(application_name)s.%(function_name)s' % task)
        return Task(
            app=task.application_name,
            function=task.function_name,
            timeout=task.timeout,
            args=task.args,  # in json
            vars=task.vars,  # in json
            task_id=task.id,
            run_id=run_id,
            run_again=run_again,
            next_run_time=next_run_time,
            times_run=times_run,
            stop_time=task.stop_time,
            retry_failed=task.retry_failed,
            times_failed=task.times_failed,
            sync_output=task.sync_output,
            uuid=task.uuid)
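
    # Worked example of the prevent_drift branch above (numbers are
    # illustrative): with start_time 12:00:00, period=60 and now 12:02:30,
    # secondspassed is 150, times is 150 // 60 + 1 == 3, so next_run_time
    # lands on 12:03:00 -- runs stay aligned to the start_time grid instead
    # of drifting by the execution lag.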

    def wrapped_report_task(self, task, task_report):
        """Commodity function to call `report_task` and trap exceptions.

        If an exception is raised, assume it happened because of database
        contention and retries `report_task` after 0.5 seconds
        """
        db = self.db
        while True:  # FIXME: forever?
            try:
                self.report_task(task, task_report)
                db.commit()
                break
            except Exception:
                logger.exception(' error storing result')
                db.rollback()
                time.sleep(0.5)

    def report_task(self, task, task_report):
        """Take care of storing the result according to preferences.

        Deals with logic for repeating tasks.
        """
        now = self.now()
        db = self.db
        st = db.scheduler_task
        sr = db.scheduler_run
        if not self.discard_results:
            if task_report.result != 'null' or task_report.tb:
                # result is 'null' as a string if task completed
                # if it's stopped it's None as NoneType, so we record
                # the STOPPED "run" anyway
                logger.debug(' recording task report in db (%s)',
                             task_report.status)
                db(sr.id == task.run_id).update(
                    status=task_report.status,
                    stop_time=now,
                    run_result=task_report.result,
                    run_output=task_report.output,
                    traceback=task_report.tb)
            else:
                logger.debug(' deleting task report in db because of no result')
                db(sr.id == task.run_id).delete()
        # if there is a stop_time and the following run would exceed it
        is_expired = (task.stop_time and
                      task.next_run_time > task.stop_time or
                      False)
        status = (task.run_again and is_expired and EXPIRED or
                  task.run_again and not is_expired and QUEUED or
                  COMPLETED)
        if task_report.status == COMPLETED:
            d = dict(status=status,
                     next_run_time=task.next_run_time,
                     times_run=task.times_run,
                     times_failed=0
                     )
            db(st.id == task.task_id).update(**d)
            if status == COMPLETED:
                self.update_dependencies(task.task_id)
        else:
            st_mapping = {'FAILED': 'FAILED',
                          'TIMEOUT': 'TIMEOUT',
                          'STOPPED': 'FAILED'}[task_report.status]
            status = (task.retry_failed
                      and task.times_failed < task.retry_failed
                      and QUEUED or task.retry_failed == -1
                      and QUEUED or st_mapping)
            db(st.id == task.task_id).update(
                times_failed=st.times_failed + 1,
                next_run_time=task.next_run_time,
                status=status
            )
        logger.info('task completed (%s)', task_report.status)
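
    # Status resolution above, spelled out: a repeating task with runs left
    # goes back to QUEUED (or EXPIRED once next_run_time would pass
    # stop_time); a failed task is re-QUEUED while times_failed <
    # retry_failed, or always when retry_failed == -1; a STOPPED run counts
    # as FAILED on the task record.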

    def update_dependencies(self, task_id):
        """Unblock execution paths for Jobs."""
        db = self.db
        db(db.scheduler_task_deps.task_child == task_id).update(can_visit=True)

    def adj_hibernation(self):
        """Used to increase the "sleep" interval for DISABLED workers."""
        with self.w_stats_lock:
            if self.w_stats.status == DISABLED:
                wk_st = self.w_stats.sleep
                hibernation = wk_st + HEARTBEAT if wk_st < MAXHIBERNATION else MAXHIBERNATION
                self.w_stats.sleep = hibernation

    def send_heartbeat(self, counter):
        """Coordination among available workers.

        It:
        - sends the heartbeat
        - elects a ticker among available workers (the only process that
          effectively dispatches tasks to workers)
        - deals with worker's statuses
        - does "housecleaning" for dead workers
        - triggers tasks assignment to workers
        """
        if self.db_thread:
            # BKR 20180612 check if connection still works
            try:
                self.db_thread(self.db_thread.scheduler_worker).count()
            except self.db_thread._adapter.connection.OperationalError:
                # if not -> throw away self.db_thread and force reconnect
                self.db_thread = None

        if not self.db_thread:
            logger.debug('thread building own DAL object')
            self.db_thread = DAL(
                self.db._uri, folder=self.db._adapter.folder, decode_credentials=True)
            self.define_tables(self.db_thread, migrate=False)

        try:
            now = self.now()
            db = self.db_thread
            sw = db.scheduler_worker
            st = db.scheduler_task
            # record heartbeat
            row = db(sw.worker_name == self.worker_name).select().first()
            with self.w_stats_lock:
                if not row:
                    sw.insert(status=ACTIVE, worker_name=self.worker_name,
                              first_heartbeat=now, last_heartbeat=now,
                              group_names=self.group_names,
                              worker_stats=self.w_stats)
                    self.w_stats.status = ACTIVE
                    self.w_stats.sleep = self.heartbeat
                    backed_status = ACTIVE
                else:
                    backed_status = row.status
                    if backed_status == DISABLED:
                        # keep sleeping
                        self.w_stats.status = DISABLED
                        logger.debug('........recording heartbeat (DISABLED)')
                        db(sw.worker_name == self.worker_name).update(
                            last_heartbeat=now,
                            worker_stats=self.w_stats)
                    elif backed_status == TERMINATE:
                        self.w_stats.status = TERMINATE
                        logger.debug("Waiting to terminate the current task")
                        self.give_up()
                    elif backed_status == KILL:
                        self.w_stats.status = KILL
                        self.die()
                        return
                    else:
                        if backed_status == STOP_TASK:
                            logger.info('Asked to kill the current task')
                            self.terminate_process()
                        logger.debug('........recording heartbeat (%s)',
                                     self.w_stats.status)
                        db(sw.worker_name == self.worker_name).update(
                            last_heartbeat=now, status=ACTIVE,
                            worker_stats=self.w_stats)
                        self.w_stats.sleep = self.heartbeat  # re-activating the process
                        if self.w_stats.status != RUNNING:
                            self.w_stats.status = ACTIVE

            self.do_assign_tasks = False
            if counter % 5 == 0 or backed_status == PICK:
                try:
                    # delete dead workers
                    expiration = now - datetime.timedelta(
                        seconds=self.heartbeat * 3)
                    departure = now - datetime.timedelta(
                        seconds=self.heartbeat * 3 * 15)
                    logger.debug(
                        ' freeing workers that have not sent heartbeat')
                    dead_workers = db(
                        ((sw.last_heartbeat < expiration) & (sw.status == ACTIVE)) |
                        ((sw.last_heartbeat < departure) & (sw.status != ACTIVE))
                    )
                    dead_workers_name = dead_workers._select(sw.worker_name)
                    db(
                        (st.assigned_worker_name.belongs(dead_workers_name)) &
                        (st.status == RUNNING)
                    ).update(assigned_worker_name='', status=QUEUED)
                    dead_workers.delete()
                    try:
                        self.is_a_ticker = self.being_a_ticker()
                    except:
                        logger.exception('Error coordinating TICKER')
                    with self.w_stats_lock:
                        if self.w_stats.status == ACTIVE:
                            self.do_assign_tasks = True
                except:
                    logger.exception('Error cleaning up')

            db.commit()
        except:
            logger.exception('Error retrieving status')
            db.rollback()
        self.adj_hibernation()
        self.sleep()
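
    # Cadence note: every 5th heartbeat (or when the worker is flagged PICK)
    # the worker also frees dead peers and re-runs the ticker election; a
    # worker counts as dead after 3*heartbeat seconds without a beat while
    # ACTIVE, and its row is dropped after 45*heartbeat seconds otherwise.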

    def being_a_ticker(self):
        """Elect a TICKER process that assigns tasks to available workers.

        Does its best to elect a worker that is not busy processing other
        tasks to allow a proper distribution of tasks among all active
        workers ASAP
        """
        db = self.db_thread
        sw = db.scheduler_worker
        my_name = self.worker_name
        all_active = db(
            (sw.worker_name != my_name) & (sw.status == ACTIVE)
        ).select(sw.is_ticker, sw.worker_name)
        ticker = all_active.find(lambda row: row.is_ticker is True).first()
        with self.w_stats_lock:
            not_busy = self.w_stats.status == ACTIVE
        if not ticker:
            # if no other tickers are around
            if not_busy:
                # only if I'm not busy
                db(sw.worker_name == my_name).update(is_ticker=True)
                db(sw.worker_name != my_name).update(is_ticker=False)
                logger.info("TICKER: I'm a ticker")
            else:
                # I'm busy
                if len(all_active) >= 1:
                    # so I'll "downgrade" myself to a "poor worker"
                    db(sw.worker_name == my_name).update(is_ticker=False)
                else:
                    not_busy = True
            db.commit()
            return not_busy
        else:
            logger.info(
                "%s is a ticker, I'm a poor worker" % ticker.worker_name)
            return False

    def wrapped_assign_tasks(self):
        """Commodity function to call `assign_tasks` and trap exceptions.

        If an exception is raised, assume it happened because of database
        contention and retries `assign_tasks` after 0.5 seconds
        """
        logger.debug('Assigning tasks...')
        db = self.db
        db.commit()  # for MySQL only; FIXME: Niphlod, still needed? could avoid when not MySQL?
        for x in range(10):
            try:
                self.assign_tasks()
                db.commit()
                logger.debug('Tasks assigned...')
                break
            except Exception:
                logger.exception('TICKER: error assigning tasks')
                self.w_stats.errors += 1
                db.rollback()
                time.sleep(0.5)
1318 | |
---|
1319 | def assign_tasks(self): |
---|
1320 | """Assign task to workers, that can then pop them from the queue. |
---|
1321 | |
---|
1322 | Deals with group_name(s) logic, in order to assign linearly tasks |
---|
1323 | to available workers for those groups |
---|
1324 | """ |
---|
1325 | now = self.now() |
---|
1326 | db = self.db |
---|
1327 | sw = db.scheduler_worker |
---|
1328 | st = db.scheduler_task |
---|
1329 | sd = db.scheduler_task_deps |
---|
1330 | all_workers = db(sw.status == ACTIVE).select() |
---|
1331 | # build workers as dict of groups |
---|
1332 | wkgroups = {} |
---|
1333 | for w in all_workers: |
---|
1334 | if w.worker_stats['status'] == 'RUNNING': |
---|
1335 | continue |
---|
1336 | group_names = w.group_names |
---|
1337 | for gname in group_names: |
---|
1338 | if gname not in wkgroups: |
---|
1339 | wkgroups[gname] = dict( |
---|
1340 | workers=[{'name': w.worker_name, 'c': 0}]) |
---|
1341 | else: |
---|
1342 | wkgroups[gname]['workers'].append( |
---|
1343 | {'name': w.worker_name, 'c': 0}) |
---|
1344 | # set queued tasks that expired between "runs" (i.e., you turned off |
---|
1345 | # the scheduler): then it wasn't expired, but now it is |
---|
1346 | db( |
---|
1347 | (st.status.belongs((QUEUED, ASSIGNED))) & |
---|
1348 | (st.stop_time < now) |
---|
1349 | ).update(status=EXPIRED) |
---|
1350 | |
---|
1351 | # calculate dependencies |
---|
1352 | deps_with_no_deps = db( |
---|
1353 | (sd.can_visit == False) & |
---|
1354 | (~sd.task_child.belongs( |
---|
1355 | db(sd.can_visit == False)._select(sd.task_parent) |
---|
1356 | ) |
---|
1357 | ) |
---|
1358 | )._select(sd.task_child) |
---|
1359 | no_deps = db( |
---|
1360 | (st.status.belongs((QUEUED, ASSIGNED))) & |
---|
1361 | ( |
---|
1362 | (sd.id == None) | (st.id.belongs(deps_with_no_deps)) |
---|
1363 | ) |
---|
1364 | )._select(st.id, distinct=True, left=sd.on( |
---|
1365 | (st.id == sd.task_parent) & |
---|
1366 | (sd.can_visit == False) |
---|
1367 | ) |
---|
1368 | ) |
---|
1369 | |
---|
1370 | all_available = db( |
---|
1371 | (st.status.belongs((QUEUED, ASSIGNED))) & |
---|
1372 | (st.next_run_time <= now) & |
---|
1373 | (st.enabled == True) & |
---|
1374 | (st.id.belongs(no_deps)) |
---|
1375 | ) |
---|
1376 | |
---|
1377 | limit = len(all_workers) * (50 / (len(wkgroups) or 1)) |
---|
1378 | # if there are a moltitude of tasks, let's figure out a maximum of |
---|
1379 | # tasks per worker. This can be further tuned with some added |
---|
1380 | # intelligence (like esteeming how many tasks will a worker complete |
---|
1381 | # before the ticker reassign them around, but the gain is quite small |
---|
1382 | # 50 is a sweet spot also for fast tasks, with sane heartbeat values |
---|
1383 | # NB: ticker reassign tasks every 5 cycles, so if a worker completes |
---|
1384 | # its 50 tasks in less than heartbeat*5 seconds, |
---|
1385 | # it won't pick new tasks until heartbeat*5 seconds pass. |
---|
1386 | |
---|
1387 | # If a worker is currently elaborating a long task, its tasks needs to |
---|
1388 | # be reassigned to other workers |
---|
1389 | # this shuffles up things a bit, in order to give a task equal chances |
---|
1390 | # to be executed |
---|
1391 | |
---|
1392 | # let's freeze it up |
---|
1393 | db.commit() |
---|
1394 | tnum = 0 |
        for group in wkgroups.keys():
            tasks = all_available(st.group_name == group).select(
                limitby=(0, limit), orderby=st.next_run_time)
            # let's break up the queue evenly among workers
            for task in tasks:
                tnum += 1
                gname = task.group_name
                ws = wkgroups.get(gname)
                if ws:
                    if task.broadcast:
                        for worker in ws['workers']:
                            db.scheduler_task.insert(
                                application_name=task.application_name,
                                task_name=task.task_name,
                                group_name=task.group_name,
                                status=ASSIGNED,
                                broadcast=False,
                                function_name=task.function_name,
                                args=task.args,
                                start_time=now,
                                repeats=1,
                                retry_failed=task.retry_failed,
                                sync_output=task.sync_output,
                                assigned_worker_name=worker['name'])
                        if task.period:
                            next_run_time = now + datetime.timedelta(
                                seconds=task.period)
                        else:
                            # must be a cronline
                            cron_recur = CronParser(
                                task.cronline,
                                now.replace(second=0, microsecond=0))
                            next_run_time = cron_recur.next()
                        db(st.id == task.id).update(
                            times_run=task.times_run + 1,
                            next_run_time=next_run_time,
                            last_run_time=now)
                        db.commit()
                    else:
                        # pick the worker with the fewest tasks assigned
                        # so far in this cycle
                        myw = 0
                        counter = ws['workers'][0]['c']
                        for i, w in enumerate(ws['workers']):
                            if w['c'] < counter:
                                myw = i
                                counter = w['c']
                        assigned_wn = wkgroups[gname]['workers'][myw]['name']
                        d = dict(
                            status=ASSIGNED,
                            assigned_worker_name=assigned_wn
                        )
                        db(
                            (st.id == task.id) &
                            (st.status.belongs((QUEUED, ASSIGNED)))
                        ).update(**d)
                        wkgroups[gname]['workers'][myw]['c'] += 1
        db.commit()
        # no tasks may have been assigned, but the ticker worked nonetheless
        with self.w_stats_lock:
            if tnum > 0:
                self.w_stats.empty_runs = 0
            self.w_stats.queue = tnum
            self.w_stats.distribution = wkgroups
            self.w_stats.workers = len(all_workers)
            # be greedy only if the number of assigned tasks hit the limit
            # (meaning there could be more tasks ready to be assigned)
            self.greedy = tnum >= limit
        logger.info('TICKER: workers are %s', len(all_workers))
        logger.info('TICKER: tasks are %s', tnum)

    def sleep(self):
        """Sleep for the configured number of seconds."""
        time.sleep(self.w_stats.sleep)
        # TODO: should only sleep until the next task is available

    def set_worker_status(self, group_names=None, action=ACTIVE,
                          exclude=None, limit=None, worker_name=None):
        """Internal function to set a worker's status."""
        db = self.db
        ws = db.scheduler_worker
        if not group_names:
            group_names = self.group_names
        elif isinstance(group_names, string_types):
            group_names = [group_names]
        if worker_name:
            db(ws.worker_name == worker_name).update(status=action)
            return
        # never overwrite the statuses in `exclude`, nor re-apply `action`
        exclusion = list(exclude) if exclude else []
        if action not in exclusion:
            exclusion.append(action)
        if not limit:
            for group in group_names:
                db(
                    (ws.group_names.contains(group)) &
                    (~ws.status.belongs(exclusion))
                ).update(status=action)
        else:
            for group in group_names:
                workers = db((ws.group_names.contains(group)) &
                             (~ws.status.belongs(exclusion))
                             )._select(ws.id, limitby=(0, limit))
                db(ws.id.belongs(workers)).update(status=action)

    def disable(self, group_names=None, limit=None, worker_name=None):
        """Set DISABLED on the workers processing `group_names` tasks.

        A DISABLED worker will be kept alive but it won't be able to process
        any waiting tasks, essentially putting it to sleep.
        By default, all group_names of the Scheduler's instantiation are
        selected
        """
        self.set_worker_status(
            group_names=group_names,
            action=DISABLED,
            exclude=[DISABLED, KILL, TERMINATE],
            limit=limit,
            worker_name=worker_name)

    def resume(self, group_names=None, limit=None, worker_name=None):
        """Wakes a worker up (it will be able to process queued tasks)"""
        self.set_worker_status(
            group_names=group_names,
            action=ACTIVE,
            exclude=[KILL, TERMINATE],
            limit=limit,
            worker_name=worker_name)

    def terminate(self, group_names=None, limit=None, worker_name=None):
        """Sets TERMINATE as the worker status. The worker will wait for any
        currently running task to complete and then exit gracefully
        """
        self.set_worker_status(
            group_names=group_names,
            action=TERMINATE,
            exclude=[KILL],
            limit=limit,
            worker_name=worker_name)

    def kill(self, group_names=None, limit=None, worker_name=None):
        """Sets KILL as the worker status. The worker will be killed even if
        it's processing a task."""
        self.set_worker_status(
            group_names=group_names,
            action=KILL,
            limit=limit,
            worker_name=worker_name)

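    # Illustrative usage of the four status helpers above, assuming a
    # configured `scheduler` instance (names are examples only):
    #
    #     scheduler.disable('main')        # put 'main' workers to sleep
    #     scheduler.resume('main')         # wake them up again
    #     scheduler.terminate('main', limit=1)  # graceful stop of one worker
    #     scheduler.kill(worker_name='host#1234')  # hard-kill this worker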
    def queue_task(self, function, pargs=[], pvars={}, **kwargs):
        """
        Queue a task. This takes care of handling the validation of all
        parameters

        Args:
            function: the function (anything callable with a __name__)
            pargs: "raw" args to be passed to the function. Automatically
                jsonified.
            pvars: "raw" kwargs to be passed to the function. Automatically
                jsonified
            kwargs: all the parameters available (basically, every
                `scheduler_task` column). If args and vars are here, they
                should be jsonified already, and they will override pargs
                and pvars

        Returns:
            a dict just as a normal validate_and_insert(), plus a uuid key
            holding the uuid of the queued task. If validation does not pass
            (i.e. some parameters are invalid) both id and uuid will be None,
            and you'll get an "errors" dict holding the errors found.
        """
        if hasattr(function, '__name__'):
            function = function.__name__
        targs = 'args' in kwargs and kwargs.pop('args') or dumps(pargs)
        tvars = 'vars' in kwargs and kwargs.pop('vars') or dumps(pvars)
        tuuid = 'uuid' in kwargs and kwargs.pop('uuid') or web2py_uuid()
        tname = 'task_name' in kwargs and kwargs.pop('task_name') or function
        immediate = 'immediate' in kwargs and kwargs.pop('immediate') or None
        cronline = kwargs.get('cronline')
        kwargs.update(
            function_name=function,
            task_name=tname,
            args=targs,
            vars=tvars,
            uuid=tuuid,
        )
        if cronline:
            try:
                start_time = kwargs.get('start_time', self.now())
                next_run_time = CronParser(cronline, start_time).next()
                kwargs.update(start_time=start_time,
                              next_run_time=next_run_time)
            except Exception:
                pass
        if 'start_time' in kwargs and 'next_run_time' not in kwargs:
            kwargs.update(next_run_time=kwargs['start_time'])
        db = self.db
        rtn = db.scheduler_task.validate_and_insert(**kwargs)
        if not rtn.errors:
            rtn.uuid = tuuid
            if immediate:
                db(
                    (db.scheduler_worker.is_ticker == True)
                ).update(status=PICK)
        else:
            rtn.uuid = None
        return rtn

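    # A minimal queueing sketch, assuming `demo1` was registered in the
    # Scheduler's tasks dict (parameter values are illustrative):
    #
    #     r = scheduler.queue_task(demo1, pargs=[1, 2], pvars=dict(a=3),
    #                              timeout=60, period=300, repeats=0,
    #                              immediate=True)
    #     if r.errors:
    #         print('invalid task:', r.errors)
    #     else:
    #         print('queued task %s with uuid %s' % (r.id, r.uuid))
    #
    #     # or schedule with a crontab-style line instead of a period:
    #     scheduler.queue_task(demo1, cronline='*/15 * * * *')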
    def task_status(self, ref, output=False):
        """
        Retrieves the task status and optionally the result of the task

        Args:
            ref: can be

                - an integer : lookup will be done by scheduler_task.id
                - a string : lookup will be done by scheduler_task.uuid
                - a `Query` : lookup as you wish, e.g. ::

                    db.scheduler_task.task_name == 'test1'

            output(bool): if `True`, fetch also the scheduler_run record

        Returns:
            a single Row object, for the last queued task.
            If output == True, the last scheduler_run record is fetched too.
            The scheduler_run record is fetched via a left join, so it can
            have all fields == None

        """
        from pydal.objects import Query
        db = self.db
        sr = db.scheduler_run
        st = db.scheduler_task
        if isinstance(ref, integer_types):
            q = st.id == ref
        elif isinstance(ref, string_types):
            q = st.uuid == ref
        elif isinstance(ref, Query):
            q = ref
        else:
            raise SyntaxError(
                "You can retrieve results only by id, uuid or Query")
        fields = [st.ALL]
        left = False
        orderby = ~st.id
        if output:
            fields = st.ALL, sr.ALL
            left = sr.on(sr.task_id == st.id)
            orderby = ~st.id | ~sr.id
        row = db(q).select(
            *fields,
            **dict(orderby=orderby,
                   left=left,
                   limitby=(0, 1))
        ).first()
        if row and output:
            row.result = row.scheduler_run.run_result and \
                loads(row.scheduler_run.run_result,
                      object_hook=_decode_dict) or None
        return row

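    # Checking on a queued task (sketch; `r` as returned by queue_task above):
    #
    #     row = scheduler.task_status(r.uuid, output=True)
    #     print(row.scheduler_task.status)
    #     print(row.result)  # decoded run_result, or None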
    def stop_task(self, ref):
        """Shortcut for task termination.

        If the task is RUNNING it will be terminated, meaning that its
        status will be set as FAILED.

        If the task is QUEUED, its stop_time will be set to "now",
        the enabled flag will be set to False, and the status to STOPPED

        Args:
            ref: can be

                - an integer : lookup will be done by scheduler_task.id
                - a string : lookup will be done by scheduler_task.uuid

        Returns:
            - 1 if the task was stopped (meaning an update has been done)
            - None if the task was not found, or was neither RUNNING
              nor QUEUED

        Note:
            Experimental
        """
        db = self.db
        st = db.scheduler_task
        sw = db.scheduler_worker
        if isinstance(ref, integer_types):
            q = st.id == ref
        elif isinstance(ref, string_types):
            q = st.uuid == ref
        else:
            raise SyntaxError(
                "You can retrieve results only by id or uuid")
        task = db(q).select(st.id, st.status, st.assigned_worker_name)
        task = task.first()
        rtn = None
        if not task:
            return rtn
        if task.status == RUNNING:
            q = sw.worker_name == task.assigned_worker_name
            rtn = db(q).update(status=STOP_TASK)
        elif task.status == QUEUED:
            rtn = db(q).update(
                stop_time=self.now(),
                enabled=False,
                status=STOPPED)
        return rtn

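    # Stopping a task by id or uuid (sketch):
    #
    #     if scheduler.stop_task(r.uuid) is None:
    #         print('task not found, or neither RUNNING nor QUEUED')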
    def get_workers(self, only_ticker=False):
        """Returns a dict holding `worker_name : {**columns}`
        representing all "registered" workers.
        only_ticker returns only the workers running as a TICKER,
        if there are any
        """
        db = self.db
        if only_ticker:
            workers = db(db.scheduler_worker.is_ticker == True).select()
        else:
            workers = db(db.scheduler_worker.id > 0).select()
        all_workers = {}
        for row in workers:
            all_workers[row.worker_name] = Storage(
                status=row.status,
                first_heartbeat=row.first_heartbeat,
                last_heartbeat=row.last_heartbeat,
                group_names=row.group_names,
                is_ticker=row.is_ticker,
                worker_stats=row.worker_stats
            )
        return all_workers

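    # Inspecting the registered workers (sketch):
    #
    #     for name, w in scheduler.get_workers().items():
    #         print(name, w.status, w.is_ticker, w.last_heartbeat)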

def main():
    """
    allows running a worker without python web2py.py .... by simply::

        python gluon/scheduler.py

    """
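    # A file passed via -t is a plain Python module defining a `tasks`
    # dict; for example (illustrative, saved e.g. as mytasks.py):
    #
    #     def hello(name='world'):
    #         return 'hello %s' % name
    #
    #     tasks = {'hello': hello}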
    import optparse
    parser = optparse.OptionParser()
    parser.add_option(
        "-w", "--worker_name", dest="worker_name", default=None,
        help="start a worker with name")
    parser.add_option(
        "-b", "--heartbeat", dest="heartbeat", default=10,
        type='int', help="heartbeat time in seconds (default 10)")
    parser.add_option(
        "-L", "--logger_level", dest="logger_level",
        default=30,
        type='int',
        help="set debug output level (0-100, 0 means all, 100 means none;"
             " default is 30)")
    parser.add_option("-E", "--empty-runs",
                      dest="max_empty_runs",
                      type='int',
                      default=0,
                      help="max loops with no grabbed tasks permitted "
                           "(0 for never check)")
    parser.add_option(
        "-g", "--group_names", dest="group_names",
        default='main',
        help="comma separated list of groups to be picked by the worker")
    parser.add_option(
        "-f", "--db_folder", dest="db_folder",
        default='/Users/mdipierro/web2py/applications/scheduler/databases',
        help="location of the dal database folder")
    parser.add_option(
        "-u", "--db_uri", dest="db_uri",
        default='sqlite://storage.sqlite',
        help="database URI string (web2py DAL syntax)")
    parser.add_option(
        "-t", "--tasks", dest="tasks", default=None,
        help="file containing the tasks; must define "
             "tasks = {'task_name': (lambda: 'output')} or a similar "
             "set of tasks")
    parser.add_option(
        "-U", "--utc-time", dest="utc_time", default=False,
        action="store_true",
        help="work with UTC timestamps")
    (options, args) = parser.parse_args()
    if not options.tasks or not options.db_uri:
        print(USAGE)
    if options.tasks:
        path, filename = os.path.split(options.tasks)
        if filename.endswith('.py'):
            filename = filename[:-3]
        sys.path.append(path)
        print('importing tasks...')
        tasks = __import__(filename, globals(), locals(), [], 0).tasks
        print('tasks found: ' + ', '.join(list(tasks.keys())))
    else:
        tasks = {}
    group_names = [x.strip() for x in options.group_names.split(',')]

    logging.getLogger().setLevel(options.logger_level)

    print('groups for this worker: ' + ', '.join(group_names))
    print('connecting to database in folder: ' + (options.db_folder or './'))
    print('using URI: ' + options.db_uri)
    db = DAL(options.db_uri, folder=options.db_folder,
             decode_credentials=True)
    print('instantiating scheduler...')
    scheduler = Scheduler(db=db,
                          worker_name=options.worker_name,
                          tasks=tasks,
                          migrate=True,
                          group_names=group_names,
                          heartbeat=options.heartbeat,
                          max_empty_runs=options.max_empty_runs,
                          utc_time=options.utc_time)
    signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
    print('starting main worker loop...')
    scheduler.loop()


if __name__ == '__main__':
    main()