refs #880 monitor running threads

pull/10/head
Natalia Serrano 2024-10-08 17:55:51 +02:00
parent e1bd063bde
commit def6750cd1
4 changed files with 34 additions and 18 deletions

View File

@ -178,6 +178,11 @@ def oac_recibe_archivo():
logging.info(f'dec ({dec})') logging.info(f'dec ({dec})')
return jsonify({'anything':'anything'}) ## if we return {}, then we trigger "if not {}" which happens to be true return jsonify({'anything':'anything'}) ## if we return {}, then we trigger "if not {}" which happens to be true
@app.route('/opengnsys/rest/ogAdmClient/callback', methods=['POST'])
def oac_callback():
logging.info(f'{request.get_json()}')
return jsonify({'anything':'anything'})
@app.route('/opengnsys/rest/ogAdmClient/<cucu>', methods=['GET', 'POST']) @app.route('/opengnsys/rest/ogAdmClient/<cucu>', methods=['GET', 'POST'])
def oac_cucu(cucu): def oac_cucu(cucu):
#j = request.get_json(force=True) #j = request.get_json(force=True)
@ -202,6 +207,11 @@ def ce_recibe_archivo():
logging.info(f'dec ({dec})') logging.info(f'dec ({dec})')
return jsonify({'anything':'anything'}) ## if we return {}, then we trigger "if not {}" which happens to be true return jsonify({'anything':'anything'}) ## if we return {}, then we trigger "if not {}" which happens to be true
@app.route('/opengnsys/rest/CloningEngine/callback', methods=['POST'])
def ce_callback():
logging.info(f'{request.get_json()}')
return jsonify({'anything':'anything'})
@app.route('/opengnsys/rest/CloningEngine/<cucu>', methods=['GET', 'POST']) @app.route('/opengnsys/rest/CloningEngine/<cucu>', methods=['GET', 'POST'])
def ce_cucu(cucu): def ce_cucu(cucu):
abort (404) abort (404)

View File

@ -176,8 +176,6 @@ class CloningEngineWorker (ogLiveWorker):
return self.respuestaEjecucionComando (cmd, herror, ids) return self.respuestaEjecucionComando (cmd, herror, ids)
def process_status (self, path, get_params, post_params, server): def process_status (self, path, get_params, post_params, server):
self._join_threads()
thr_status = {} thr_status = {}
for k in self.thread_list: for k in self.thread_list:
thr_status[k] = { thr_status[k] = {

View File

@ -387,8 +387,6 @@ class ogAdmClientWorker (ogLiveWorker):
@check_secret @check_secret
def process_status (self, path, get_params, post_params, server): def process_status (self, path, get_params, post_params, server):
self._join_threads()
thr_status = {} thr_status = {}
for k in self.thread_list: for k in self.thread_list:
thr_status[k] = { thr_status[k] = {

View File

@ -58,6 +58,28 @@ class ThreadWithResult (threading.Thread):
class ogLiveWorker(ServerWorker): class ogLiveWorker(ServerWorker):
thread_list = {} thread_list = {}
def notifier (self, result):
logger.debug (f'notifier() called, result ({result})')
res = self.REST.sendMessage ('/'.join ([self.name, 'callback']), result)
def mon (self):
while True:
#print ('mon(): iterating')
for k in self.thread_list:
elem = self.thread_list[k]
if 'thread' not in elem: continue
logger.debug (f'considering thread ({k})')
try: elem['thread'].join (0.05)
except RuntimeError: pass ## race condition: a thread is created and this code runs before it is start()ed
if not elem['thread'].is_alive():
logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
elem['running'] = False
elem['result'] = elem['thread'].result
del elem['thread']
self.notifier (elem['result'])
time.sleep (1)
def interfaceAdmin (self, method, parametros=[]): def interfaceAdmin (self, method, parametros=[]):
exe = '{}/{}'.format (self.pathinterface, method) exe = '{}/{}'.format (self.pathinterface, method)
## for development only. Will be removed when the referenced bash code (/opt/opengnsys/lib/engine/bin/*.lib) is translated into python ## for development only. Will be removed when the referenced bash code (/opt/opengnsys/lib/engine/bin/*.lib) is translated into python
@ -193,9 +215,9 @@ class ogLiveWorker(ServerWorker):
if not self.tomaIPlocal(): if not self.tomaIPlocal():
raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo') raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
def _long_running_job (self, name, f, args): threading.Thread (name='monitoring_thread', target=self.mon, daemon=True).start()
self._join_threads()
def _long_running_job (self, name, f, args):
any_job_running = False any_job_running = False
for k in self.thread_list: for k in self.thread_list:
if self.thread_list[k]['running']: if self.thread_list[k]['running']:
@ -214,15 +236,3 @@ class ogLiveWorker(ServerWorker):
} }
self.thread_list[job_id]['thread'].start() self.thread_list[job_id]['thread'].start()
return { 'job_id': job_id } return { 'job_id': job_id }
def _join_threads (self):
for k in self.thread_list:
logger.debug (f'considering thread ({k})')
elem = self.thread_list[k]
if 'thread' in elem:
elem['thread'].join (0.05)
if not elem['thread'].is_alive():
logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
elem['running'] = False
elem['result'] = elem['thread'].result
del elem['thread']