diff --git a/linux/debian/changelog b/linux/debian/changelog index 13c29d9..9a3204e 100644 --- a/linux/debian/changelog +++ b/linux/debian/changelog @@ -1,3 +1,9 @@ +ogagent (1.3.7-1) stable; urgency=medium + + * CloningEngine: RESTfully keep a list of long-running jobs + + -- OpenGnsys developers Fri, 27 Sep 2024 18:03:16 +0200 + ogagent (1.3.6-1) stable; urgency=medium * Add more functionality to the ogAdmClient module diff --git a/src/VERSION b/src/VERSION index 95b25ae..3336003 100644 --- a/src/VERSION +++ b/src/VERSION @@ -1 +1 @@ -1.3.6 +1.3.7 diff --git a/src/opengnsys/modules/server/CloningEngine/__init__.py b/src/opengnsys/modules/server/CloningEngine/__init__.py index 0d7aea5..4e43ef4 100644 --- a/src/opengnsys/modules/server/CloningEngine/__init__.py +++ b/src/opengnsys/modules/server/CloningEngine/__init__.py @@ -32,25 +32,24 @@ import base64 import os +import time +import random from pathlib import Path from opengnsys.log import logger -from opengnsys.workers import ogLiveWorker +from opengnsys.workers import ogLiveWorker, ThreadWithResult class CloningEngineWorker (ogLiveWorker): name = 'CloningEngine' # Module name REST = None # REST object - def onDeactivation (self): - logger.debug ('onDeactivation') - - def process_status (self, path, get_params, post_params, server): - return {self.name: 'in process_status'} ## XXX - def onActivation (self): super().onActivation() logger.info ('onActivation ok') + def onDeactivation (self): + logger.debug ('onDeactivation') + ## en C, esto envia una trama de respuesta al servidor. Devuelve un boolean ## en python, simplemente termina de construir la respuesta y la devuelve; no envĂ­a nada por la red. El caller la usa en return() para enviar implĂ­citamente la respuesta def respuestaEjecucionComando (self, cmd, herror, ids): @@ -102,10 +101,7 @@ class CloningEngineWorker (ogLiveWorker): return {'true':'true'} ## XXX - def process_CrearImagen (self, path, get_params, post_params, server): - logger.debug ('in process_CrearImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) - logger.debug ('type(post_params) "{}"'.format (type (post_params))) - + def do_CrearImagen (self, post_params): for k in ['dsk', 'par', 'cpt', 'idi', 'nci', 'ipr', 'nfn', 'ids']: if k not in post_params: logger.error (f'required parameter ({k}) not in POST params') @@ -147,19 +143,7 @@ class CloningEngineWorker (ogLiveWorker): } return self.respuestaEjecucionComando (cmd, herror, ids) - def process_CrearImagenBasica (self, path, get_params, post_params, server): - logger.debug ('in process_CrearImagenBasica, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) - logger.warning ('this method has been removed') - raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' }) - - def process_CrearSoftIncremental (self, path, get_params, post_params, server): - logger.debug ('in process_CrearSoftIncremental, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) - logger.warning ('this method has been removed') - raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' }) - - def process_RestaurarImagen (self, path, get_params, post_params, server): - logger.debug ('in process_RestaurarImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) - + def do_RestaurarImagen (self, post_params): for k in ['dsk', 'par', 'idi', 'ipr', 'nci', 'ifs', 'ptc', 'nfn', 'ids']: if k not in post_params: logger.error (f'required parameter ({k}) not in POST params') @@ -202,6 +186,59 @@ class CloningEngineWorker (ogLiveWorker): } return self.respuestaEjecucionComando (cmd, herror, ids) + def _long_running_job (self, name, f, args): + job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8))) + self.thread_list[job_id] = { + 'thread': ThreadWithResult (target=f, args=args), + 'starttime': time.time(), + 'running': True, + 'result': None + } + self.thread_list[job_id]['thread'].start() + return { 'job_id': job_id } + + def process_status (self, path, get_params, post_params, server): + ## join finished threads + for k in self.thread_list: + logger.debug (f'considering thread ({k})') + elem = self.thread_list[k] + if 'thread' in elem: + elem['thread'].join (0.05) + if not elem['thread'].is_alive(): + logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})') + elem['running'] = False + elem['result'] = elem['thread'].result + del elem['thread'] + + ## return status of threads + thr_status = {} + for k in self.thread_list: + thr_status[k] = { + 'running': self.thread_list[k]['running'], + 'result': self.thread_list[k]['result'], + } + return thr_status + + def process_CrearImagen (self, path, get_params, post_params, server): + logger.debug ('in process_CrearImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) + logger.debug ('type(post_params) "{}"'.format (type (post_params))) + return self._long_running_job ('CrearImagen', self.do_CrearImagen, args=(post_params,)) + + def process_CrearImagenBasica (self, path, get_params, post_params, server): + logger.debug ('in process_CrearImagenBasica, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) + logger.warning ('this method has been removed') + raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' }) + + def process_CrearSoftIncremental (self, path, get_params, post_params, server): + logger.debug ('in process_CrearSoftIncremental, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) + logger.warning ('this method has been removed') + raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' }) + + def process_RestaurarImagen (self, path, get_params, post_params, server): + logger.debug ('in process_RestaurarImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) + logger.debug ('type(post_params) "{}"'.format (type (post_params))) + return self._long_running_job ('RestaurarImagen', self.do_RestaurarImagen, args=(post_params,)) + def process_RestaurarImagenBasica (self, path, get_params, post_params, server): logger.debug ('in process_RestaurarImagenBasica, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) logger.warning ('this method has been removed') diff --git a/src/opengnsys/workers/__init__.py b/src/opengnsys/workers/__init__.py index dc0c01d..e74f9f5 100644 --- a/src/opengnsys/workers/__init__.py +++ b/src/opengnsys/workers/__init__.py @@ -1,3 +1,3 @@ from .server_worker import ServerWorker from .client_worker import ClientWorker -from .oglive_worker import ogLiveWorker +from .oglive_worker import ogLiveWorker, ThreadWithResult diff --git a/src/opengnsys/workers/oglive_worker.py b/src/opengnsys/workers/oglive_worker.py index a63df10..737c763 100644 --- a/src/opengnsys/workers/oglive_worker.py +++ b/src/opengnsys/workers/oglive_worker.py @@ -32,13 +32,30 @@ import os import subprocess +import threading from configparser import NoOptionError from opengnsys import REST from opengnsys.log import logger from .server_worker import ServerWorker +## https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread +class ThreadWithResult (threading.Thread): + def run (self): + try: + self.result = None + if self._target is not None: + try: + self.result = self._target (*self._args, **self._kwargs) + except Exception as e: + self.result = f'got exception: ({e})' + finally: + # Avoid a refcycle if the thread is running a function with an argument that has a member that points to the thread. + del self._target, self._args, self._kwargs + class ogLiveWorker(ServerWorker): + thread_list = {} + def interfaceAdmin (self, method, parametros=[]): exe = '{}/{}'.format (self.pathinterface, method) ## for development only. Will be removed when the referenced bash code (/opt/opengnsys/lib/engine/bin/*.lib) is translated into python