1 | import threading |
---|
2 | import subprocess |
---|
3 | import hashlib |
---|
4 | import time |
---|
5 | from datetime import datetime, timezone |
---|
6 | from opengnsys import operations |
---|
7 | from opengnsys.log import logger |
---|
8 | |
---|
def job_readstdout(job):
    """Drain the child's stdout into job['stdout'] until end of stream.

    Used as a thread target by JobMgr.launch_job().

    job: dict with a Popen-like object at job['p'] whose stdout is a
         binary stream, and a str accumulator at job['stdout'].
    Returns nothing; job['stdout'] is extended in place, line by line.
    """
    ## 'with' closes our end of the pipe once EOF is reached (or on error),
    ## instead of keeping the descriptor open for as long as the job dict lives
    with job['p'].stdout as pipe:
        for line in iter (pipe.readline, b''):
            ## appended per line so that partial output can be read from
            ## job['stdout'] while the process is still running;
            ## undecodable bytes are dropped rather than raising
            job['stdout'] += line.decode ('utf-8', 'ignore')
---|
12 | |
---|
def job_readstderr(job):
    """Drain the child's stderr into job['stderr'] until end of stream.

    Used as a thread target by JobMgr.launch_job().

    job: dict with a Popen-like object at job['p'] whose stderr is a
         binary stream, and a str accumulator at job['stderr'].
    Returns nothing; job['stderr'] is extended in place, line by line.
    """
    ## 'with' closes our end of the pipe once EOF is reached (or on error),
    ## instead of keeping the descriptor open for as long as the job dict lives
    with job['p'].stderr as pipe:
        for line in iter (pipe.readline, b''):
            ## appended per line so that partial output can be read from
            ## job['stderr'] while the process is still running;
            ## undecodable bytes are dropped rather than raising
            job['stderr'] += line.decode ('utf-8', 'ignore')
---|
16 | |
---|
class JobMgr():
    """Launch scripts as child processes and keep track of them by job id.

    Every job is a dict holding the Popen object ('p'), its 'pid', the
    'starttime' string, the 'script', the caller-supplied 'client' flag,
    a 'status' string, the output captured so far ('stdout' / 'stderr')
    and the two pipe-reader threads ('t1' / 't2').
    """

    ## NOTE(review): class-level dict, so all JobMgr instances share one job
    ## table. Left as-is because callers may depend on it -- TODO confirm
    jobs = {}

    def launch_job(self, script, is_client):
        """Start script as a child process and register it as a new job.

        script:    script text; converted to a Popen argument list by
                   operations.build_popen_args()
        is_client: stored verbatim in the job under 'client'
        Returns the job id (first 12 hex digits of a SHA-256 digest).
        """
        logger.debug ('in launch_job(), is_client "{}"'.format(is_client))
        args = operations.build_popen_args (script)
        logger.debug ('args "{}"'.format (args))
        now = datetime.now (tz=timezone.utc)
        ts = now.strftime ('%Y-%m-%d %H:%M:%S.%f%z') ## '%s' doesn't work on windows
        ## the id mixes the launch time with the script text, so launching
        ## the same script again produces a different id
        jobid = hashlib.sha256 (now.isoformat().encode('UTF-8') + script.encode ('UTF-8')).hexdigest()[0:12]
        p = subprocess.Popen (args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        job = { 'p': p, 'pid': p.pid, 'starttime': ts, 'script': script, 'client': is_client, 'status': 'running', 'stdout': '', 'stderr': '' }
        ## one reader thread per pipe, so both streams are drained concurrently
        job['t1'] = threading.Thread (target=job_readstdout, args=(job,))
        job['t2'] = threading.Thread (target=job_readstderr, args=(job,))
        ## register only once the dict is complete
        self.jobs[jobid] = job
        job['t1'].start()
        job['t2'].start()
        logger.debug ('jobs "{}"'.format (self.jobs))
        return jobid

    def prepare_jobs(self):
        """Return a list with one plain status dict per known job.

        Each entry carries 'jobid', 'pid', 'starttime', 'script', 'client',
        'status', 'stdout' and 'stderr'. For a process that has exited,
        'status' is reported as 'finished' and 'rc' holds its return code.
        Only the returned copy is updated; the stored job dict is untouched.
        """
        ## can't return self.jobs because the Popen object at self.jobs[id]['p'] is not serializable. So, need to create a new dict to return
        exported = ('pid', 'starttime', 'script', 'client', 'status', 'stdout', 'stderr')
        st = []
        ## iterate over a snapshot so that a job registered while we are
        ## looping cannot invalidate the iteration
        for jobid, j in list (self.jobs.items()):
            entry = {k: j[k] for k in exported}
            entry['jobid'] = jobid
            rc = j['p'].poll()
            if rc is not None: ## process finished
                entry['rc'] = rc
                entry['status'] = 'finished'
            st.append (entry)
        return st

    @staticmethod
    def _exited_within(p, grace):
        """Return True if process p exits within grace seconds, else False."""
        try:
            p.wait (timeout=grace)
        except subprocess.TimeoutExpired:
            return False
        return True

    def terminate_job(self, jobid, grace=1):
        """Stop the job's process, escalating from terminate() to kill().

        jobid: id returned by launch_job()
        grace: seconds to wait for the process to exit after each signal
        Returns {} for an unknown jobid, { 'terminated': True } if the
        process is gone after terminate(), { 'killed': True } if it is gone
        after kill(), and { 'killed': False } if it survived both.
        """
        job = self.jobs.get (jobid)
        if job is None: return {}
        p = job['p']

        ## wait() returns as soon as the process exits, so a cooperative
        ## process no longer costs a full fixed sleep
        p.terminate()
        if self._exited_within (p, grace):
            return { 'terminated': True }

        p.kill()
        if self._exited_within (p, grace):
            return { 'killed': True }

        return { 'killed': False }
---|