[d4e21da] | 1 | import threading |
---|
[8c6a652] | 2 | import subprocess |
---|
| 3 | import hashlib |
---|
[d4e21da] | 4 | import time |
---|
[8c6a652] | 5 | from datetime import datetime, timezone |
---|
| 6 | from opengnsys import operations |
---|
| 7 | from opengnsys.log import logger |
---|
| 8 | |
---|
def job_readstdout(job):
    """Drain the child's stdout into ``job['stdout']`` until end of stream.

    Reads ``job['p'].stdout`` one line at a time and appends each line,
    decoded as UTF-8 with undecodable bytes dropped, to the text already
    stored under ``job['stdout']``. Returns when ``readline()`` yields
    ``b''``.
    """
    stream = job['p'].stdout
    while True:
        raw = stream.readline()
        if raw == b'':
            ## empty read means the stream has nothing more to give
            break
        job['stdout'] += raw.decode('utf-8', 'ignore')
---|
| 12 | |
---|
def job_readstderr(job):
    """Drain the child's stderr into ``job['stderr']`` until end of stream.

    Reads ``job['p'].stderr`` one line at a time and appends each line,
    decoded as UTF-8 with undecodable bytes dropped, to the text already
    stored under ``job['stderr']``. Returns when ``readline()`` yields
    ``b''``.
    """
    stream = job['p'].stderr
    while True:
        raw = stream.readline()
        if raw == b'':
            ## empty read means the stream has nothing more to give
            break
        job['stderr'] += raw.decode('utf-8', 'ignore')
---|
| 16 | |
---|
class JobMgr():
    """Launch scripts as child processes and keep track of them by job id.

    Each job is a dict holding the ``Popen`` object (``'p'``), its pid,
    start time, the script text, the ``client`` flag, a status string, the
    stdout/stderr text captured so far and the two reader threads
    (``'t1'``, ``'t2'``).
    """

    ## NOTE: this is a class attribute, so the job table is shared by every
    ## JobMgr instance. Kept that way on purpose: callers may rely on it.
    jobs = {}

    ## Keys of a job record that are safe to hand back to callers
    ## (everything except the Popen object and the reader threads).
    _PUBLIC_KEYS = ('pid', 'starttime', 'script', 'client', 'status', 'stdout', 'stderr')

    def launch_job(self, script, is_client):
        """Start ``script`` as a child process and register it as a job.

        The command line is obtained from ``operations.build_popen_args``.
        Two threads are started to collect the child's stdout and stderr
        into the job record while it runs.

        Returns the job id: the first 12 hex digits of the SHA-256 of the
        UTC start time (ISO format) concatenated with the script text.
        """
        logger.debug('in launch_job(), is_client "{}"'.format(is_client))
        args = operations.build_popen_args(script)
        logger.debug('args "{}"'.format(args))

        now = datetime.now(tz=timezone.utc)
        ts = now.strftime('%Y-%m-%d %H:%M:%S.%f%z')  ## '%s' doesn't work on windows
        jobid = hashlib.sha256(now.isoformat().encode('UTF-8') + script.encode('UTF-8')).hexdigest()[0:12]

        p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        job = {
            'p': p,
            'pid': p.pid,
            'starttime': ts,
            'script': script,
            'client': is_client,
            'status': 'running',
            'stdout': '',
            'stderr': '',
        }
        job['t1'] = threading.Thread(target=job_readstdout, args=(job,))
        job['t2'] = threading.Thread(target=job_readstderr, args=(job,))

        ## Publish the record only once it is complete, then start draining
        ## the pipes so the child cannot block on a full pipe buffer.
        self.jobs[jobid] = job
        job['t1'].start()
        job['t2'].start()

        logger.debug('jobs "{}"'.format(self.jobs))
        return jobid

    def prepare_jobs(self):
        """Return a list with one plain dict per known job.

        Each entry carries the keys in ``_PUBLIC_KEYS`` plus ``'jobid'``.
        For a process that has already exited, ``'rc'`` (its return code)
        is added and ``'status'`` is reported as ``'finished'``; the stored
        job record itself is not modified.
        """
        ## can't return self.jobs because the Popen object at self.jobs[id]['p'] is not serializable. So, need to create a new dict to return
        st = []
        ## Iterate over a snapshot: a launch_job() running on another thread
        ## would otherwise raise "dictionary changed size during iteration".
        for jobid, j in list(self.jobs.items()):
            entry = {k: j[k] for k in self._PUBLIC_KEYS}
            entry['jobid'] = jobid
            if j['p'].poll() is not None:  ## process finished
                entry['rc'] = j['p'].returncode
                entry['status'] = 'finished'
            st.append(entry)
        return st

    def terminate_job(self, jobid):
        """Stop the process behind ``jobid``, escalating if it does not exit.

        First asks the process to terminate and waits up to one second for
        it to exit; if it is still running, kills it and waits up to one
        more second.

        Returns:
            ``{}`` if ``jobid`` is unknown,
            ``{'terminated': True}`` if it exited after ``terminate()``,
            ``{'killed': True}`` if it exited after ``kill()``,
            ``{'killed': False}`` if it was still running after both.
        """
        if jobid not in self.jobs:
            return {}
        p = self.jobs[jobid]['p']

        p.terminate()
        if self._exited_within(p, 1):
            return {'terminated': True}

        p.kill()
        if self._exited_within(p, 1):
            return {'killed': True}

        return {'killed': False}

    @staticmethod
    def _exited_within(p, timeout):
        """Return True if process ``p`` exits within ``timeout`` seconds.

        Uses ``Popen.wait`` so the call returns as soon as the process is
        gone instead of always sleeping for the whole interval.
        """
        try:
            p.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            return False
        return True
---|