From def6750cd1f91229d075a3eaadc6f3e39f3f8a04 Mon Sep 17 00:00:00 2001 From: Natalia Serrano Date: Tue, 8 Oct 2024 17:55:51 +0200 Subject: [PATCH] refs #880 monitor running threads --- ogcore-mock.py | 10 +++++ .../modules/server/CloningEngine/__init__.py | 2 - .../modules/server/ogAdmClient/__init__.py | 2 - src/opengnsys/workers/oglive_worker.py | 38 ++++++++++++------- 4 files changed, 34 insertions(+), 18 deletions(-) diff --git a/ogcore-mock.py b/ogcore-mock.py index fc51888..716e52f 100644 --- a/ogcore-mock.py +++ b/ogcore-mock.py @@ -178,6 +178,11 @@ def oac_recibe_archivo(): logging.info(f'dec ({dec})') return jsonify({'anything':'anything'}) ## if we return {}, then we trigger "if not {}" which happens to be true +@app.route('/opengnsys/rest/ogAdmClient/callback', methods=['POST']) +def oac_callback(): + logging.info(f'{request.get_json()}') + return jsonify({'anything':'anything'}) + @app.route('/opengnsys/rest/ogAdmClient/', methods=['GET', 'POST']) def oac_cucu(cucu): #j = request.get_json(force=True) @@ -202,6 +207,11 @@ def ce_recibe_archivo(): logging.info(f'dec ({dec})') return jsonify({'anything':'anything'}) ## if we return {}, then we trigger "if not {}" which happens to be true +@app.route('/opengnsys/rest/CloningEngine/callback', methods=['POST']) +def ce_callback(): + logging.info(f'{request.get_json()}') + return jsonify({'anything':'anything'}) + @app.route('/opengnsys/rest/CloningEngine/', methods=['GET', 'POST']) def ce_cucu(cucu): abort (404) diff --git a/src/opengnsys/modules/server/CloningEngine/__init__.py b/src/opengnsys/modules/server/CloningEngine/__init__.py index 3eaff50..41e310f 100644 --- a/src/opengnsys/modules/server/CloningEngine/__init__.py +++ b/src/opengnsys/modules/server/CloningEngine/__init__.py @@ -176,8 +176,6 @@ class CloningEngineWorker (ogLiveWorker): return self.respuestaEjecucionComando (cmd, herror, ids) def process_status (self, path, get_params, post_params, server): - self._join_threads() - thr_status = {} for k in self.thread_list: thr_status[k] = { diff --git a/src/opengnsys/modules/server/ogAdmClient/__init__.py b/src/opengnsys/modules/server/ogAdmClient/__init__.py index 2417294..9b6939a 100644 --- a/src/opengnsys/modules/server/ogAdmClient/__init__.py +++ b/src/opengnsys/modules/server/ogAdmClient/__init__.py @@ -387,8 +387,6 @@ class ogAdmClientWorker (ogLiveWorker): @check_secret def process_status (self, path, get_params, post_params, server): - self._join_threads() - thr_status = {} for k in self.thread_list: thr_status[k] = { diff --git a/src/opengnsys/workers/oglive_worker.py b/src/opengnsys/workers/oglive_worker.py index e2e948b..c76d867 100644 --- a/src/opengnsys/workers/oglive_worker.py +++ b/src/opengnsys/workers/oglive_worker.py @@ -58,6 +58,28 @@ class ThreadWithResult (threading.Thread): class ogLiveWorker(ServerWorker): thread_list = {} + def notifier (self, result): + logger.debug (f'notifier() called, result ({result})') + res = self.REST.sendMessage ('/'.join ([self.name, 'callback']), result) + + def mon (self): + while True: + #print ('mon(): iterating') + for k in self.thread_list: + elem = self.thread_list[k] + if 'thread' not in elem: continue + logger.debug (f'considering thread ({k})') + try: elem['thread'].join (0.05) + except RuntimeError: pass ## race condition: a thread is created and this code runs before it is start()ed + if not elem['thread'].is_alive(): + logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})') + elem['running'] = False + elem['result'] = elem['thread'].result + del elem['thread'] + self.notifier (elem['result']) + + time.sleep (1) + def interfaceAdmin (self, method, parametros=[]): exe = '{}/{}'.format (self.pathinterface, method) ## for development only. Will be removed when the referenced bash code (/opt/opengnsys/lib/engine/bin/*.lib) is translated into python @@ -193,9 +215,9 @@ class ogLiveWorker(ServerWorker): if not self.tomaIPlocal(): raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo') - def _long_running_job (self, name, f, args): - self._join_threads() + threading.Thread (name='monitoring_thread', target=self.mon, daemon=True).start() + def _long_running_job (self, name, f, args): any_job_running = False for k in self.thread_list: if self.thread_list[k]['running']: @@ -214,15 +236,3 @@ class ogLiveWorker(ServerWorker): } self.thread_list[job_id]['thread'].start() return { 'job_id': job_id } - - def _join_threads (self): - for k in self.thread_list: - logger.debug (f'considering thread ({k})') - elem = self.thread_list[k] - if 'thread' in elem: - elem['thread'].join (0.05) - if not elem['thread'].is_alive(): - logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})') - elem['running'] = False - elem['result'] = elem['thread'].result - del elem['thread']