1 | # -*- coding: utf-8 -*- |
---|
2 | # vim: set ts=4 sw=4 et ai: |
---|
3 | |
---|
4 | """ |
---|
5 | | This file is part of the web2py Web Framework |
---|
6 | | Created by Attila Csipa <web2py@csipa.in.rs> |
---|
7 | | Modified by Massimo Di Pierro <mdipierro@cs.depaul.edu> |
---|
8 | | Worker, SoftWorker and SimplePool added by Paolo Pastori |
---|
9 | | License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html) |
---|
10 | |
---|
11 | Cron-style interface |
---|
12 | """ |
---|
13 | |
---|
14 | import threading |
---|
15 | import os |
---|
16 | from logging import getLogger |
---|
17 | import time |
---|
18 | import sched |
---|
19 | import sys |
---|
20 | import re |
---|
21 | from functools import reduce |
---|
22 | import datetime |
---|
23 | import shlex |
---|
24 | |
---|
25 | from gluon import fileutils |
---|
26 | from gluon._compat import to_bytes, pickle |
---|
27 | from pydal.contrib import portalocker |
---|
28 | |
---|
# Name of the logger used by every function and class of this module.
logger_name = 'web2py.cron'


# Module-wide shutdown flag: raised by stopcron(), cleared by reset().
_stopping = False


def reset():
    """Clear the shutdown flag so that cron processing is allowed again."""
    global _stopping
    _stopping = False
---|
37 | |
---|
38 | |
---|
# Processes started by Worker threads that have not been reaped yet.
# Every access to the list is made while holding _subprocs_lock.
_subprocs_lock = threading.RLock()
_subprocs = []


def subprocess_count():
    """Return the number of subprocesses currently being tracked."""
    _subprocs_lock.acquire()
    try:
        return len(_subprocs)
    finally:
        _subprocs_lock.release()
---|
45 | |
---|
46 | |
---|
def absolute_path_link(path):
    """
    Returns an absolute path for the destination of a symlink

    If `path` is a symbolic link its target is used (a relative target is
    resolved against the directory holding the link), otherwise `path`
    itself is used.  Only one level of link is followed.

    The result is always passed through os.path.abspath(), so it is absolute
    and normalized even when `path` is relative or the link target contains
    '..' components; callers comparing results for equality therefore see
    equivalent spellings of a path as the same string.
    """
    if os.path.islink(path):
        link = os.readlink(path)
        if not os.path.isabs(link):
            # a relative target is relative to the directory holding the link
            link = os.path.join(os.path.dirname(path), link)
    else:
        link = path
    return os.path.abspath(link)
---|
58 | |
---|
59 | |
---|
def stopcron():
    """Graceful shutdown of cron

    Raises the module-level stop flag, then removes every tracked subprocess
    from the list and terminates those that are still running.  Errors raised
    while terminating a process are logged and do not interrupt the shutdown.
    """
    global _stopping
    _stopping = True
    logger = getLogger(logger_name)
    while True:
        # Test and pop within a single lock acquisition: Worker threads
        # remove their own finished process from the list, so checking the
        # count first and popping afterwards could hit an empty list.
        with _subprocs_lock:
            if not _subprocs:
                break
            proc = _subprocs.pop()
        if proc.poll() is None:
            # process has not exited yet
            try:
                proc.terminate()
            except Exception:
                logger.exception('error in stopcron')
---|
72 | |
---|
73 | |
---|
def extcron(applications_parent, apps=None):
    """
    Run a single cron pass requested from outside.

    Logs the invocation at debug level, then calls crondance() with ctype
    'external' and startup=False, forwarding `applications_parent` and `apps`.
    """
    logger = getLogger(logger_name)
    logger.debug('external cron invocation')
    crondance(applications_parent, ctype='external', startup=False, apps=apps)
