Compare commits

...

3 Commits

Author SHA1 Message Date
Natalia Serrano 068e0cf633 refs #806 join threads when a new operation is requested 2024-10-03 14:39:31 +02:00
Natalia Serrano 72e4198762 refs #784 make Purgar() return something 2024-10-03 14:21:26 +02:00
Natalia Serrano 647489d507 refs #783 make Actualizar() asynchronous 2024-10-03 14:20:48 +02:00
3 changed files with 54 additions and 41 deletions

View File

@ -32,12 +32,10 @@
import base64
import os
import time
import random
from pathlib import Path
from opengnsys.log import logger
from opengnsys.workers import ogLiveWorker, ThreadWithResult
from opengnsys.workers import ogLiveWorker
class CloningEngineWorker (ogLiveWorker):
name = 'CloningEngine' # Module name
@ -171,40 +169,9 @@ class CloningEngineWorker (ogLiveWorker):
}
return self.respuestaEjecucionComando (cmd, herror, ids)
def _long_running_job (self, name, f, args):
any_job_running = False
for k in self.thread_list:
if self.thread_list[k]['running']:
any_job_running = True
break
if any_job_running:
logger.info ('some job is already running, refusing to launch another one')
return { 'job_id': None, 'message': 'some job is already running, refusing to launch another one' }
job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8)))
self.thread_list[job_id] = {
'thread': ThreadWithResult (target=f, args=args),
'starttime': time.time(),
'running': True,
'result': None
}
self.thread_list[job_id]['thread'].start()
return { 'job_id': job_id }
def process_status (self, path, get_params, post_params, server):
## join finished threads
for k in self.thread_list:
logger.debug (f'considering thread ({k})')
elem = self.thread_list[k]
if 'thread' in elem:
elem['thread'].join (0.05)
if not elem['thread'].is_alive():
logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
elem['running'] = False
elem['result'] = elem['thread'].result
del elem['thread']
self._join_threads()
## return status of threads
thr_status = {}
for k in self.thread_list:
thr_status[k] = {

View File

@ -385,9 +385,16 @@ class ogAdmClientWorker (ogLiveWorker):
@check_secret
def process_status (self, path, get_params, post_params, server):
return {self.name: 'in process_status'} ## XXX
self._join_threads()
thr_status = {}
for k in self.thread_list:
thr_status[k] = {
'running': self.thread_list[k]['running'],
'result': self.thread_list[k]['result'],
}
return thr_status
## curl --insecure -X POST --data '{"nfn": "popup", "title": "my title", "message": "my message"}' https://192.168.1.249:8000/ogAdmClient/popup
@check_secret
def process_popup (self, path, get_params, post_params, server):
logger.debug ('in process_popup, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
@ -396,9 +403,7 @@ class ogAdmClientWorker (ogLiveWorker):
## type(post_params) "<class 'dict'>"
return {'debug':'test'}
## curl --insecure https://192.168.1.249:8000/ogAdmClient/Actualizar
def process_Actualizar (self, path, get_params, post_params, server):
logger.debug ('in process_Actualizar, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
def do_Actualizar (self, post_params):
self.muestraMensaje (1)
#if !comandosPendientes: error 84 'Ha ocurrido algún problema al reiniciar la sesión del cliente'
cfg = self.LeeConfiguracion()
@ -414,9 +419,14 @@ class ogAdmClientWorker (ogLiveWorker):
self.muestraMenu()
return self.respuestaEjecucionComando (cmd, 0)
def process_Actualizar (self, path, get_params, post_params, server):
logger.debug ('in process_Actualizar, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
return self._long_running_job ('Actualizar', self.do_Actualizar, args=(post_params,))
def process_Purgar (self, path, get_params, post_params, server):
logger.debug ('in process_Purgar, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
exit (0) ## ogAdmClient.c:905
return {}
#exit (0) ## ogAdmClient.c:905
def process_Sondeo (self, path, get_params, post_params, server):
logger.debug ('in process_Sondeo, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))

View File

@ -31,6 +31,8 @@
# pylint: disable=unused-wildcard-import,wildcard-import
import os
import time
import random
import subprocess
import threading
@ -190,3 +192,37 @@ class ogLiveWorker(ServerWorker):
if not self.tomaIPlocal():
raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
def _long_running_job (self, name, f, args):
self._join_threads()
any_job_running = False
for k in self.thread_list:
if self.thread_list[k]['running']:
any_job_running = True
break
if any_job_running:
logger.info ('some job is already running, refusing to launch another one')
return { 'job_id': None, 'message': 'some job is already running, refusing to launch another one' }
job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8)))
self.thread_list[job_id] = {
'thread': ThreadWithResult (target=f, args=args),
'starttime': time.time(),
'running': True,
'result': None
}
self.thread_list[job_id]['thread'].start()
return { 'job_id': job_id }
def _join_threads (self):
for k in self.thread_list:
logger.debug (f'considering thread ({k})')
elem = self.thread_list[k]
if 'thread' in elem:
elem['thread'].join (0.05)
if not elem['thread'].is_alive():
logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
elem['running'] = False
elem['result'] = elem['thread'].result
del elem['thread']