refs #789 perform long-running tasks in the background

pull/9/head
Natalia Serrano 2024-09-30 17:36:25 +02:00
parent 39a367be79
commit 9b91eedf1b
5 changed files with 86 additions and 26 deletions

View File

@ -1,3 +1,9 @@
ogagent (1.3.7-1) stable; urgency=medium
* CloningEngine: RESTfully keep a list of long-running jobs
-- OpenGnsys developers <info@opengnsys.es> Fri, 27 Sep 2024 18:03:16 +0200
ogagent (1.3.6-1) stable; urgency=medium
* Add more functionality to the ogAdmClient module

View File

@ -1 +1 @@
1.3.6
1.3.7

View File

@ -32,25 +32,24 @@
import base64
import os
import time
import random
from pathlib import Path
from opengnsys.log import logger
from opengnsys.workers import ogLiveWorker
from opengnsys.workers import ogLiveWorker, ThreadWithResult
class CloningEngineWorker (ogLiveWorker):
name = 'CloningEngine' # Module name
REST = None # REST object
def onDeactivation (self):
logger.debug ('onDeactivation')
def process_status (self, path, get_params, post_params, server):
return {self.name: 'in process_status'} ## XXX
def onActivation (self):
super().onActivation()
logger.info ('onActivation ok')
def onDeactivation (self):
logger.debug ('onDeactivation')
## en C, esto envia una trama de respuesta al servidor. Devuelve un boolean
## en python, simplemente termina de construir la respuesta y la devuelve; no envía nada por la red. El caller la usa en return() para enviar implícitamente la respuesta
def respuestaEjecucionComando (self, cmd, herror, ids):
@ -102,10 +101,7 @@ class CloningEngineWorker (ogLiveWorker):
return {'true':'true'} ## XXX
def process_CrearImagen (self, path, get_params, post_params, server):
logger.debug ('in process_CrearImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.debug ('type(post_params) "{}"'.format (type (post_params)))
def do_CrearImagen (self, post_params):
for k in ['dsk', 'par', 'cpt', 'idi', 'nci', 'ipr', 'nfn', 'ids']:
if k not in post_params:
logger.error (f'required parameter ({k}) not in POST params')
@ -147,19 +143,7 @@ class CloningEngineWorker (ogLiveWorker):
}
return self.respuestaEjecucionComando (cmd, herror, ids)
def process_CrearImagenBasica (self, path, get_params, post_params, server):
logger.debug ('in process_CrearImagenBasica, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.warning ('this method has been removed')
raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' })
def process_CrearSoftIncremental (self, path, get_params, post_params, server):
logger.debug ('in process_CrearSoftIncremental, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.warning ('this method has been removed')
raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' })
def process_RestaurarImagen (self, path, get_params, post_params, server):
logger.debug ('in process_RestaurarImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
def do_RestaurarImagen (self, post_params):
for k in ['dsk', 'par', 'idi', 'ipr', 'nci', 'ifs', 'ptc', 'nfn', 'ids']:
if k not in post_params:
logger.error (f'required parameter ({k}) not in POST params')
@ -202,6 +186,59 @@ class CloningEngineWorker (ogLiveWorker):
}
return self.respuestaEjecucionComando (cmd, herror, ids)
def _long_running_job (self, name, f, args):
job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8)))
self.thread_list[job_id] = {
'thread': ThreadWithResult (target=f, args=args),
'starttime': time.time(),
'running': True,
'result': None
}
self.thread_list[job_id]['thread'].start()
return { 'job_id': job_id }
def process_status (self, path, get_params, post_params, server):
## join finished threads
for k in self.thread_list:
logger.debug (f'considering thread ({k})')
elem = self.thread_list[k]
if 'thread' in elem:
elem['thread'].join (0.05)
if not elem['thread'].is_alive():
logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
elem['running'] = False
elem['result'] = elem['thread'].result
del elem['thread']
## return status of threads
thr_status = {}
for k in self.thread_list:
thr_status[k] = {
'running': self.thread_list[k]['running'],
'result': self.thread_list[k]['result'],
}
return thr_status
def process_CrearImagen (self, path, get_params, post_params, server):
logger.debug ('in process_CrearImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.debug ('type(post_params) "{}"'.format (type (post_params)))
return self._long_running_job ('CrearImagen', self.do_CrearImagen, args=(post_params,))
def process_CrearImagenBasica (self, path, get_params, post_params, server):
logger.debug ('in process_CrearImagenBasica, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.warning ('this method has been removed')
raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' })
def process_CrearSoftIncremental (self, path, get_params, post_params, server):
logger.debug ('in process_CrearSoftIncremental, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.warning ('this method has been removed')
raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' })
def process_RestaurarImagen (self, path, get_params, post_params, server):
logger.debug ('in process_RestaurarImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.debug ('type(post_params) "{}"'.format (type (post_params)))
return self._long_running_job ('RestaurarImagen', self.do_RestaurarImagen, args=(post_params,))
def process_RestaurarImagenBasica (self, path, get_params, post_params, server):
logger.debug ('in process_RestaurarImagenBasica, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.warning ('this method has been removed')

View File

@ -1,3 +1,3 @@
from .server_worker import ServerWorker
from .client_worker import ClientWorker
from .oglive_worker import ogLiveWorker
from .oglive_worker import ogLiveWorker, ThreadWithResult

View File

@ -32,13 +32,30 @@
import os
import subprocess
import threading
from configparser import NoOptionError
from opengnsys import REST
from opengnsys.log import logger
from .server_worker import ServerWorker
## https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread
class ThreadWithResult (threading.Thread):
def run (self):
try:
self.result = None
if self._target is not None:
try:
self.result = self._target (*self._args, **self._kwargs)
except Exception as e:
self.result = f'got exception: ({e})'
finally:
# Avoid a refcycle if the thread is running a function with an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs
class ogLiveWorker(ServerWorker):
thread_list = {}
def interfaceAdmin (self, method, parametros=[]):
exe = '{}/{}'.format (self.pathinterface, method)
## for development only. Will be removed when the referenced bash code (/opt/opengnsys/lib/engine/bin/*.lib) is translated into python