From ee183f6ad36ec2b47efa7dd420ca8e61a69112cd Mon Sep 17 00:00:00 2001 From: Natalia Serrano Date: Thu, 31 Jul 2025 10:18:19 +0200 Subject: [PATCH 1/3] refs #2556 change "jobid" for "job_id" --- src/opengnsys/jobmgr.py | 26 +++++++++---------- .../modules/client/OpenGnSys/__init__.py | 6 ++--- .../modules/server/OpenGnSys/__init__.py | 16 ++++++------ 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/opengnsys/jobmgr.py b/src/opengnsys/jobmgr.py index 78e1e64..fe5c093 100644 --- a/src/opengnsys/jobmgr.py +++ b/src/opengnsys/jobmgr.py @@ -23,32 +23,32 @@ class JobMgr(): logger.debug ('args "{}"'.format (args)) now = datetime.now (tz=timezone.utc) ts = now.strftime ('%Y-%m-%d %H:%M:%S.%f%z') ## '%s' doesn't work on windows - jobid = hashlib.sha256 (now.isoformat().encode('UTF-8') + script.encode ('UTF-8')).hexdigest()[0:12] + job_id = hashlib.sha256 (now.isoformat().encode('UTF-8') + script.encode ('UTF-8')).hexdigest()[0:12] p = subprocess.Popen (args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - self.jobs[jobid] = { 'p': p, 'pid': p.pid, 'starttime': ts, 'script': script, 'client': is_client, 'status': 'running', 'stdout': '', 'stderr': '' } - self.jobs[jobid]['t1'] = threading.Thread (target=job_readstdout, args=(self.jobs[jobid],)) - self.jobs[jobid]['t2'] = threading.Thread (target=job_readstderr, args=(self.jobs[jobid],)) - self.jobs[jobid]['t1'].start() - self.jobs[jobid]['t2'].start() + self.jobs[job_id] = { 'p': p, 'pid': p.pid, 'starttime': ts, 'script': script, 'client': is_client, 'status': 'running', 'stdout': '', 'stderr': '' } + self.jobs[job_id]['t1'] = threading.Thread (target=job_readstdout, args=(self.jobs[job_id],)) + self.jobs[job_id]['t2'] = threading.Thread (target=job_readstderr, args=(self.jobs[job_id],)) + self.jobs[job_id]['t1'].start() + self.jobs[job_id]['t2'].start() logger.debug ('jobs "{}"'.format (self.jobs)) - return jobid + return job_id def prepare_jobs(self): ## can't return self.jobs because the Popen object at self.jobs[id]['p'] is not serializable. So, need to create a new dict to return st = [] - for jobid in self.jobs: - j = self.jobs[jobid] + for job_id in self.jobs: + j = self.jobs[job_id] entry = dict ((k, j[k]) for k in ['pid', 'starttime', 'script', 'client', 'status', 'stdout', 'stderr']) - entry['jobid'] = jobid + entry['job_id'] = job_id if j['p'].poll() is not None: ## process finished entry['rc'] = j['p'].returncode entry['status'] = 'finished' st.append (entry) return st - def terminate_job(self, jobid): - if jobid not in self.jobs: return {} - p = self.jobs[jobid]['p'] + def terminate_job(self, job_id): + if job_id not in self.jobs: return {} + p = self.jobs[job_id]['p'] p.terminate() time.sleep (1) if p.poll() is not None: diff --git a/src/opengnsys/modules/client/OpenGnSys/__init__.py b/src/opengnsys/modules/client/OpenGnSys/__init__.py index ce47254..39c3928 100644 --- a/src/opengnsys/modules/client/OpenGnSys/__init__.py +++ b/src/opengnsys/modules/client/OpenGnSys/__init__.py @@ -54,9 +54,9 @@ class OpenGnSysWorker(ClientWorker): #self.sendServerMessage('script', {'op', 'launched'}) def process_terminatescript(self, json_params): - jobid = json_params['jobid'] - logger.debug('Processing terminatescript request, jobid "{}"'.format (jobid)) - self.jobmgr.terminate_job (jobid) + job_id = json_params['job_id'] + logger.debug('Processing terminatescript request, job_id "{}"'.format (job_id)) + self.jobmgr.terminate_job (job_id) def process_preparescripts(self, json_params): logger.debug('Processing preparescripts request') diff --git a/src/opengnsys/modules/server/OpenGnSys/__init__.py b/src/opengnsys/modules/server/OpenGnSys/__init__.py index eb6e1a4..350b4ca 100644 --- a/src/opengnsys/modules/server/OpenGnSys/__init__.py +++ b/src/opengnsys/modules/server/OpenGnSys/__init__.py @@ -369,24 +369,24 @@ class OpenGnSysWorker(ServerWorker): logger.debug('received script "{}"'.format(script)) if post_params.get('client', 'false') == 'false': - jobid = self.jobmgr.launch_job (script, False) - return {'op': 'launched', 'jobid': jobid} + job_id = self.jobmgr.launch_job (script, False) + return {'op': 'launched', 'job_id': job_id} else: ## post_params.get('client') is not 'false' ## send script as-is self.sendClientMessage('script', {'code': script}) - #return {'op': 'launched', 'jobid': jobid} ## TODO obtain jobid generated at the client (can it be done?) + #return {'op': 'launched', 'job_id': job_id} ## TODO obtain job_id generated at the client (can it be done?) return {'op': 'launched'} @execution_level('full') @check_secret def process_terminatescript(self, path, get_params, post_params, server): - jobid = post_params.get('jobid', None) - logger.debug('Processing terminate_script request, jobid "{}"'.format (jobid)) - if jobid is None: + job_id = post_params.get('job_id', None) + logger.debug('Processing terminate_script request, job_id "{}"'.format (job_id)) + if job_id is None: return {} - self.sendClientMessage('terminatescript', {'jobid': jobid}) - self.jobmgr.terminate_job (jobid) + self.sendClientMessage('terminatescript', {'job_id': job_id}) + self.jobmgr.terminate_job (job_id) return {} @execution_level('full') From e783d7c1faacdf23fc54538e237237a7c300ac29 Mon Sep 17 00:00:00 2001 From: Natalia Serrano Date: Thu, 31 Jul 2025 10:34:01 +0200 Subject: [PATCH 2/3] refs #2554 wait for zombies --- src/opengnsys/workers/oglive_worker.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/opengnsys/workers/oglive_worker.py b/src/opengnsys/workers/oglive_worker.py index 6381dfc..e753f5c 100644 --- a/src/opengnsys/workers/oglive_worker.py +++ b/src/opengnsys/workers/oglive_worker.py @@ -35,6 +35,7 @@ import re import time try: import dbus ## don't fail on windows (the worker will later refuse to load anyway) except: pass +import select import random import subprocess import threading @@ -292,22 +293,26 @@ class ogLiveWorker(ServerWorker): pass sout = serr = '' - poll_iterations = 1 - while p.poll() is None: - for l in iter (p.stdout.readline, b''): + finished = False + while True: + try: + p.wait (0.05) + finished = True + except subprocess.TimeoutExpired: + pass + + ready_to_read, _, _ = select.select ([p.stdout, p.stderr], [], [], 0.2) + if p.stdout in ready_to_read: + l = p.stdout.readline() partial = l.decode ('utf-8', 'ignore') if self.stdout_q: self.stdout_q.put (partial) sout += partial - for l in iter (p.stderr.readline, b''): + if p.stderr in ready_to_read: + l = p.stderr.readline() partial = l.decode ('utf-8', 'ignore') serr += partial - ## poll quickly at first, then poll less frequently - if poll_iterations > 15: sleep_time = 1 - elif poll_iterations > 10: sleep_time = 0.2 - else: sleep_time = 0.1 - time.sleep (sleep_time) - poll_iterations += 1 + if finished: break sout = sout.strip() serr = serr.strip() From 75d222c42517291a09d09f7d75b6eb32efd0cb98 Mon Sep 17 00:00:00 2001 From: Natalia Serrano Date: Thu, 31 Jul 2025 10:36:00 +0200 Subject: [PATCH 3/3] refs #2554 wait for zombies --- CHANGELOG.md | 10 ++++++++++ linux/debian/changelog | 7 +++++++ src/VERSION | 2 +- 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ab407f9..bedb8d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,16 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [7.3.0] - 2025-07-31 + +### Fixed + +- Wait for zombies + +### Changed + +- Change "jobid" for "job_id" for consistency + ## [7.2.2] - 2025-07-30 ### Added diff --git a/linux/debian/changelog b/linux/debian/changelog index b4df92e..7437ef0 100644 --- a/linux/debian/changelog +++ b/linux/debian/changelog @@ -1,3 +1,10 @@ +ogagent (7.3.0-1) stable; urgency=medium + + * Wait for zombies + * Change "jobid" for "job_id" for consistency + + -- OpenGnsys developers Thu, 31 Jul 2025 10:35:16 +0200 + ogagent (7.2.2-1) stable; urgency=medium * Add missing file stop-agent.ps1 diff --git a/src/VERSION b/src/VERSION index 77f5bec..1502020 100644 --- a/src/VERSION +++ b/src/VERSION @@ -1 +1 @@ -7.2.2 +7.3.0