diff --git a/linux/debian/changelog b/linux/debian/changelog index 5360d8e..2314ee0 100644 --- a/linux/debian/changelog +++ b/linux/debian/changelog @@ -1,4 +1,23 @@ -ogagent (1.4.6-1) UNRELEASED; urgency=medium +ogagent (1.4.9-1) stable; urgency=medium + + * Notify ogcore when agent shuts down within oglive + + -- OpenGnsys developers Thu, 20 Feb 2025 11:58:29 +0100 + +ogagent (1.4.8-1) stable; urgency=medium + + * Optionally return disk config in /status + + -- OpenGnsys developers Tue, 18 Feb 2025 13:48:54 +0100 + +ogagent (1.4.7-1) stable; urgency=medium + + * Merge server modules + * Track the progress of children + + -- OpenGnsys developers Tue, 04 Feb 2025 14:12:19 +0100 + +ogagent (1.4.6-1) stable; urgency=medium * Point to the new menu browser diff --git a/src/OGAgent.spec b/src/OGAgent.spec index aaba3a8..5e9256f 100755 --- a/src/OGAgent.spec +++ b/src/OGAgent.spec @@ -13,7 +13,7 @@ ogausr_a = Analysis( # ('cfg', 'cfg'), ## add the entire directory ('img', 'img'), ## add the entire directory ], - hiddenimports=['win32timezone', 'socketserver', 'http.server', 'urllib', 'opengnsys.modules.client.OpenGnSys', 'opengnsys.modules.server.CloningEngine', 'opengnsys.modules.server.ogAdmClient', 'opengnsys.modules.server.OpenGnSys'], + hiddenimports=['win32timezone', 'socketserver', 'http.server', 'urllib', 'opengnsys.modules.client.OpenGnSys', 'opengnsys.modules.server.ogAdmClient', 'opengnsys.modules.server.OpenGnSys'], hookspath=[], hooksconfig={}, runtime_hooks=[], @@ -26,7 +26,7 @@ ogasvc_a = Analysis( pathex=[], binaries=[], datas=[], - hiddenimports=['win32timezone', 'socketserver', 'http.server', 'urllib', 'opengnsys.modules.client.OpenGnSys', 'opengnsys.modules.server.CloningEngine', 'opengnsys.modules.server.ogAdmClient', 'opengnsys.modules.server.OpenGnSys'], + hiddenimports=['win32timezone', 'socketserver', 'http.server', 'urllib', 'opengnsys.modules.client.OpenGnSys', 'opengnsys.modules.server.ogAdmClient', 'opengnsys.modules.server.OpenGnSys'], hookspath=[], hooksconfig={}, runtime_hooks=[], diff --git a/src/VERSION b/src/VERSION index c514bd8..4ea2b1f 100644 --- a/src/VERSION +++ b/src/VERSION @@ -1 +1 @@ -1.4.6 +1.4.9 diff --git a/src/cfg/ogagent.cfg b/src/cfg/ogagent.cfg index c07e32a..a3faa82 100644 --- a/src/cfg/ogagent.cfg +++ b/src/cfg/ogagent.cfg @@ -28,10 +28,3 @@ log=DEBUG pathinterface=/opt/opengnsys/interfaceAdm urlMenu={}://{}/menu-browser urlMsg=http://localhost/cgi-bin/httpd-log.sh - -[CloningEngine] -remote={}://{}/opengnsys/rest -log=DEBUG -pathinterface=/opt/opengnsys/interfaceAdm -urlMenu={}://{}/menu-browser -urlMsg=http://localhost/cgi-bin/httpd-log.sh diff --git a/src/opengnsys/modules/server/CloningEngine/__init__.py b/src/opengnsys/modules/server/CloningEngine/__init__.py deleted file mode 100644 index 67ce077..0000000 --- a/src/opengnsys/modules/server/CloningEngine/__init__.py +++ /dev/null @@ -1,355 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# -# Copyright (c) 2024 Qindel Formación y Servicios S.L. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without modification, -# are permitted provided that the following conditions are met: -# -# * Redistributions of source code must retain the above copyright notice, -# this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# * Neither the name of Virtual Cable S.L. nor the names of its contributors -# may be used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -""" -@author: Natalia Serrano, nserrano at qindel dot com -""" - -import base64 -import os -from pathlib import Path - -from opengnsys.log import logger -from opengnsys.workers import ogLiveWorker - -class CloningEngineWorker (ogLiveWorker): - name = 'CloningEngine' # Module name - REST = None # REST object - - def onActivation (self): - super().onActivation (run_monitoring_thread=False) - logger.info ('onActivation ok') - - def onDeactivation (self): - logger.debug ('onDeactivation') - - def InventariandoSoftware (self, dsk, par, nfn): - sft_src = f'/tmp/CSft-{self.IPlocal}-{par}' - try: - self.interfaceAdmin (nfn, [dsk, par, sft_src]) - herror = 0 - except: - herror = 1 - - if herror: - logger.warning ('Error al ejecutar el comando') - b64 = '' - self.muestraMensaje (20) - else: - if not os.path.exists (sft_src): - raise Exception (f'interfaceAdmin({nfn}) returned success but did not create file ({sft_src})') - sft_src_contents = Path (sft_src).read_bytes() - - b64 = base64.b64encode (sft_src_contents).decode ('utf-8') - self.muestraMensaje (19) - - cmd = { - 'nfn': 'RESPUESTA_InventarioSoftware', - 'dsk': dsk, ## not in the original C code, around ogAdmClient.c:1944 - 'par': par, - 'contents': b64, - } - return self.respuestaEjecucionComando (cmd, herror, 0) - - def do_CrearImagen (self, post_params): - for k in ['dsk', 'par', 'cpt', 'idi', 'nci', 'ipr', 'nfn', 'ids']: - if k not in post_params: - logger.error (f'required parameter ({k}) not in POST params') - return {} - - dsk = post_params['dsk'] ## Disco - par = post_params['par'] ## Número de partición - cpt = post_params['cpt'] ## Código de la partición - idi = post_params['idi'] ## Identificador de la imagen - nci = post_params['nci'] ## Nombre canónico de la imagen - ipr = post_params['ipr'] ## Ip del repositorio - nfn = post_params['nfn'] - ids = post_params['ids'] - - self.muestraMensaje (7) - - try: - res = self.InventariandoSoftware (dsk, par, 'InventarioSoftware') ## Crea inventario Software previamente - except: - logger.warning ('Error al ejecutar el comando') - return {} - - if res['contents']: - self.muestraMensaje (2) - inv_sft = res['contents'] - try: - self.interfaceAdmin (nfn, [dsk, par, nci, ipr]) - self.muestraMensaje (9) - herror = 0 - except: - logger.warning ('Error al ejecutar el comando') - self.muestraMensaje (10) - herror = 1 - else: - logger.warning ('Error al ejecutar el comando') - inv_sft = '' - - self.muestraMenu() - - cmd = { - 'nfn': 'RESPUESTA_CrearImagen', - 'idi': idi, ## Identificador de la imagen - 'dsk': dsk, ## Número de disco - 'par': par, ## Número de partición de donde se creó - 'cpt': cpt, ## Tipo o código de partición - 'ipr': ipr, ## Ip del repositorio donde se alojó - 'inv_sft': inv_sft, - } - return self.respuestaEjecucionComando (cmd, herror, ids) - - def do_RestaurarImagen (self, post_params): - for k in ['dsk', 'par', 'idi', 'ipr', 'nci', 'ifs', 'ptc', 'nfn', 'ids']: - if k not in post_params: - logger.error (f'required parameter ({k}) not in POST params') - return {} - - dsk = post_params['dsk'] - par = post_params['par'] - idi = post_params['idi'] - ipr = post_params['ipr'] - nci = post_params['nci'] - ifs = post_params['ifs'] - ptc = post_params['ptc'] ## Protocolo de clonación: Unicast, Multicast, Torrent - nfn = post_params['nfn'] - ids = post_params['ids'] - - self.muestraMensaje (3) - - try: - ## the ptc.split() is useless right now, since interfaceAdmin() does ' '.join(params) in order to spawn a shell - ## however we're going to need it in the future (when everything gets translated into python), plus it's harmless now. So let's do it - #self.interfaceAdmin (nfn, [dsk, par, nci, ipr, ptc]) - self.interfaceAdmin (nfn, [dsk, par, nci, ipr] + ptc.split()) - self.muestraMensaje (11) - herror = 0 - except: - logger.warning ('Error al ejecutar el comando') - self.muestraMensaje (12) - herror = 1 - - cfg = self.LeeConfiguracion() - if not cfg: - logger.warning ('No se ha podido recuperar la configuración de las particiones del disco') - - self.muestraMenu() - - cmd = { - 'nfn': 'RESPUESTA_RestaurarImagen', - 'idi': idi, ## Identificador de la imagen - 'dsk': dsk, ## Número de disco - 'par': par, ## Número de partición - 'ifs': ifs, ## Identificador del perfil software - 'cfg': self.cfg2obj(cfg), ## Configuración de discos - } - return self.respuestaEjecucionComando (cmd, herror, ids) - - def process_status (self, path, get_params, post_params, server): - thr_status = {} - for k in self.thread_list: - thr_status[k] = { - 'running': self.thread_list[k]['running'], - 'result': self.thread_list[k]['result'], - } - return thr_status - - def do_Configurar (self, post_params): - for k in ['nfn', 'dsk', 'cfg', 'ids']: - if k not in post_params: - logger.error (f'required parameter ({k}) not in POST params') - return {} - - nfn = post_params['nfn'] - dsk = post_params['dsk'] - cfg = post_params['cfg'] - ids = post_params['ids'] - - self.muestraMensaje (4) - - params = [] - disk_info = cfg.pop (0) - logger.debug (f'disk_info ({disk_info})') - for k in ['dis', 'che', 'tch']: - params.append (f'{k}={disk_info[k]}') - disk_info_str = '*'.join (params) - - partitions = [] - for entry in cfg: - logger.debug (f'entry ({entry})') - params = [] - for k in ['par', 'cpt', 'sfi', 'tam', 'ope']: - params.append (f'{k}={entry[k]}') - partitions.append ('*'.join (params)) - part_info_str = '%'.join (partitions) - - cfg_str = f'{disk_info_str}!{part_info_str}%' - - try: - self.interfaceAdmin (nfn, ['ignored', cfg_str]) - self.muestraMensaje (14) - herror = 0 - except: - logger.warning ('Error al ejecutar el comando') - self.muestraMensaje (13) - herror = 1 - - cfg = self.LeeConfiguracion() - if not cfg: - logger.warning ('No se ha podido recuperar la configuración de las particiones del disco') - return {} - - cmd = { - 'nfn': 'RESPUESTA_Configurar', - 'cfg': self.cfg2obj (cfg), - } - self.muestraMenu() - return self.respuestaEjecucionComando (cmd, herror, ids) - - def do_InventarioHardware (self, post_params): - for k in ['nfn', 'ids']: - if k not in post_params: - logger.error (f'required parameter ({k}) not in POST params') - return {} - - nfn = post_params['nfn'] - ids = post_params['ids'] - - self.muestraMensaje (6) - - hrdsrc = f'/tmp/Chrd-{self.IPlocal}' ## Nombre que tendra el archivo de inventario - hrddst = f'/tmp/Shrd-{self.IPlocal}' ## Nombre que tendra el archivo en el Servidor - try: - self.interfaceAdmin (nfn, [hrdsrc]) - hrdsrc_contents = Path (hrdsrc).read_bytes() - logger.debug (f'hrdsrc_contents 1 ({hrdsrc_contents})') - herror = 0 - except: - logger.warning ('Error al ejecutar el comando') - self.muestraMensaje (18) - herror = 1 - - if herror: - hrddst = '' - else: - logger.debug (f'hrdsrc_contents 2 ({hrdsrc_contents})') - ## Envía fichero de inventario al servidor - res = self.enviaMensajeServidor ('recibeArchivo', { 'nfl': hrddst, 'contents': base64.b64encode (hrdsrc_contents).decode ('utf-8') }) - logger.debug (res) - if not res: - logger.error ('Ha ocurrido algún problema al enviar un archivo por la red') - herror = 12 ## Error de envío de fichero por la red - self.muestraMensaje (17) - - ## Envia respuesta de ejecución de la función de interface - cmd = { - 'nfn': 'RESPUESTA_InventarioHardware', - 'hrd': hrddst, - } - self.muestraMenu() - return self.respuestaEjecucionComando (cmd, herror, ids) - - def do_InventarioSoftware (self, post_params): - for k in ['nfn', 'dsk', 'par', 'ids']: - if k not in post_params: - logger.error (f'required parameter ({k}) not in POST params') - return {} - - nfn = post_params['nfn'] - dsk = post_params['dsk'] - par = post_params['par'] - ids = post_params['ids'] - - self.muestraMensaje (7) - - try: - cmd = self.InventariandoSoftware (dsk, par, 'InventarioSoftware') - herror = 0 - except: - logger.warning ('Error al ejecutar el comando') - cmd = { 'nfn': 'RESPUESTA_InventarioSoftware' } - herror = 1 - - self.muestraMenu() - return self.respuestaEjecucionComando (cmd, herror, ids) - - def process_CrearImagen (self, path, get_params, post_params, server): - logger.debug ('in process_CrearImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) - logger.debug ('type(post_params) "{}"'.format (type (post_params))) - return self._long_running_job ('CrearImagen', self.do_CrearImagen, args=(post_params,)) - - def process_CrearImagenBasica (self, path, get_params, post_params, server): - logger.debug ('in process_CrearImagenBasica, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) - logger.warning ('this method has been removed') - raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' }) - - def process_CrearSoftIncremental (self, path, get_params, post_params, server): - logger.debug ('in process_CrearSoftIncremental, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) - logger.warning ('this method has been removed') - raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' }) - - def process_RestaurarImagen (self, path, get_params, post_params, server): - logger.debug ('in process_RestaurarImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) - logger.debug ('type(post_params) "{}"'.format (type (post_params))) - return self._long_running_job ('RestaurarImagen', self.do_RestaurarImagen, args=(post_params,)) - - def process_RestaurarImagenBasica (self, path, get_params, post_params, server): - logger.debug ('in process_RestaurarImagenBasica, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) - logger.warning ('this method has been removed') - raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' }) - - def process_RestaurarSoftIncremental (self, path, get_params, post_params, server): - logger.warning ('in process_RestaurarSoftIncremental') - logger.debug ('in process_RestaurarSoftIncremental, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) - logger.warning ('this method has been removed') - raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' }) - - def process_Configurar (self, path, get_params, post_params, server): - logger.debug ('in process_Configurar, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) - return self._long_running_job ('Configurar', self.do_Configurar, args=(post_params,)) - - def process_InventarioHardware (self, path, get_params, post_params, server): - logger.debug ('in process_InventarioHardware, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) - return self._long_running_job ('InventarioHardware', self.do_InventarioHardware, args=(post_params,)) - - def process_InventarioSoftware (self, path, get_params, post_params, server): - logger.debug ('in process_InventarioSoftware, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) - return self._long_running_job ('InventarioSoftware', self.do_InventarioSoftware, args=(post_params,)) - - ## curl --insecure -X POST --data '{"job_id":"foo"}' https://192.168.2.199:8000/CloningEngine/KillJob - def process_KillJob (self, path, get_params, post_params, server): - logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) - jid = post_params['job_id'] - r = self.killer (jid) - logger.debug (f'r bef ({r})') - r.update ({ 'nfn':'RESPUESTA_KillJob', 'job':jid }) - logger.debug (f'r aft ({r})') - return r diff --git a/src/opengnsys/modules/server/ogAdmClient/__init__.py b/src/opengnsys/modules/server/ogAdmClient/__init__.py index 1cf33a5..0866323 100644 --- a/src/opengnsys/modules/server/ogAdmClient/__init__.py +++ b/src/opengnsys/modules/server/ogAdmClient/__init__.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- # # Copyright (c) 2014 Virtual Cable S.L. -# Copyright (c) 2024 Qindel Formación y Servicios S.L. +# Copyright (c) 2024-2025 Qindel Formación y Servicios S.L. # All rights reserved. # # Redistribution and use in source and binary forms, with or without modification, @@ -41,7 +41,6 @@ import subprocess from pathlib import Path from urllib.parse import unquote -#from opengnsys import operations from opengnsys.log import logger from opengnsys.workers import ogLiveWorker @@ -69,7 +68,6 @@ def check_secret (fnc): class ogAdmClientWorker (ogLiveWorker): name = 'ogAdmClient' # Module name - #interface = None # Bound interface for OpenGnsys (el otro modulo lo usa para obtener .ip y .mac REST = None # REST object def onDeactivation (self): @@ -77,6 +75,8 @@ class ogAdmClientWorker (ogLiveWorker): Sends OGAgent stopping notification to OpenGnsys server """ logger.debug ('onDeactivation') + self.REST.sendMessage ('ogAdmClient/stopped', {'mac': self.mac, 'ip': self.IPlocal, 'idcentro': self.idcentro, 'idaula': self.idaula, + 'idordenador': self.idordenador, 'nombreordenador': self.nombreordenador}) #def processClientMessage (self, message, data): # logger.debug ('Got OpenGnsys message from client: {}, data {}'.format (message, data)) @@ -124,18 +124,46 @@ class ogAdmClientWorker (ogLiveWorker): # threading.Thread (target=pwoff).start() # return {'op': 'launched'} - - - - - - - - - ## process_* are invoked from opengnsys/httpserver.py:99 "data = module.processServerMessage (path, get_params, post_params, self)" (via opengnsys/workers/server_worker.py) ## process_client_* are invoked from opengnsys/service.py:123 "v.processClientMessage (message, json.loads (data))" (via opengnsys/workers/server_worker.py) + + + + + + + + + + def InventariandoSoftware (self, dsk, par, nfn): + sft_src = f'/tmp/CSft-{self.IPlocal}-{par}' + try: + self.interfaceAdmin (nfn, [dsk, par, sft_src]) + herror = 0 + except: + herror = 1 + + if herror: + logger.warning ('Error al ejecutar el comando') + b64 = '' + self.muestraMensaje (20) + else: + if not os.path.exists (sft_src): + raise Exception (f'interfaceAdmin({nfn}) returned success but did not create file ({sft_src})') + sft_src_contents = Path (sft_src).read_bytes() + + b64 = base64.b64encode (sft_src_contents).decode ('utf-8') + self.muestraMensaje (19) + + cmd = { + 'nfn': 'RESPUESTA_InventarioSoftware', + 'dsk': dsk, ## not in the original C code, around ogAdmClient.c:1944 + 'par': par, + 'contents': b64, + } + return self.respuestaEjecucionComando (cmd, herror, 0) + def ejecutaArchivo (self,fn): logger.debug ('fn ({})'.format (fn)) @@ -279,7 +307,7 @@ class ogAdmClientWorker (ogLiveWorker): #} def onActivation (self): - super().onActivation (run_monitoring_thread=True) + super().onActivation() logger.info ('Inicio de sesion') logger.info ('Abriendo sesión en el servidor de Administración') if (not self.inclusionCliente()): @@ -309,20 +337,24 @@ class ogAdmClientWorker (ogLiveWorker): @check_secret def process_status (self, path, get_params, post_params, server): logger.debug ('in process_status, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) - cfg = self.LeeConfiguracion() + full_config = 'full-config' in post_params and post_params['full-config'] thr_status = {} for k in self.thread_list: thr_status[k] = { 'running': self.thread_list[k]['running'], 'result': self.thread_list[k]['result'], } - return { + ret = { 'nfn': 'RESPUESTA_status', 'mac': self.mac, + 'st': 'OGL', 'ip': self.IPlocal, - 'cfg': self.cfg2obj (cfg), 'threads': thr_status, } + if full_config: + cfg = self.LeeConfiguracion() + ret['cfg'] = self.cfg2obj (cfg) + return ret @check_secret def process_popup (self, path, get_params, post_params, server): @@ -332,6 +364,222 @@ class ogAdmClientWorker (ogLiveWorker): ## type(post_params) "" return {'debug':'test'} + def do_CrearImagen (self, post_params): + for k in ['dsk', 'par', 'cpt', 'idi', 'nci', 'ipr', 'nfn', 'ids']: + if k not in post_params: + logger.error (f'required parameter ({k}) not in POST params') + return {} + + dsk = post_params['dsk'] ## Disco + par = post_params['par'] ## Número de partición + cpt = post_params['cpt'] ## Código de la partición + idi = post_params['idi'] ## Identificador de la imagen + nci = post_params['nci'] ## Nombre canónico de la imagen + ipr = post_params['ipr'] ## Ip del repositorio + nfn = post_params['nfn'] + ids = post_params['ids'] + + self.muestraMensaje (7) + + try: + res = self.InventariandoSoftware (dsk, par, 'InventarioSoftware') ## Crea inventario Software previamente + except: + logger.warning ('Error al ejecutar el comando') + return {} + + if res['contents']: + self.muestraMensaje (2) + inv_sft = res['contents'] + try: + self.interfaceAdmin (nfn, [dsk, par, nci, ipr]) + self.muestraMensaje (9) + herror = 0 + except: + logger.warning ('Error al ejecutar el comando') + self.muestraMensaje (10) + herror = 1 + else: + logger.warning ('Error al ejecutar el comando') + inv_sft = '' + + self.muestraMenu() + + cmd = { + 'nfn': 'RESPUESTA_CrearImagen', + 'idi': idi, ## Identificador de la imagen + 'dsk': dsk, ## Número de disco + 'par': par, ## Número de partición de donde se creó + 'cpt': cpt, ## Tipo o código de partición + 'ipr': ipr, ## Ip del repositorio donde se alojó + 'inv_sft': inv_sft, + } + return self.respuestaEjecucionComando (cmd, herror, ids) + + def do_RestaurarImagen (self, post_params): + for k in ['dsk', 'par', 'idi', 'ipr', 'nci', 'ifs', 'ptc', 'nfn', 'ids']: + if k not in post_params: + logger.error (f'required parameter ({k}) not in POST params') + return {} + + dsk = post_params['dsk'] + par = post_params['par'] + idi = post_params['idi'] + ipr = post_params['ipr'] + nci = post_params['nci'] + ifs = post_params['ifs'] + ptc = post_params['ptc'] ## Protocolo de clonación: Unicast, Multicast, Torrent + nfn = post_params['nfn'] + ids = post_params['ids'] + + self.muestraMensaje (3) + + try: + ## the ptc.split() is useless right now, since interfaceAdmin() does ' '.join(params) in order to spawn a shell + ## however we're going to need it in the future (when everything gets translated into python), plus it's harmless now. So let's do it + #self.interfaceAdmin (nfn, [dsk, par, nci, ipr, ptc]) + self.interfaceAdmin (nfn, [dsk, par, nci, ipr] + ptc.split()) + self.muestraMensaje (11) + herror = 0 + except: + logger.warning ('Error al ejecutar el comando') + self.muestraMensaje (12) + herror = 1 + + cfg = self.LeeConfiguracion() + if not cfg: + logger.warning ('No se ha podido recuperar la configuración de las particiones del disco') + + self.muestraMenu() + + cmd = { + 'nfn': 'RESPUESTA_RestaurarImagen', + 'idi': idi, ## Identificador de la imagen + 'dsk': dsk, ## Número de disco + 'par': par, ## Número de partición + 'ifs': ifs, ## Identificador del perfil software + 'cfg': self.cfg2obj(cfg), ## Configuración de discos + } + return self.respuestaEjecucionComando (cmd, herror, ids) + + def do_Configurar (self, post_params): + for k in ['nfn', 'dsk', 'cfg', 'ids']: + if k not in post_params: + logger.error (f'required parameter ({k}) not in POST params') + return {} + + nfn = post_params['nfn'] + dsk = post_params['dsk'] + cfg = post_params['cfg'] + ids = post_params['ids'] + + self.muestraMensaje (4) + + params = [] + disk_info = cfg.pop (0) + logger.debug (f'disk_info ({disk_info})') + for k in ['dis', 'che', 'tch']: + params.append (f'{k}={disk_info[k]}') + disk_info_str = '*'.join (params) + + partitions = [] + for entry in cfg: + logger.debug (f'entry ({entry})') + params = [] + for k in ['par', 'cpt', 'sfi', 'tam', 'ope']: + params.append (f'{k}={entry[k]}') + partitions.append ('*'.join (params)) + part_info_str = '%'.join (partitions) + + cfg_str = f'{disk_info_str}!{part_info_str}%' + + try: + self.interfaceAdmin (nfn, ['ignored', cfg_str]) + self.muestraMensaje (14) + herror = 0 + except: + logger.warning ('Error al ejecutar el comando') + self.muestraMensaje (13) + herror = 1 + + cfg = self.LeeConfiguracion() + if not cfg: + logger.warning ('No se ha podido recuperar la configuración de las particiones del disco') + return {} + + cmd = { + 'nfn': 'RESPUESTA_Configurar', + 'cfg': self.cfg2obj (cfg), + } + self.muestraMenu() + return self.respuestaEjecucionComando (cmd, herror, ids) + + def do_InventarioHardware (self, post_params): + for k in ['nfn', 'ids']: + if k not in post_params: + logger.error (f'required parameter ({k}) not in POST params') + return {} + + nfn = post_params['nfn'] + ids = post_params['ids'] + + self.muestraMensaje (6) + + hrdsrc = f'/tmp/Chrd-{self.IPlocal}' ## Nombre que tendra el archivo de inventario + hrddst = f'/tmp/Shrd-{self.IPlocal}' ## Nombre que tendra el archivo en el Servidor + try: + self.interfaceAdmin (nfn, [hrdsrc]) + hrdsrc_contents = Path (hrdsrc).read_bytes() + logger.debug (f'hrdsrc_contents 1 ({hrdsrc_contents})') + herror = 0 + except: + logger.warning ('Error al ejecutar el comando') + self.muestraMensaje (18) + herror = 1 + + if herror: + hrddst = '' + else: + logger.debug (f'hrdsrc_contents 2 ({hrdsrc_contents})') + ## Envía fichero de inventario al servidor + res = self.enviaMensajeServidor ('recibeArchivo', { 'nfl': hrddst, 'contents': base64.b64encode (hrdsrc_contents).decode ('utf-8') }) + logger.debug (res) + if not res: + logger.error ('Ha ocurrido algún problema al enviar un archivo por la red') + herror = 12 ## Error de envío de fichero por la red + self.muestraMensaje (17) + + ## Envia respuesta de ejecución de la función de interface + cmd = { + 'nfn': 'RESPUESTA_InventarioHardware', + 'hrd': hrddst, + } + self.muestraMenu() + return self.respuestaEjecucionComando (cmd, herror, ids) + + def do_InventarioSoftware (self, post_params): + for k in ['nfn', 'dsk', 'par', 'ids']: + if k not in post_params: + logger.error (f'required parameter ({k}) not in POST params') + return {} + + nfn = post_params['nfn'] + dsk = post_params['dsk'] + par = post_params['par'] + ids = post_params['ids'] + + self.muestraMensaje (7) + + try: + cmd = self.InventariandoSoftware (dsk, par, 'InventarioSoftware') + herror = 0 + except: + logger.warning ('Error al ejecutar el comando') + cmd = { 'nfn': 'RESPUESTA_InventarioSoftware' } + herror = 1 + + self.muestraMenu() + return self.respuestaEjecucionComando (cmd, herror, ids) + def do_Actualizar (self, post_params): self.muestraMensaje (1) #if !comandosPendientes: error 84 'Ha ocurrido algún problema al reiniciar la sesión del cliente' @@ -504,6 +752,8 @@ class ogAdmClientWorker (ogLiveWorker): self.muestraMenu() return self.respuestaEjecucionComando (cmd, herror, ids) + + def process_Actualizar (self, path, get_params, post_params, server): logger.debug ('in process_Actualizar, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) return self._long_running_job ('Actualizar', self.do_Actualizar, args=(post_params,)) @@ -562,7 +812,49 @@ class ogAdmClientWorker (ogLiveWorker): logger.debug ('in process_EjecutaComandosPendientes, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) return {'true':'true'} ## ogAdmClient.c:2138 - ## curl --insecure -X POST --data '{"job_id":"foo"}' https://192.168.2.199:8000/ogAdmClient/KillJob + def process_CrearImagen (self, path, get_params, post_params, server): + logger.debug ('in process_CrearImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) + logger.debug ('type(post_params) "{}"'.format (type (post_params))) + return self._long_running_job ('CrearImagen', self.do_CrearImagen, args=(post_params,)) + + #def process_CrearImagenBasica (self, path, get_params, post_params, server): + # logger.debug ('in process_CrearImagenBasica, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) + # logger.warning ('this method has been removed') + # raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' }) + + #def process_CrearSoftIncremental (self, path, get_params, post_params, server): + # logger.debug ('in process_CrearSoftIncremental, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) + # logger.warning ('this method has been removed') + # raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' }) + + def process_RestaurarImagen (self, path, get_params, post_params, server): + logger.debug ('in process_RestaurarImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) + logger.debug ('type(post_params) "{}"'.format (type (post_params))) + return self._long_running_job ('RestaurarImagen', self.do_RestaurarImagen, args=(post_params,)) + + #def process_RestaurarImagenBasica (self, path, get_params, post_params, server): + # logger.debug ('in process_RestaurarImagenBasica, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) + # logger.warning ('this method has been removed') + # raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' }) + + #def process_RestaurarSoftIncremental (self, path, get_params, post_params, server): + # logger.warning ('in process_RestaurarSoftIncremental') + # logger.debug ('in process_RestaurarSoftIncremental, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) + # logger.warning ('this method has been removed') + # raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' }) + + def process_Configurar (self, path, get_params, post_params, server): + logger.debug ('in process_Configurar, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) + return self._long_running_job ('Configurar', self.do_Configurar, args=(post_params,)) + + def process_InventarioHardware (self, path, get_params, post_params, server): + logger.debug ('in process_InventarioHardware, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) + return self._long_running_job ('InventarioHardware', self.do_InventarioHardware, args=(post_params,)) + + def process_InventarioSoftware (self, path, get_params, post_params, server): + logger.debug ('in process_InventarioSoftware, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) + return self._long_running_job ('InventarioSoftware', self.do_InventarioSoftware, args=(post_params,)) + def process_KillJob (self, path, get_params, post_params, server): logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) jid = post_params['job_id'] diff --git a/src/opengnsys/workers/oglive_worker.py b/src/opengnsys/workers/oglive_worker.py index faaeaf5..e365f6d 100644 --- a/src/opengnsys/workers/oglive_worker.py +++ b/src/opengnsys/workers/oglive_worker.py @@ -31,6 +31,7 @@ # pylint: disable=unused-wildcard-import,wildcard-import import os +import re import time import random import subprocess @@ -49,15 +50,16 @@ class ThreadWithResult (threading.Thread): self.result = None if self._target is not None: ## the first arg in self._args is the queue - self.q = self._args[0] - self._args = self._args[1:] + self.pid_q = self._args[0] + self.stdout_q = self._args[1] + self._args = self._args[2:] try: self.result = self._target (*self._args, **self._kwargs) except Exception as e: self.result = { 'res': 2, 'der': f'got exception: ({e})' } ## res=2 as defined in ogAdmClient.c:2048 finally: # Avoid a refcycle if the thread is running a function with an argument that has a member that points to the thread. - del self._target, self._args, self._kwargs, self.q + del self._target, self._args, self._kwargs, self.pid_q, self.stdout_q class ogLiveWorker(ServerWorker): thread_list = {} @@ -143,7 +145,6 @@ class ogLiveWorker(ServerWorker): ] def notifier (self, job_id, result): - logger.debug (f'notifier() called, job_id ({job_id}) result ({result})') result['job_id'] = job_id self.REST.sendMessage ('clients/status/webhook', result) @@ -193,6 +194,14 @@ class ogLiveWorker(ServerWorker): return { 'res':success, 'der':msg } + def _extract_progress (self, job_id, ary=[]): + progress = None + for i in ary: + if m := re.search (r'^\[([0-9]+)\]', i): ## look for strings like '[10]', '[60]' + logger.debug (f"matched regex, m.groups ({m.groups()})") + progress = float (m.groups()[0]) / 100 + return progress + def mon (self): while True: with self.thread_lock: @@ -201,12 +210,21 @@ class ogLiveWorker(ServerWorker): if 'thread' not in elem: continue logger.debug (f'considering thread ({k})') - if self.q: - if not self.q.empty(): - elem['child_pid'] = self.q.get() + if self.pid_q: + if not self.pid_q.empty(): + elem['child_pid'] = self.pid_q.get() logger.debug (f'queue not empty, got pid ({elem["child_pid"]})') - else: - logger.debug (f'queue empty') + + if self.stdout_q: + partial = '' + while not self.stdout_q.empty(): + partial += self.stdout_q.get() + lines = partial.splitlines() + if len (lines): + p = self._extract_progress (k, lines) + if p: + m = { "job_id": k, "progress": p } + self.REST.sendMessage ('clients/status/webhook', { "job_id": k, "progress": p }) elem['thread'].join (0.05) if not elem['thread'].is_alive(): @@ -221,8 +239,10 @@ class ogLiveWorker(ServerWorker): def interfaceAdmin (self, method, parametros=[]): exe = '{}/{}'.format (self.pathinterface, method) ## for development only. Will be removed when the referenced bash code (/opt/opengnsys/lib/engine/bin/*.lib) is translated into python - devel_bash_prefix = ''' + LANG = os.environ.get ('LANG', 'en_GB.UTF-8').replace ('UTF_8', 'UTF-8') + devel_bash_prefix = f''' PATH=/opt/opengnsys/scripts/:$PATH; + source /opt/opengnsys/etc/lang.{LANG}.conf; for I in /opt/opengnsys/lib/engine/bin/*.lib; do source $I; done; for i in $(declare -F |cut -f3 -d" "); do export -f $i; done; ''' @@ -234,15 +254,21 @@ class ogLiveWorker(ServerWorker): logger.debug ('subprocess.run ("{}", capture_output=True)'.format (proc)) p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - if self.q: - self.q.put (p.pid) + if self.pid_q: + self.pid_q.put (p.pid) else: ## esto sucede por ejemplo cuando arranca el agente, que estamos en interfaceAdmin() en el mismo hilo, sin _long_running_job ni hilo separado logger.debug ('no queue--not writing any PID to it') + sout = serr = '' while p.poll() is None: - for l in iter (p.stdout.readline, b''): sout += l.decode ('utf-8', 'ignore') - for l in iter (p.stderr.readline, b''): serr += l.decode ('utf-8', 'ignore') + for l in iter (p.stdout.readline, b''): + partial = l.decode ('utf-8', 'ignore') + if self.stdout_q: self.stdout_q.put (partial) + sout += partial + for l in iter (p.stderr.readline, b''): + partial = l.decode ('utf-8', 'ignore') + serr += partial time.sleep (1) sout = sout.strip() serr = serr.strip() @@ -361,7 +387,7 @@ class ogLiveWorker(ServerWorker): return obj - def onActivation (self, run_monitoring_thread): + def onActivation (self): if not os.path.exists ('/scripts/oginit'): ## no estamos en oglive, este modulo no debe cargarse ## esta lógica la saco de src/opengnsys/linux/operations.py, donde hay un if similar @@ -376,7 +402,9 @@ class ogLiveWorker(ServerWorker): self.idproautoexec = None self.idcentro = None ## Identificador del centro self.idaula = None ## Identificador del aula - self.q = None ## for passing PIDs around + self.pid_q = None ## for passing PIDs around + self.stdout_q = None ## for passing stdout + self.progress_jobs = {} ogcore_scheme = os.environ.get ('OGAGENTCFG_OGCORE_SCHEME', 'https') ogcore_ip = os.environ.get ('OGAGENTCFG_OGCORE_IP', '192.168.2.1') @@ -407,8 +435,7 @@ class ogLiveWorker(ServerWorker): if not self.tomaMAClocal(): raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo') - if run_monitoring_thread: ## should be true for exactly one ogLiveWorker - threading.Thread (name='monitoring_thread', target=self.mon, daemon=True).start() + threading.Thread (name='monitoring_thread', target=self.mon, daemon=True).start() def _long_running_job (self, name, f, args): any_job_running = False @@ -422,9 +449,10 @@ class ogLiveWorker(ServerWorker): job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8))) import queue - self.q = queue.Queue() ## a single queue works for us because we never have more than one long_running_job at the same time + self.pid_q = queue.Queue() ## a single queue works for us because we never have more than one long_running_job at the same time + self.stdout_q = queue.Queue() self.thread_list[job_id] = { - 'thread': ThreadWithResult (target=f, args=(self.q,) + args), + 'thread': ThreadWithResult (target=f, args=(self.pid_q, self.stdout_q) + args), 'starttime': time.time(), 'child_pid': None, 'running': True, @@ -434,10 +462,10 @@ class ogLiveWorker(ServerWorker): return { 'job_id': job_id } ## para matar threads tengo lo siguiente: -## - aqui en _long_running_job meto una cola en self.q -## - (self.q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'q'") +## - aqui en _long_running_job meto una cola en self.pid_q +## - (self.pid_q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'pid_q'") ## - en el thread_list también tengo un child_pid para almacenar el pid de los procesos hijos -## - al crear el ThreadWithResult le paso la cola, y luego en run() la recojo y la meto en el self.q del thread +## - al crear el ThreadWithResult le paso la cola, y luego en run() la recojo y la meto en el self.pid_q del thread ## - en interfaceAdmin() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue ## - en mon() recojo pids de la queue y los meto en thread_list 'child_pid' ## - algunas funciones llaman a interfaceAdmin más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando @@ -448,6 +476,11 @@ class ogLiveWorker(ServerWorker): ## ## $ curl --insecure -X POST --data '{"nfn":"EjecutarScript","scp":"cd /usr; sleep 30; pwd; ls","ids":"0"}' https://192.168.2.199:8000/ogAdmClient/EjecutarScript ## {"job_id": "EjecutarScript-333feb3f"} -## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/CloningEngine/KillJob +## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/ogAdmClient/KillJob ## ## funciona bien, excepto que el PID no muere xD, ni siquiera haciendo subprocess.run('kill') + +## para mostrar el progreso de los jobs reutilizo la misma infra +## una cola self.stdout_q +## en interfaceAdmin escribo la stdout parcial que ya venia recogiendo +## mon() lo recoge y le hace un POST a ogcore