source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/newcron.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 16.0 KB
Line 
1# -*- coding: utf-8 -*-
2# vim: set ts=4 sw=4 et ai:
3
4"""
5| This file is part of the web2py Web Framework
6| Created by Attila Csipa <web2py@csipa.in.rs>
7| Modified by Massimo Di Pierro <mdipierro@cs.depaul.edu>
8| Worker, SoftWorker and SimplePool added by Paolo Pastori
9| License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)
10
11Cron-style interface
12"""
13
14import threading
15import os
16from logging import getLogger
17import time
18import sched
19import sys
20import re
21from functools import reduce
22import datetime
23import shlex
24
25from gluon import fileutils
26from gluon._compat import to_bytes, pickle
27from pydal.contrib import portalocker
28
29logger_name = 'web2py.cron'
30
31
32_stopping = False
33
34def reset():
35    global _stopping
36    _stopping = False
37
38
39_subprocs_lock = threading.RLock()
40_subprocs = []
41
42def subprocess_count():
43    with _subprocs_lock:
44        return len(_subprocs)
45
46
47def absolute_path_link(path):
48    """
49    Returns an absolute path for the destination of a symlink
50    """
51    if os.path.islink(path):
52        link = os.readlink(path)
53        if not os.path.isabs(link):
54            link = os.path.join(os.path.dirname(path), link)
55    else:
56        link = os.path.abspath(path)
57    return link
58
59
60def stopcron():
61    """Graceful shutdown of cron"""
62    global _stopping
63    _stopping = True
64    while subprocess_count():
65        with _subprocs_lock:
66            proc = _subprocs.pop()
67        if proc.poll() is None:
68            try:
69                proc.terminate()
70            except Exception:
71                getLogger(logger_name).exception('error in stopcron')
72
73
74def extcron(applications_parent, apps=None):
75    getLogger(logger_name).debug('external cron invocation')
76    crondance(applications_parent, 'external', startup=False, apps=apps)
77
78
79class hardcron(threading.Thread):
80
81    def __init__(self, applications_parent, apps=None):
82        threading.Thread.__init__(self)
83        self.setDaemon(True)
84        self.path = applications_parent
85        self.apps = apps
86        # processing of '@reboot' entries in crontab (startup=True)
87        getLogger(logger_name).info('hard cron bootstrap')
88        crondance(self.path, 'hard', startup=True, apps=self.apps)
89
90    def launch(self):
91        if not _stopping:
92            self.logger.debug('hard cron invocation')
93            crondance(self.path, 'hard', startup=False, apps=self.apps)
94
95    def run(self):
96        self.logger = getLogger(logger_name)
97        self.logger.info('hard cron daemon started')
98        s = sched.scheduler(time.time, time.sleep)
99        while not _stopping:
100            now = time.time()
101            s.enter(60 - now % 60, 1, self.launch, ())
102            s.run()
103
104
105def softcron(applications_parent, apps=None):
106    logger = getLogger(logger_name)
107    try:
108        if not _dancer((applications_parent, apps)):
109            logger.warning('no thread available for soft crondance')
110    except Exception:
111        logger.exception('error executing soft crondance')
112
113
114class Token(object):
115
116    def __init__(self, path):
117        self.path = os.path.join(path, 'cron.master')
118        if not os.path.exists(self.path):
119            fileutils.write_file(self.path, to_bytes(''), 'wb')
120        self.master = None
121        self.now = time.time()
122        self.logger = getLogger(logger_name)
123
124    def acquire(self, startup=False):
125        """
126        Returns the time when the lock is acquired or
127        None if cron already running
128
129        lock is implemented by writing a pickle (start, stop) in cron.master
130        start is time when cron job starts and stop is time when cron completed
131        stop == 0 if job started but did not yet complete
132        if a cron job started within less than 60 seconds, acquire returns None
133        if a cron job started before 60 seconds and did not stop,
134        a warning is issued ("Stale cron.master detected")
135        """
136        if sys.platform == 'win32':
137            locktime = 59.5
138        else:
139            locktime = 59.99
140        if portalocker.LOCK_EX is None:
141            self.logger.warning('cron disabled because no file locking')
142            return None
143        self.master = fileutils.open_file(self.path, 'rb+')
144        ret = None
145        try:
146            portalocker.lock(self.master, portalocker.LOCK_EX)
147            try:
148                (start, stop) = pickle.load(self.master)
149            except:
150                start = 0
151                stop = 1
152            if startup or self.now - start > locktime:
153                ret = self.now
154                if not stop:
155                    # this happens if previous cron job longer than 1 minute
156                    self.logger.warning('stale cron.master detected')
157                self.logger.debug('acquiring lock')
158                self.master.seek(0)
159                pickle.dump((self.now, 0), self.master)
160                self.master.flush()
161        finally:
162            portalocker.unlock(self.master)
163        if not ret:
164            # do this so no need to release
165            self.master.close()
166        return ret
167
168    def release(self):
169        """
170        Writes into cron.master the time when cron job was completed
171        """
172        ret = self.master.closed
173        if not self.master.closed:
174            portalocker.lock(self.master, portalocker.LOCK_EX)
175            self.logger.debug('releasing cron lock')
176            self.master.seek(0)
177            (start, stop) = pickle.load(self.master)
178            if start == self.now:  # if this is my lock
179                self.master.seek(0)
180                pickle.dump((self.now, time.time()), self.master)
181            portalocker.unlock(self.master)
182            self.master.close()
183        return ret
184
185
186def rangetolist(s, period='min'):
187    retval = []
188    if s.startswith('*'):
189        if period == 'min':
190            s = s.replace('*', '0-59', 1)
191        elif period == 'hr':
192            s = s.replace('*', '0-23', 1)
193        elif period == 'dom':
194            s = s.replace('*', '1-31', 1)
195        elif period == 'mon':
196            s = s.replace('*', '1-12', 1)
197        elif period == 'dow':
198            s = s.replace('*', '0-6', 1)
199    match = re.match(r'(\d+)-(\d+)/(\d+)', s)
200    if match:
201        for i in range(int(match.group(1)), int(match.group(2)) + 1):
202            if i % int(match.group(3)) == 0:
203                retval.append(i)
204    return retval
205
206
207def parsecronline(line):
208    task = {}
209    if line.startswith('@reboot'):
210        line = line.replace('@reboot', '-1 * * * *')
211    elif line.startswith('@yearly'):
212        line = line.replace('@yearly', '0 0 1 1 *')
213    elif line.startswith('@annually'):
214        line = line.replace('@annually', '0 0 1 1 *')
215    elif line.startswith('@monthly'):
216        line = line.replace('@monthly', '0 0 1 * *')
217    elif line.startswith('@weekly'):
218        line = line.replace('@weekly', '0 0 * * 0')
219    elif line.startswith('@daily'):
220        line = line.replace('@daily', '0 0 * * *')
221    elif line.startswith('@midnight'):
222        line = line.replace('@midnight', '0 0 * * *')
223    elif line.startswith('@hourly'):
224        line = line.replace('@hourly', '0 * * * *')
225    params = line.strip().split(None, 6)
226    if len(params) < 7:
227        return None
228    daysofweek = {'sun': 0, 'mon': 1, 'tue': 2, 'wed': 3,
229                  'thu': 4, 'fri': 5, 'sat': 6}
230    for (s, id) in zip(params[:5], ['min', 'hr', 'dom', 'mon', 'dow']):
231        if not s in [None, '*']:
232            task[id] = []
233            vals = s.split(',')
234            for val in vals:
235                if val != '-1' and '-' in val and '/' not in val:
236                    val = '%s/1' % val
237                if '/' in val:
238                    task[id] += rangetolist(val, id)
239                elif val.isdigit() or val == '-1':
240                    task[id].append(int(val))
241                elif id == 'dow' and val[:3].lower() in daysofweek:
242                    task[id].append(daysofweek[val[:3].lower()])
243    task['user'] = params[5]
244    task['cmd'] = params[6]
245    return task
246
247
248class Worker(threading.Thread):
249
250    def __init__(self, pool):
251        threading.Thread.__init__(self)
252        self.setDaemon(True)
253        self.pool = pool
254        self.run_lock = threading.Lock()
255        self.run_lock.acquire()
256        self.payload = None
257
258    def run(self):
259        logger = getLogger(logger_name)
260        logger.info('Worker %s: started', self.name)
261        while True:
262            try:
263                with self.run_lock:  # waiting for run_lock.release()
264                    cmd = ' '.join(self.payload)
265                    logger.debug('Worker %s: now calling %r', self.name, cmd)
266                    import subprocess
267                    proc = subprocess.Popen(self.payload,
268                                    stdin=subprocess.PIPE,
269                                    stdout=subprocess.PIPE,
270                                    stderr=subprocess.PIPE)
271                    with _subprocs_lock:
272                        _subprocs.append(proc)
273                    stdoutdata, stderrdata = proc.communicate()
274                    try:
275                        with _subprocs_lock:
276                            _subprocs.remove(proc)
277                    except ValueError:
278                        pass
279                    if proc.returncode != 0:
280                        logger.warning('Worker %s: %r call returned code %s:\n%s\n%s',
281                            self.name, cmd, proc.returncode, stdoutdata, stderrdata)
282                    else:
283                        logger.debug('Worker %s: %r call returned success:\n%s',
284                            self.name, cmd, stdoutdata)
285            finally:
286                self.run_lock.acquire()
287                self.pool.stop(self)
288
289
290class SoftWorker(threading.Thread):
291
292    def __init__(self, pool):
293        threading.Thread.__init__(self)
294        self.setDaemon(True)
295        self.pool = pool
296        self.run_lock = threading.Lock()
297        self.run_lock.acquire()
298        self.payload = None
299
300    def run(self):
301        logger = getLogger(logger_name)
302        logger.info('SoftWorker %s: started', self.name)
303        while True:
304            try:
305                with self.run_lock:  # waiting for run_lock.release()
306                    getLogger(logger_name).debug('soft cron invocation')
307                    applications_parent, apps = self.payload
308                    crondance(applications_parent, 'soft', startup=False, apps=apps)
309            finally:
310                self.run_lock.acquire()
311                self.pool.stop(self)
312
313
314class SimplePool(object):
315    """
316    Very simple thread pool,
317    (re)uses a maximum number of threads to launch cron tasks.
318
319    Pool size can be incremented after initialization,
320    this allows delayed configuration of a global instance
321    for the case you do not want to use lazy initialization.
322    """
323
324    def __init__(self, size, worker_cls=Worker):
325        """
326        Create the pool setting initial size.
327
328        Notice that no thread is created until the instance is called.
329        """
330        self.size = size
331        self.worker_cls = worker_cls
332        self.lock = threading.RLock()
333        self.idle = list()
334        self.running = set()
335
336    def grow(self, size):
337        if size and size > self.size:
338            self.size = size
339
340    def start(self, t):
341        with self.lock:
342            try:
343                self.idle.remove(t)
344            except ValueError:
345                pass
346            self.running.add(t)
347
348    def stop(self, t):
349        with self.lock:
350            self.idle.append(t)
351            try:
352                self.running.remove(t)
353            except KeyError:
354                pass
355
356    def __call__(self, payload):
357        """
358        Pass payload to a thread for immediate execution.
359
360        Returns a boolean indicating if a thread is available.
361        """
362        with self.lock:
363            if len(self.running) == self.size:
364                # no worker available
365                return False
366            idle_num = len(self.idle)
367            if idle_num:
368                # use an existing (idle) thread
369                t = self.idle.pop(0)
370            else:
371                # create a new thread
372                t = self.worker_cls(self)
373            self.start(t)
374        t.payload = payload
375        t.run_lock.release()
376        if not idle_num:
377            t.start()
378        return True
379
380
381_dancer = SimplePool(5, worker_cls=SoftWorker)
382
383def dancer_size(size):
384    _dancer.grow(size)
385
386_launcher = SimplePool(5)
387
388def launcher_size(size):
389    _launcher.grow(size)
390
391def crondance(applications_parent, ctype='hard', startup=False, apps=None):
392    """
393    Does the periodic job of cron service: read the crontab(s) and launch
394    the various commands.
395    """
396    apppath = os.path.join(applications_parent, 'applications')
397    token = Token(applications_parent)
398    cronmaster = token.acquire(startup=startup)
399    if not cronmaster:
400        return
401    try:
402        now_s = time.localtime()
403        checks = (('min', now_s.tm_min),
404                  ('hr', now_s.tm_hour),
405                  ('mon', now_s.tm_mon),
406                  ('dom', now_s.tm_mday),
407                  ('dow', (now_s.tm_wday + 1) % 7))
408        logger = getLogger(logger_name)
409
410        if not apps:
411            apps = [x for x in os.listdir(apppath)
412                    if os.path.isdir(os.path.join(apppath, x))]
413
414        full_apath_links = set()
415
416        if sys.executable.lower().endswith('pythonservice.exe'):
417            _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
418        else:
419            _python_exe = sys.executable
420        base_commands = [_python_exe]
421        w2p_path = fileutils.abspath('web2py.py', gluon=True)
422        if os.path.exists(w2p_path):
423            base_commands.append(w2p_path)
424        base_commands.extend(('--cron_job', '--no_banner', '--no_gui', '--plain'))
425
426        for app in apps:
427            if _stopping:
428                break
429            apath = os.path.join(apppath, app)
430
431            # if app is a symbolic link to other app, skip it
432            full_apath_link = absolute_path_link(apath)
433            if full_apath_link in full_apath_links:
434                continue
435            else:
436                full_apath_links.add(full_apath_link)
437
438            cronpath = os.path.join(apath, 'cron')
439            crontab = os.path.join(cronpath, 'crontab')
440            if not os.path.exists(crontab):
441                continue
442            try:
443                cronlines = [line.strip() for line in fileutils.readlines_file(crontab, 'rt')]
444                lines = [line for line in cronlines if line and not line.startswith('#')]
445                tasks = [parsecronline(cline) for cline in lines]
446            except Exception as e:
447                logger.error('crontab read error %s', e)
448                continue
449
450            for task in tasks:
451                if _stopping:
452                    break
453                if not task:
454                    continue
455                task_min = task.get('min', [])
456                if not startup and task_min == [-1]:
457                    continue
458                citems = [(k in task and not v in task[k]) for k, v in checks]
459                if task_min != [-1] and reduce(lambda a, b: a or b, citems):
460                    continue
461
462                logger.info('%s cron: %s executing %r in %s at %s',
463                    ctype, app, task.get('cmd'),
464                    os.getcwd(), datetime.datetime.now())
465                action = models = False
466                command = task['cmd']
467                if command.startswith('**'):
468                    action = True
469                    command = command[2:]
470                elif command.startswith('*'):
471                    action = models = True
472                    command = command[1:]
473
474                if action:
475                    commands = base_commands[:]
476                    if command.endswith('.py'):
477                        commands.extend(('-S', app, '-R', command))
478                    else:
479                        commands.extend(('-S', app + '/' + command))
480                    if models:
481                        commands.append('-M')
482                else:
483                    commands = shlex.split(command)
484
485                try:
486                    if not _launcher(commands):
487                        logger.warning('no thread available, cannot execute %r', task['cmd'])
488                except Exception:
489                    logger.exception('error executing %r', task['cmd'])
490    finally:
491        token.release()
Note: See TracBrowser for help on using the repository browser.