source: ogAgent-Git/src/opengnsys/jobmgr.py @ c39b253

maintls 4.0.0
Last change on this file since c39b253 was 08f7e44, checked in by Natalia Serrano <natalia.serrano@…>, 9 months ago

refs #500 make longer IDs to avoid collisions

  • Property mode set to 100644
File size: 2.5 KB
Line 
1import threading
2import subprocess
3import hashlib
4import time
5from datetime import datetime, timezone
6from opengnsys import operations
7from opengnsys.log import logger
8
def job_readstdout(job):
    """Drain the job's stdout pipe into ``job['stdout']`` until EOF.

    Intended to be used as a thread target: it blocks on ``readline()`` until
    the pipe reports end-of-file.

    Args:
        job: dict holding the process object under ``'p'`` (its ``stdout``
            attribute must be a binary stream) and the text collected so far
            under ``'stdout'``.
    """
    ## 'with' closes our end of the pipe once EOF is reached; otherwise the
    ## descriptor stays open for as long as the process object is referenced
    with job['p'].stdout as pipe:
        for line in iter(pipe.readline, b''):
            ## undecodable bytes are dropped rather than raising in the reader
            job['stdout'] += line.decode('utf-8', 'ignore')
12
def job_readstderr(job):
    """Drain the job's stderr pipe into ``job['stderr']`` until EOF.

    Intended to be used as a thread target: it blocks on ``readline()`` until
    the pipe reports end-of-file.

    Args:
        job: dict holding the process object under ``'p'`` (its ``stderr``
            attribute must be a binary stream) and the text collected so far
            under ``'stderr'``.
    """
    ## 'with' closes our end of the pipe once EOF is reached; otherwise the
    ## descriptor stays open for as long as the process object is referenced
    with job['p'].stderr as pipe:
        for line in iter(pipe.readline, b''):
            ## undecodable bytes are dropped rather than raising in the reader
            job['stderr'] += line.decode('utf-8', 'ignore')
16
class JobMgr():
    """Launch scripts as child processes and keep track of them by job ID.

    Each job is a dict stored in ``jobs`` under its ID, holding the Popen
    object (``'p'``), bookkeeping fields (``'pid'``, ``'starttime'``,
    ``'script'``, ``'client'``, ``'status'``), the captured output
    (``'stdout'``, ``'stderr'``) and the two reader threads (``'t1'``,
    ``'t2'``).
    """

    ## NOTE: defined on the class, so the registry is shared by every JobMgr
    ## instance unless an instance rebinds the attribute
    jobs = {}

    def launch_job(self, script, is_client):
        """Start ``script`` as a child process and register it as a job.

        Args:
            script: text of the script to run (str); it is handed to
                ``operations.build_popen_args`` to obtain the Popen arguments.
            is_client: stored verbatim in the job under ``'client'``.

        Returns:
            The job ID: a 12-character hexadecimal string.
        """
        logger.debug('in launch_job(), is_client "{}"'.format(is_client))
        args = operations.build_popen_args(script)
        logger.debug('args "{}"'.format(args))
        now = datetime.now(tz=timezone.utc)
        ts = now.strftime('%Y-%m-%d %H:%M:%S.%f%z')    ## '%s' doesn't work on windows
        ## ID = first 12 hex digits of SHA-256 over (UTC timestamp + script text)
        digest = hashlib.sha256(now.isoformat().encode('UTF-8') + script.encode('UTF-8'))
        jobid = digest.hexdigest()[0:12]
        p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        job = {
            'p': p,
            'pid': p.pid,
            'starttime': ts,
            'script': script,
            'client': is_client,
            'status': 'running',
            'stdout': '',
            'stderr': '',
        }
        ## one reader thread per pipe, each appending into the job dict
        job['t1'] = threading.Thread(target=job_readstdout, args=(job,))
        job['t2'] = threading.Thread(target=job_readstderr, args=(job,))
        self.jobs[jobid] = job
        job['t1'].start()
        job['t2'].start()
        logger.debug('jobs "{}"'.format(self.jobs))
        return jobid

    def prepare_jobs(self):
        """Return a serializable snapshot of every known job.

        Returns:
            A list with one dict per job containing ``'pid'``, ``'starttime'``,
            ``'script'``, ``'client'``, ``'status'``, ``'stdout'``,
            ``'stderr'`` and ``'jobid'``. For processes that have already
            exited, ``'status'`` is reported as ``'finished'`` and ``'rc'``
            carries the return code.
        """
        ## can't return self.jobs because the Popen object at self.jobs[id]['p'] is not serializable. So, need to create a new dict to return
        exported = ('pid', 'starttime', 'script', 'client', 'status', 'stdout', 'stderr')
        st = []
        for jobid, j in self.jobs.items():
            entry = {k: j[k] for k in exported}
            entry['jobid'] = jobid
            rc = j['p'].poll()
            if rc is not None:    ## process finished
                entry['rc'] = rc
                entry['status'] = 'finished'
            st.append(entry)
        return st

    def terminate_job(self, jobid, *, grace_period=1):
        """Stop a job, first with ``terminate()`` and then with ``kill()``.

        Args:
            jobid: ID previously returned by ``launch_job``.
            grace_period: seconds to wait for the process to exit after each
                of the two attempts.

        Returns:
            ``{}`` if the job ID is unknown; ``{'terminated': True}`` if the
            process exited after ``terminate()``; ``{'killed': True}`` if it
            exited after ``kill()``; ``{'killed': False}`` if it was still
            running after both attempts.
        """
        job = self.jobs.get(jobid)
        if job is None:
            return {}
        p = job['p']

        p.terminate()
        if self._exited_within(p, grace_period):
            return {'terminated': True}

        p.kill()
        if self._exited_within(p, grace_period):
            return {'killed': True}

        return {'killed': False}

    @staticmethod
    def _exited_within(p, timeout):
        """Return True if process ``p`` exits within ``timeout`` seconds."""
        ## wait() returns as soon as the process is gone, so a process that
        ## exits promptly does not cost the whole grace period
        try:
            p.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            return False
        return True
Note: See TracBrowser for help on using the repository browser.