---|
77 | |
---|
78 | |
---|
class hardcron(threading.Thread):
    """
    Daemon thread that runs a 'hard' cron pass at every minute boundary.

    Creating the instance immediately performs the startup pass
    (startup=True), which is the one processing '@reboot' crontab entries;
    the periodic passes begin once the thread is started.
    """

    def __init__(self, applications_parent, apps=None):
        threading.Thread.__init__(self)
        # attribute form: Thread.setDaemon() is deprecated since Python 3.10
        self.daemon = True
        self.path = applications_parent
        self.apps = apps
        # created here (not in run) so launch() never finds it missing
        self.logger = getLogger(logger_name)
        # processing of '@reboot' entries in crontab (startup=True)
        self.logger.info('hard cron bootstrap')
        crondance(self.path, 'hard', startup=True, apps=self.apps)

    def launch(self):
        """Run one periodic cron pass unless cron is being stopped."""
        if _stopping:
            return
        self.logger.debug('hard cron invocation')
        try:
            crondance(self.path, 'hard', startup=False, apps=self.apps)
        except Exception:
            # launch() is called from sched.scheduler.run(): an exception
            # escaping from here would end run() and with it the thread,
            # silently disabling hard cron from then on.
            self.logger.exception('error executing hard crondance')

    def run(self):
        """Schedule launch() at the start of each minute until stopped."""
        self.logger.info('hard cron daemon started')
        s = sched.scheduler(time.time, time.sleep)
        while not _stopping:
            now = time.time()
            # delay is the number of seconds left in the current minute
            s.enter(60 - now % 60, 1, self.launch, ())
            s.run()
---|
103 | |
---|
104 | |
---|
def softcron(applications_parent, apps=None):
    """
    Ask the soft cron pool (_dancer) to run a cron pass.

    The payload handed to the pool is the (applications_parent, apps) pair.
    A warning is logged when the pool reports that no thread is available;
    any exception raised while dispatching is logged and not propagated.
    """
    log = getLogger(logger_name)
    payload = (applications_parent, apps)
    try:
        dispatched = _dancer(payload)
        if not dispatched:
            log.warning('no thread available for soft crondance')
    except Exception:
        log.exception('error executing soft crondance')
---|
112 | |
---|
113 | |
---|
class Token(object):
    """
    Lock shared between cron runs, kept in the 'cron.master' file.

    The file holds a pickled (start, stop) pair, see acquire() for details.
    """

    def __init__(self, path):
        """Create 'cron.master' under `path` if it does not exist yet."""
        self.path = os.path.join(path, 'cron.master')
        if not os.path.exists(self.path):
            fileutils.write_file(self.path, to_bytes(''), 'wb')
        # file object for cron.master, opened by acquire()
        self.master = None
        # timestamp identifying this token as the owner of the lock
        self.now = time.time()
        self.logger = getLogger(logger_name)

    def acquire(self, startup=False):
        """
        Returns the time when the lock is acquired or
        None if cron already running

        lock is implemented by writing a pickle (start, stop) in cron.master
        start is time when cron job starts and stop is time when cron completed
        stop == 0 if job started but did not yet complete
        if a cron job started within less than 60 seconds, acquire returns None
        if a cron job started before 60 seconds and did not stop,
        a warning is issued ("Stale cron.master detected")

        With startup=True the age of the previous run is not checked.
        When the lock is not obtained (None returned, or an exception is
        raised) the file is closed here, so release() need not be called.
        """
        if sys.platform == 'win32':
            locktime = 59.5
        else:
            locktime = 59.99
        if portalocker.LOCK_EX is None:
            self.logger.warning('cron disabled because no file locking')
            return None
        self.master = fileutils.open_file(self.path, 'rb+')
        ret = None
        acquired = False
        try:
            portalocker.lock(self.master, portalocker.LOCK_EX)
            try:
                (start, stop) = pickle.load(self.master)
            except Exception:
                # empty or unreadable file: behave as if the previous run
                # started at time 0 and completed
                start = 0
                stop = 1
            if startup or self.now - start > locktime:
                ret = self.now
                if not stop:
                    # this happens if previous cron job longer than 1 minute
                    self.logger.warning('stale cron.master detected')
                self.logger.debug('acquiring lock')
                self.master.seek(0)
                pickle.dump((self.now, 0), self.master)
                self.master.flush()
                acquired = True
        finally:
            portalocker.unlock(self.master)
            if not acquired:
                # do this so no need to release; also avoids leaking the
                # file handle when an exception is propagating
                self.master.close()
        return ret

    def release(self):
        """
        Writes into cron.master the time when cron job was completed

        Returns True when there was nothing to release (file never opened
        or already closed), False when the file was open and is now closed.
        The completion time is written only if the stored start time is
        the one of this token.  The file is unlocked and closed even if
        reading or writing it fails.
        """
        if self.master is None or self.master.closed:
            return True
        try:
            portalocker.lock(self.master, portalocker.LOCK_EX)
            try:
                self.logger.debug('releasing cron lock')
                self.master.seek(0)
                start = pickle.load(self.master)[0]
                if start == self.now:  # if this is my lock
                    self.master.seek(0)
                    pickle.dump((self.now, time.time()), self.master)
                    # write out while the lock is still held
                    self.master.flush()
            finally:
                portalocker.unlock(self.master)
        finally:
            self.master.close()
        return False
