Merge pull request 'jobid-wait-zombies' (#51) from jobid-wait-zombies into main

Reviewed-on: #51
main 7.3.0
Natalia Serrano 2025-07-31 10:54:26 +02:00
commit 33f65a45b7
7 changed files with 57 additions and 35 deletions

View File

@ -6,6 +6,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [7.3.0] - 2025-07-31
### Fixed
- Wait for zombies
### Changed
- Change "jobid" for "job_id" for consistency
## [7.2.2] - 2025-07-30
### Added

View File

@ -1,3 +1,10 @@
ogagent (7.3.0-1) stable; urgency=medium
* Wait for zombies
* Change "jobid" for "job_id" for consistency
-- OpenGnsys developers <info@opengnsys.es> Thu, 31 Jul 2025 10:35:16 +0200
ogagent (7.2.2-1) stable; urgency=medium
* Add missing file stop-agent.ps1

View File

@ -1 +1 @@
7.2.2
7.3.0

View File

@ -23,32 +23,32 @@ class JobMgr():
logger.debug ('args "{}"'.format (args))
now = datetime.now (tz=timezone.utc)
ts = now.strftime ('%Y-%m-%d %H:%M:%S.%f%z') ## '%s' doesn't work on windows
jobid = hashlib.sha256 (now.isoformat().encode('UTF-8') + script.encode ('UTF-8')).hexdigest()[0:12]
job_id = hashlib.sha256 (now.isoformat().encode('UTF-8') + script.encode ('UTF-8')).hexdigest()[0:12]
p = subprocess.Popen (args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.jobs[jobid] = { 'p': p, 'pid': p.pid, 'starttime': ts, 'script': script, 'client': is_client, 'status': 'running', 'stdout': '', 'stderr': '' }
self.jobs[jobid]['t1'] = threading.Thread (target=job_readstdout, args=(self.jobs[jobid],))
self.jobs[jobid]['t2'] = threading.Thread (target=job_readstderr, args=(self.jobs[jobid],))
self.jobs[jobid]['t1'].start()
self.jobs[jobid]['t2'].start()
self.jobs[job_id] = { 'p': p, 'pid': p.pid, 'starttime': ts, 'script': script, 'client': is_client, 'status': 'running', 'stdout': '', 'stderr': '' }
self.jobs[job_id]['t1'] = threading.Thread (target=job_readstdout, args=(self.jobs[job_id],))
self.jobs[job_id]['t2'] = threading.Thread (target=job_readstderr, args=(self.jobs[job_id],))
self.jobs[job_id]['t1'].start()
self.jobs[job_id]['t2'].start()
logger.debug ('jobs "{}"'.format (self.jobs))
return jobid
return job_id
def prepare_jobs(self):
## can't return self.jobs because the Popen object at self.jobs[id]['p'] is not serializable. So, need to create a new dict to return
st = []
for jobid in self.jobs:
j = self.jobs[jobid]
for job_id in self.jobs:
j = self.jobs[job_id]
entry = dict ((k, j[k]) for k in ['pid', 'starttime', 'script', 'client', 'status', 'stdout', 'stderr'])
entry['jobid'] = jobid
entry['job_id'] = job_id
if j['p'].poll() is not None: ## process finished
entry['rc'] = j['p'].returncode
entry['status'] = 'finished'
st.append (entry)
return st
def terminate_job(self, jobid):
if jobid not in self.jobs: return {}
p = self.jobs[jobid]['p']
def terminate_job(self, job_id):
if job_id not in self.jobs: return {}
p = self.jobs[job_id]['p']
p.terminate()
time.sleep (1)
if p.poll() is not None:

View File

@ -54,9 +54,9 @@ class OpenGnSysWorker(ClientWorker):
#self.sendServerMessage('script', {'op', 'launched'})
def process_terminatescript(self, json_params):
jobid = json_params['jobid']
logger.debug('Processing terminatescript request, jobid "{}"'.format (jobid))
self.jobmgr.terminate_job (jobid)
job_id = json_params['job_id']
logger.debug('Processing terminatescript request, job_id "{}"'.format (job_id))
self.jobmgr.terminate_job (job_id)
def process_preparescripts(self, json_params):
logger.debug('Processing preparescripts request')

View File

@ -369,24 +369,24 @@ class OpenGnSysWorker(ServerWorker):
logger.debug('received script "{}"'.format(script))
if post_params.get('client', 'false') == 'false':
jobid = self.jobmgr.launch_job (script, False)
return {'op': 'launched', 'jobid': jobid}
job_id = self.jobmgr.launch_job (script, False)
return {'op': 'launched', 'job_id': job_id}
else: ## post_params.get('client') is not 'false'
## send script as-is
self.sendClientMessage('script', {'code': script})
#return {'op': 'launched', 'jobid': jobid} ## TODO obtain jobid generated at the client (can it be done?)
#return {'op': 'launched', 'job_id': job_id} ## TODO obtain job_id generated at the client (can it be done?)
return {'op': 'launched'}
@execution_level('full')
@check_secret
def process_terminatescript(self, path, get_params, post_params, server):
jobid = post_params.get('jobid', None)
logger.debug('Processing terminate_script request, jobid "{}"'.format (jobid))
if jobid is None:
job_id = post_params.get('job_id', None)
logger.debug('Processing terminate_script request, job_id "{}"'.format (job_id))
if job_id is None:
return {}
self.sendClientMessage('terminatescript', {'jobid': jobid})
self.jobmgr.terminate_job (jobid)
self.sendClientMessage('terminatescript', {'job_id': job_id})
self.jobmgr.terminate_job (job_id)
return {}
@execution_level('full')

View File

@ -35,6 +35,7 @@ import re
import time
try: import dbus ## don't fail on windows (the worker will later refuse to load anyway)
except: pass
import select
import random
import subprocess
import threading
@ -292,22 +293,26 @@ class ogLiveWorker(ServerWorker):
pass
sout = serr = ''
poll_iterations = 1
while p.poll() is None:
for l in iter (p.stdout.readline, b''):
finished = False
while True:
try:
p.wait (0.05)
finished = True
except subprocess.TimeoutExpired:
pass
ready_to_read, _, _ = select.select ([p.stdout, p.stderr], [], [], 0.2)
if p.stdout in ready_to_read:
l = p.stdout.readline()
partial = l.decode ('utf-8', 'ignore')
if self.stdout_q: self.stdout_q.put (partial)
sout += partial
for l in iter (p.stderr.readline, b''):
if p.stderr in ready_to_read:
l = p.stderr.readline()
partial = l.decode ('utf-8', 'ignore')
serr += partial
## poll quickly at first, then poll less frequently
if poll_iterations > 15: sleep_time = 1
elif poll_iterations > 10: sleep_time = 0.2
else: sleep_time = 0.1
time.sleep (sleep_time)
poll_iterations += 1
if finished: break
sout = sout.strip()
serr = serr.strip()