ogagent/src/opengnsys/jobmgr.py

63 lines
2.5 KiB
Python

import threading
import subprocess
import hashlib
import time
from datetime import datetime, timezone
from opengnsys import operations
from opengnsys.log import logger
def job_readstdout(job):
    """Drain the stdout pipe of a job into ``job['stdout']``.

    Intended to run as a thread target: it blocks on ``readline()`` until
    the pipe reaches EOF, appending each decoded line to ``job['stdout']``
    as soon as it arrives so that partial output can be read while the
    process is still running.

    :param job: job dict; ``job['p'].stdout`` must be a binary stream and
                ``job['stdout']`` a ``str`` accumulator.
    """
    stream = job['p'].stdout
    try:
        for line in iter(stream.readline, b''):
            ## bytes that are not valid UTF-8 are dropped rather than raising
            job['stdout'] += line.decode('utf-8', 'ignore')
    finally:
        ## the job dict (and its Popen object) outlives the process, so the
        ## pipe has to be closed explicitly or its descriptor is never released
        stream.close()
def job_readstderr(job):
    """Drain the stderr pipe of a job into ``job['stderr']``.

    Intended to run as a thread target: it blocks on ``readline()`` until
    the pipe reaches EOF, appending each decoded line to ``job['stderr']``
    as soon as it arrives so that partial output can be read while the
    process is still running.

    :param job: job dict; ``job['p'].stderr`` must be a binary stream and
                ``job['stderr']`` a ``str`` accumulator.
    """
    stream = job['p'].stderr
    try:
        for line in iter(stream.readline, b''):
            ## bytes that are not valid UTF-8 are dropped rather than raising
            job['stderr'] += line.decode('utf-8', 'ignore')
    finally:
        ## the job dict (and its Popen object) outlives the process, so the
        ## pipe has to be closed explicitly or its descriptor is never released
        stream.close()
class JobMgr():
    """Launch scripts as child processes and keep track of them by job id."""

    ## jobid -> job dict. Defined on the class and only ever mutated in place,
    ## so every JobMgr instance shares the same registry.
    jobs = {}

    ## seconds to wait for a process to exit after terminate() and after kill()
    _EXIT_GRACE_SECONDS = 1

    def launch_job(self, script, is_client):
        """Start ``script`` as a child process and register it as a job.

        Two reader threads are started to collect the process' stdout and
        stderr into the job dict while it runs.

        :param script: script text; turned into a Popen argument list by
                       ``operations.build_popen_args()``.
        :param is_client: stored verbatim under the job's ``'client'`` key.
        :return: the job id, the first 12 hex digits of a SHA-256 over the
                 launch time and the script text.
        """
        logger.debug('in launch_job(), is_client "{}"'.format(is_client))
        args = operations.build_popen_args(script)
        logger.debug('args "{}"'.format(args))

        now = datetime.now(tz=timezone.utc)
        ts = now.strftime('%Y-%m-%d %H:%M:%S.%f%z')  ## '%s' doesn't work on windows
        digest = hashlib.sha256(now.isoformat().encode('UTF-8') + script.encode('UTF-8'))
        jobid = digest.hexdigest()[0:12]

        p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        job = {
            'p': p,
            'pid': p.pid,
            'starttime': ts,
            'script': script,
            'client': is_client,
            'status': 'running',
            'stdout': '',
            'stderr': '',
        }
        self.jobs[jobid] = job

        ## one reader per pipe, so that neither pipe can fill up and block the child
        job['t1'] = threading.Thread(target=job_readstdout, args=(job,))
        job['t2'] = threading.Thread(target=job_readstderr, args=(job,))
        job['t1'].start()
        job['t2'].start()

        logger.debug('jobs "{}"'.format(self.jobs))
        return jobid

    def prepare_jobs(self):
        """Return a serializable description of every known job.

        :return: list with one dict per job holding ``pid``, ``starttime``,
                 ``script``, ``client``, ``status``, ``stdout``, ``stderr``
                 and ``jobid``. For a process that has exited, ``status`` is
                 reported as ``'finished'`` and ``rc`` carries its return
                 code. The stored job dict itself is not modified.
        """
        ## can't return self.jobs because the Popen object at self.jobs[id]['p'] is not serializable. So, need to create a new dict to return
        exported = ('pid', 'starttime', 'script', 'client', 'status', 'stdout', 'stderr')
        st = []
        ## iterate over a snapshot: a job registered while we loop would otherwise
        ## raise "dictionary changed size during iteration"
        for jobid, j in list(self.jobs.items()):
            entry = {k: j[k] for k in exported}
            entry['jobid'] = jobid
            ## NOTE(review): output is snapshotted before poll(), and the reader
            ## threads may still be draining the pipes right after the process
            ## exits, so a 'finished' entry can momentarily lack trailing output
            if j['p'].poll() is not None:  ## process finished
                entry['rc'] = j['p'].returncode
                entry['status'] = 'finished'
            st.append(entry)
        return st

    def terminate_job(self, jobid):
        """Stop the process of a job, escalating from terminate() to kill().

        :param jobid: id as returned by ``launch_job()``.
        :return: ``{}`` when the job id is unknown;
                 ``{'terminated': True}`` when the process is gone within the
                 grace period after ``terminate()`` (this includes a process
                 that had already exited);
                 ``{'killed': True}`` when it only went away after ``kill()``;
                 ``{'killed': False}`` when it is still alive after both.
        """
        job = self.jobs.get(jobid)
        if job is None:
            return {}

        p = job['p']
        p.terminate()
        if self._wait_for_exit(p):
            return {'terminated': True}

        p.kill()
        return {'killed': self._wait_for_exit(p)}

    def _wait_for_exit(self, p):
        """Wait up to the grace period for ``p`` to exit; return whether it did."""
        try:
            ## returns as soon as the process exits instead of always sleeping
            p.wait(timeout=self._EXIT_GRACE_SECONDS)
        except subprocess.TimeoutExpired:
            return False
        return True