---|
184 | |
---|
185 | |
---|
def rangetolist(s, period='min'):
    """
    Expands a crontab range with step into the list of selected values

    `s` is expected to begin with 'first-last/step'; a leading '*' stands
    for the full range of `period`, which is one of 'min', 'hr', 'dom',
    'mon' or 'dow'.

    The values returned are those in first..last (both included) which are
    multiples of step, e.g. '1-10/5' gives [5, 10].
    An empty list is returned when `s` does not have the expected form
    (including '*' with an unknown period) or when step is 0.
    """
    full_ranges = {'min': '0-59', 'hr': '0-23', 'dom': '1-31',
                   'mon': '1-12', 'dow': '0-6'}
    if s.startswith('*') and period in full_ranges:
        s = s.replace('*', full_ranges[period], 1)
    match = re.match(r'(\d+)-(\d+)/(\d+)', s)
    if not match:
        return []
    first, last, step = [int(group) for group in match.groups()]
    if step == 0:
        # a zero step selects nothing; without this guard the modulo below
        # raises ZeroDivisionError
        return []
    return [i for i in range(first, last + 1) if i % step == 0]
---|
205 | |
---|
206 | |
---|
def parsecronline(line):
    """
    Parses a crontab line into a task dictionary

    The line is made of five time fields (minute, hour, day of month, month,
    day of week) followed by a user and a command, or of one of the macros
    '@reboot', '@yearly', '@annually', '@monthly', '@weekly', '@daily',
    '@midnight', '@hourly' replacing the five time fields.

    Returns None when the line has less than seven fields, otherwise a
    dictionary with keys 'user' and 'cmd' plus, for each time field which is
    not '*', one of the keys 'min', 'hr', 'dom', 'mon', 'dow' holding the list
    of the accepted values.  '@reboot' is encoded as minute -1.
    """
    macros = (
        ('@reboot', '-1 * * * *'),
        ('@yearly', '0 0 1 1 *'),
        ('@annually', '0 0 1 1 *'),
        ('@monthly', '0 0 1 * *'),
        ('@weekly', '0 0 * * 0'),
        ('@daily', '0 0 * * *'),
        ('@midnight', '0 0 * * *'),
        ('@hourly', '0 * * * *'),
    )
    for macro, schedule in macros:
        if line.startswith(macro):
            # substitute the leading macro only: the same text may legally
            # appear again inside the command and must be left untouched
            line = schedule + line[len(macro):]
            break
    params = line.strip().split(None, 6)
    if len(params) < 7:
        return None
    daysofweek = {'sun': 0, 'mon': 1, 'tue': 2, 'wed': 3,
                  'thu': 4, 'fri': 5, 'sat': 6}
    task = {}
    for (s, field) in zip(params[:5], ['min', 'hr', 'dom', 'mon', 'dow']):
        if s in (None, '*'):
            continue
        values = task[field] = []
        for val in s.split(','):
            if val != '-1' and '-' in val and '/' not in val:
                # plain range: add an explicit step of 1
                val = '%s/1' % val
            if '/' in val:
                values += rangetolist(val, field)
            elif val.isdigit() or val == '-1':
                values.append(int(val))
            elif field == 'dow' and val[:3].lower() in daysofweek:
                # day name, matched on its first three letters
                values.append(daysofweek[val[:3].lower()])
    task['user'] = params[5]
    task['cmd'] = params[6]
    return task
---|
246 | |
---|
247 | |
---|
class Worker(threading.Thread):
    """
    Pool thread running cron commands as subprocesses.

    The thread sleeps on `run_lock` (created already acquired); whoever
    assigns `payload`, the argument list of the command, wakes the thread
    up by releasing the lock.  After each command the lock is taken again
    and the thread hands itself back to the pool with pool.stop().
    """

    def __init__(self, pool):
        threading.Thread.__init__(self)
        # attribute form: Thread.setDaemon() is deprecated since Python 3.10
        self.daemon = True
        self.pool = pool
        self.run_lock = threading.Lock()
        self.run_lock.acquire()
        self.payload = None

    def run(self):
        import subprocess
        logger = getLogger(logger_name)
        logger.info('Worker %s: started', self.name)
        while True:
            try:
                with self.run_lock:  # waiting for run_lock.release()
                    cmd = ' '.join(self.payload)
                    logger.debug('Worker %s: now calling %r', self.name, cmd)
                    proc = subprocess.Popen(self.payload,
                                            stdin=subprocess.PIPE,
                                            stdout=subprocess.PIPE,
                                            stderr=subprocess.PIPE)
                    with _subprocs_lock:
                        _subprocs.append(proc)
                    try:
                        stdoutdata, stderrdata = proc.communicate()
                    finally:
                        # always forget the process, even on failure
                        with _subprocs_lock:
                            try:
                                _subprocs.remove(proc)
                            except ValueError:
                                # already removed by someone else
                                pass
                    if proc.returncode != 0:
                        logger.warning('Worker %s: %r call returned code %s:\n%s\n%s',
                                       self.name, cmd, proc.returncode, stdoutdata, stderrdata)
                    else:
                        logger.debug('Worker %s: %r call returned success:\n%s',
                                     self.name, cmd, stdoutdata)
            except Exception:
                # e.g. the command cannot be started; the thread must
                # survive because it is about to be handed back to the pool
                # as idle, and a dead thread would never run the next task
                logger.exception('Worker %s: error executing task', self.name)
            finally:
                self.run_lock.acquire()
                self.pool.stop(self)
---|
288 | |
---|
289 | |
---|
class SoftWorker(threading.Thread):
    """
    Pool thread running 'soft' cron passes.

    The thread sleeps on `run_lock` (created already acquired); whoever
    assigns `payload`, an (applications_parent, apps) pair, wakes the thread
    up by releasing the lock.  After each pass the lock is taken again and
    the thread hands itself back to the pool with pool.stop().
    """

    def __init__(self, pool):
        threading.Thread.__init__(self)
        # attribute form: Thread.setDaemon() is deprecated since Python 3.10
        self.daemon = True
        self.pool = pool
        self.run_lock = threading.Lock()
        self.run_lock.acquire()
        self.payload = None

    def run(self):
        logger = getLogger(logger_name)
        logger.info('SoftWorker %s: started', self.name)
        while True:
            try:
                with self.run_lock:  # waiting for run_lock.release()
                    logger.debug('soft cron invocation')
                    applications_parent, apps = self.payload
                    crondance(applications_parent, 'soft', startup=False, apps=apps)
            except Exception:
                # the thread must survive a failed pass because it is about
                # to be handed back to the pool as idle, and a dead thread
                # would never run the next payload
                logger.exception('SoftWorker %s: error executing soft crondance',
                                 self.name)
            finally:
                self.run_lock.acquire()
                self.pool.stop(self)
---|
312 | |
---|
313 | |
---|
class SimplePool(object):
    """
    Minimal thread pool: at most `size` threads run payloads at the same
    time, and threads that completed their payload are reused.

    The size may be raised after creation (see grow), so a module-level
    instance can be configured later instead of being created lazily.
    """

    def __init__(self, size, worker_cls=Worker):
        """
        Record the initial size and the class of the worker threads.

        No thread exists until the instance is called for the first time.
        """
        self.size = size
        self.worker_cls = worker_cls
        self.lock = threading.RLock()
        self.idle = []
        self.running = set()

    def grow(self, size):
        """Raise the pool size to `size`; smaller or empty values are ignored."""
        if not size or size <= self.size:
            return
        self.size = size

    def start(self, t):
        """Move thread `t` to the running set (dropping it from idle if there)."""
        with self.lock:
            if t in self.idle:
                self.idle.remove(t)
            self.running.add(t)

    def stop(self, t):
        """Queue thread `t` as idle and drop it from the running set."""
        with self.lock:
            self.idle.append(t)
            self.running.discard(t)

    def __call__(self, payload):
        """
        Give `payload` to a thread which executes it at once.

        Returns False when all the threads allowed are running,
        True when the payload has been handed to a thread.
        """
        with self.lock:
            if len(self.running) == self.size:
                # no worker available
                return False
            reusing = bool(self.idle)
            if reusing:
                # oldest idle thread first
                worker = self.idle.pop(0)
            else:
                worker = self.worker_cls(self)
            self.start(worker)
            worker.payload = payload
            # wake the worker up (a new one finds the lock free when started)
            worker.run_lock.release()
            if not reusing:
                worker.start()
            return True
---|
379 | |
---|
380 | |
---|
# Pool of SoftWorker threads serving softcron(), initial size 5.
_dancer = SimplePool(size=5, worker_cls=SoftWorker)


def dancer_size(size):
    """Ask the soft cron pool to grow to `size` threads."""
    _dancer.grow(size)
---|
385 | |
---|
# Pool of threads (default worker class) launching cron commands, initial size 5.
_launcher = SimplePool(size=5)


def launcher_size(size):
    """Ask the command launcher pool to grow to `size` threads."""
    _launcher.grow(size)
---|
390 | |
---|
def crondance(applications_parent, ctype='hard', startup=False, apps=None):
    """
    Does the periodic job of cron service: read the crontab(s) and launch
    the various commands.

    applications_parent: folder containing the 'applications' folder and
        where the cron.master lock file is kept
    ctype: label of the kind of cron, only used in the log message
    startup: when True the '@reboot' tasks are executed too and the lock is
        taken regardless of the time of the previous run
    apps: names of the applications to process, by default every folder
        found in the 'applications' folder

    Nothing is done when the cron.master lock cannot be acquired.
    Commands are handed to the _launcher pool.  A command starting with '**'
    or '*' is run through web2py itself ('*' adds the -M option), any other
    command is split with shlex and executed as is.
    """
    apppath = os.path.join(applications_parent, 'applications')
    token = Token(applications_parent)
    cronmaster = token.acquire(startup=startup)
    if not cronmaster:
        return
    try:
        now_s = time.localtime()
        checks = (('min', now_s.tm_min),
                  ('hr', now_s.tm_hour),
                  ('mon', now_s.tm_mon),
                  ('dom', now_s.tm_mday),
                  # tm_wday is 0 on Monday, crontab uses 0 for Sunday
                  ('dow', (now_s.tm_wday + 1) % 7))
        logger = getLogger(logger_name)

        if not apps:
            apps = [x for x in os.listdir(apppath)
                    if os.path.isdir(os.path.join(apppath, x))]

        full_apath_links = set()

        if sys.executable.lower().endswith('pythonservice.exe'):
            _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
        else:
            _python_exe = sys.executable
        base_commands = [_python_exe]
        w2p_path = fileutils.abspath('web2py.py', gluon=True)
        if os.path.exists(w2p_path):
            base_commands.append(w2p_path)
        base_commands.extend(('--cron_job', '--no_banner', '--no_gui', '--plain'))

        for app in apps:
            if _stopping:
                break
            apath = os.path.join(apppath, app)

            # if app is a symbolic link to other app, skip it
            full_apath_link = absolute_path_link(apath)
            if full_apath_link in full_apath_links:
                continue
            full_apath_links.add(full_apath_link)

            cronpath = os.path.join(apath, 'cron')
            crontab = os.path.join(cronpath, 'crontab')
            if not os.path.exists(crontab):
                continue
            try:
                cronlines = [line.strip() for line in fileutils.readlines_file(crontab, 'rt')]
                lines = [line for line in cronlines if line and not line.startswith('#')]
                tasks = [parsecronline(cline) for cline in lines]
            except Exception as e:
                logger.error('crontab read error %s', e)
                continue

            for task in tasks:
                if _stopping:
                    break
                if not task:
                    continue
                task_min = task.get('min', [])
                if not startup and task_min == [-1]:
                    # '@reboot' tasks are executed at startup only
                    continue
                # skip the task when any of its constrained fields
                # does not include the current time
                if task_min != [-1] and any(k in task and v not in task[k]
                                            for k, v in checks):
                    continue

                logger.info('%s cron: %s executing %r in %s at %s',
                            ctype, app, task.get('cmd'),
                            os.getcwd(), datetime.datetime.now())
                action = models = False
                command = task['cmd']
                if command.startswith('**'):
                    action = True
                    command = command[2:]
                elif command.startswith('*'):
                    action = models = True
                    command = command[1:]

                if action:
                    commands = base_commands[:]
                    if command.endswith('.py'):
                        commands.extend(('-S', app, '-R', command))
                    else:
                        commands.extend(('-S', app + '/' + command))
                    if models:
                        commands.append('-M')
                else:
                    try:
                        commands = shlex.split(command)
                    except ValueError:
                        # e.g. unbalanced quotes: give up on this task only,
                        # the remaining ones are still processed
                        logger.exception('invalid command %r', task['cmd'])
                        continue

                try:
                    if not _launcher(commands):
                        logger.warning('no thread available, cannot execute %r', task['cmd'])
                except Exception:
                    logger.exception('error executing %r', task['cmd'])
    finally:
        token.release()
---|