Report progress, unify modules #15

Merged
nserrano merged 8 commits from report-progress into main 2025-03-12 11:39:58 +01:00
7 changed files with 389 additions and 407 deletions

View File

@ -1,4 +1,23 @@
ogagent (1.4.6-1) UNRELEASED; urgency=medium
ogagent (1.4.9-1) stable; urgency=medium
* Notify ogcore when agent shuts down within oglive
-- OpenGnsys developers <info@opengnsys.es> Thu, 20 Feb 2025 11:58:29 +0100
ogagent (1.4.8-1) stable; urgency=medium
* Optionally return disk config in /status
-- OpenGnsys developers <info@opengnsys.es> Tue, 18 Feb 2025 13:48:54 +0100
ogagent (1.4.7-1) stable; urgency=medium
* Merge server modules
* Track the progress of children
-- OpenGnsys developers <info@opengnsys.es> Tue, 04 Feb 2025 14:12:19 +0100
ogagent (1.4.6-1) stable; urgency=medium
* Point to the new menu browser

View File

@ -13,7 +13,7 @@ ogausr_a = Analysis(
# ('cfg', 'cfg'), ## add the entire directory
('img', 'img'), ## add the entire directory
],
hiddenimports=['win32timezone', 'socketserver', 'http.server', 'urllib', 'opengnsys.modules.client.OpenGnSys', 'opengnsys.modules.server.CloningEngine', 'opengnsys.modules.server.ogAdmClient', 'opengnsys.modules.server.OpenGnSys'],
hiddenimports=['win32timezone', 'socketserver', 'http.server', 'urllib', 'opengnsys.modules.client.OpenGnSys', 'opengnsys.modules.server.ogAdmClient', 'opengnsys.modules.server.OpenGnSys'],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
@ -26,7 +26,7 @@ ogasvc_a = Analysis(
pathex=[],
binaries=[],
datas=[],
hiddenimports=['win32timezone', 'socketserver', 'http.server', 'urllib', 'opengnsys.modules.client.OpenGnSys', 'opengnsys.modules.server.CloningEngine', 'opengnsys.modules.server.ogAdmClient', 'opengnsys.modules.server.OpenGnSys'],
hiddenimports=['win32timezone', 'socketserver', 'http.server', 'urllib', 'opengnsys.modules.client.OpenGnSys', 'opengnsys.modules.server.ogAdmClient', 'opengnsys.modules.server.OpenGnSys'],
hookspath=[],
hooksconfig={},
runtime_hooks=[],

View File

@ -1 +1 @@
1.4.6
1.4.9

View File

@ -28,10 +28,3 @@ log=DEBUG
pathinterface=/opt/opengnsys/interfaceAdm
urlMenu={}://{}/menu-browser
urlMsg=http://localhost/cgi-bin/httpd-log.sh
[CloningEngine]
remote={}://{}/opengnsys/rest
log=DEBUG
pathinterface=/opt/opengnsys/interfaceAdm
urlMenu={}://{}/menu-browser
urlMsg=http://localhost/cgi-bin/httpd-log.sh

View File

@ -1,355 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Qindel Formación y Servicios S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Natalia Serrano, nserrano at qindel dot com
"""
import base64
import os
from pathlib import Path
from opengnsys.log import logger
from opengnsys.workers import ogLiveWorker
class CloningEngineWorker (ogLiveWorker):
name = 'CloningEngine' # Module name
REST = None # REST object
def onActivation (self):
super().onActivation (run_monitoring_thread=False)
logger.info ('onActivation ok')
def onDeactivation (self):
logger.debug ('onDeactivation')
def InventariandoSoftware (self, dsk, par, nfn):
sft_src = f'/tmp/CSft-{self.IPlocal}-{par}'
try:
self.interfaceAdmin (nfn, [dsk, par, sft_src])
herror = 0
except:
herror = 1
if herror:
logger.warning ('Error al ejecutar el comando')
b64 = ''
self.muestraMensaje (20)
else:
if not os.path.exists (sft_src):
raise Exception (f'interfaceAdmin({nfn}) returned success but did not create file ({sft_src})')
sft_src_contents = Path (sft_src).read_bytes()
b64 = base64.b64encode (sft_src_contents).decode ('utf-8')
self.muestraMensaje (19)
cmd = {
'nfn': 'RESPUESTA_InventarioSoftware',
'dsk': dsk, ## not in the original C code, around ogAdmClient.c:1944
'par': par,
'contents': b64,
}
return self.respuestaEjecucionComando (cmd, herror, 0)
def do_CrearImagen (self, post_params):
for k in ['dsk', 'par', 'cpt', 'idi', 'nci', 'ipr', 'nfn', 'ids']:
if k not in post_params:
logger.error (f'required parameter ({k}) not in POST params')
return {}
dsk = post_params['dsk'] ## Disco
par = post_params['par'] ## Número de partición
cpt = post_params['cpt'] ## Código de la partición
idi = post_params['idi'] ## Identificador de la imagen
nci = post_params['nci'] ## Nombre canónico de la imagen
ipr = post_params['ipr'] ## Ip del repositorio
nfn = post_params['nfn']
ids = post_params['ids']
self.muestraMensaje (7)
try:
res = self.InventariandoSoftware (dsk, par, 'InventarioSoftware') ## Crea inventario Software previamente
except:
logger.warning ('Error al ejecutar el comando')
return {}
if res['contents']:
self.muestraMensaje (2)
inv_sft = res['contents']
try:
self.interfaceAdmin (nfn, [dsk, par, nci, ipr])
self.muestraMensaje (9)
herror = 0
except:
logger.warning ('Error al ejecutar el comando')
self.muestraMensaje (10)
herror = 1
else:
logger.warning ('Error al ejecutar el comando')
inv_sft = ''
self.muestraMenu()
cmd = {
'nfn': 'RESPUESTA_CrearImagen',
'idi': idi, ## Identificador de la imagen
'dsk': dsk, ## Número de disco
'par': par, ## Número de partición de donde se creó
'cpt': cpt, ## Tipo o código de partición
'ipr': ipr, ## Ip del repositorio donde se alojó
'inv_sft': inv_sft,
}
return self.respuestaEjecucionComando (cmd, herror, ids)
def do_RestaurarImagen (self, post_params):
for k in ['dsk', 'par', 'idi', 'ipr', 'nci', 'ifs', 'ptc', 'nfn', 'ids']:
if k not in post_params:
logger.error (f'required parameter ({k}) not in POST params')
return {}
dsk = post_params['dsk']
par = post_params['par']
idi = post_params['idi']
ipr = post_params['ipr']
nci = post_params['nci']
ifs = post_params['ifs']
ptc = post_params['ptc'] ## Protocolo de clonación: Unicast, Multicast, Torrent
nfn = post_params['nfn']
ids = post_params['ids']
self.muestraMensaje (3)
try:
## the ptc.split() is useless right now, since interfaceAdmin() does ' '.join(params) in order to spawn a shell
## however we're going to need it in the future (when everything gets translated into python), plus it's harmless now. So let's do it
#self.interfaceAdmin (nfn, [dsk, par, nci, ipr, ptc])
self.interfaceAdmin (nfn, [dsk, par, nci, ipr] + ptc.split())
self.muestraMensaje (11)
herror = 0
except:
logger.warning ('Error al ejecutar el comando')
self.muestraMensaje (12)
herror = 1
cfg = self.LeeConfiguracion()
if not cfg:
logger.warning ('No se ha podido recuperar la configuración de las particiones del disco')
self.muestraMenu()
cmd = {
'nfn': 'RESPUESTA_RestaurarImagen',
'idi': idi, ## Identificador de la imagen
'dsk': dsk, ## Número de disco
'par': par, ## Número de partición
'ifs': ifs, ## Identificador del perfil software
'cfg': self.cfg2obj(cfg), ## Configuración de discos
}
return self.respuestaEjecucionComando (cmd, herror, ids)
def process_status (self, path, get_params, post_params, server):
thr_status = {}
for k in self.thread_list:
thr_status[k] = {
'running': self.thread_list[k]['running'],
'result': self.thread_list[k]['result'],
}
return thr_status
def do_Configurar (self, post_params):
for k in ['nfn', 'dsk', 'cfg', 'ids']:
if k not in post_params:
logger.error (f'required parameter ({k}) not in POST params')
return {}
nfn = post_params['nfn']
dsk = post_params['dsk']
cfg = post_params['cfg']
ids = post_params['ids']
self.muestraMensaje (4)
params = []
disk_info = cfg.pop (0)
logger.debug (f'disk_info ({disk_info})')
for k in ['dis', 'che', 'tch']:
params.append (f'{k}={disk_info[k]}')
disk_info_str = '*'.join (params)
partitions = []
for entry in cfg:
logger.debug (f'entry ({entry})')
params = []
for k in ['par', 'cpt', 'sfi', 'tam', 'ope']:
params.append (f'{k}={entry[k]}')
partitions.append ('*'.join (params))
part_info_str = '%'.join (partitions)
cfg_str = f'{disk_info_str}!{part_info_str}%'
try:
self.interfaceAdmin (nfn, ['ignored', cfg_str])
self.muestraMensaje (14)
herror = 0
except:
logger.warning ('Error al ejecutar el comando')
self.muestraMensaje (13)
herror = 1
cfg = self.LeeConfiguracion()
if not cfg:
logger.warning ('No se ha podido recuperar la configuración de las particiones del disco')
return {}
cmd = {
'nfn': 'RESPUESTA_Configurar',
'cfg': self.cfg2obj (cfg),
}
self.muestraMenu()
return self.respuestaEjecucionComando (cmd, herror, ids)
def do_InventarioHardware (self, post_params):
for k in ['nfn', 'ids']:
if k not in post_params:
logger.error (f'required parameter ({k}) not in POST params')
return {}
nfn = post_params['nfn']
ids = post_params['ids']
self.muestraMensaje (6)
hrdsrc = f'/tmp/Chrd-{self.IPlocal}' ## Nombre que tendra el archivo de inventario
hrddst = f'/tmp/Shrd-{self.IPlocal}' ## Nombre que tendra el archivo en el Servidor
try:
self.interfaceAdmin (nfn, [hrdsrc])
hrdsrc_contents = Path (hrdsrc).read_bytes()
logger.debug (f'hrdsrc_contents 1 ({hrdsrc_contents})')
herror = 0
except:
logger.warning ('Error al ejecutar el comando')
self.muestraMensaje (18)
herror = 1
if herror:
hrddst = ''
else:
logger.debug (f'hrdsrc_contents 2 ({hrdsrc_contents})')
## Envía fichero de inventario al servidor
res = self.enviaMensajeServidor ('recibeArchivo', { 'nfl': hrddst, 'contents': base64.b64encode (hrdsrc_contents).decode ('utf-8') })
logger.debug (res)
if not res:
logger.error ('Ha ocurrido algún problema al enviar un archivo por la red')
herror = 12 ## Error de envío de fichero por la red
self.muestraMensaje (17)
## Envia respuesta de ejecución de la función de interface
cmd = {
'nfn': 'RESPUESTA_InventarioHardware',
'hrd': hrddst,
}
self.muestraMenu()
return self.respuestaEjecucionComando (cmd, herror, ids)
def do_InventarioSoftware (self, post_params):
for k in ['nfn', 'dsk', 'par', 'ids']:
if k not in post_params:
logger.error (f'required parameter ({k}) not in POST params')
return {}
nfn = post_params['nfn']
dsk = post_params['dsk']
par = post_params['par']
ids = post_params['ids']
self.muestraMensaje (7)
try:
cmd = self.InventariandoSoftware (dsk, par, 'InventarioSoftware')
herror = 0
except:
logger.warning ('Error al ejecutar el comando')
cmd = { 'nfn': 'RESPUESTA_InventarioSoftware' }
herror = 1
self.muestraMenu()
return self.respuestaEjecucionComando (cmd, herror, ids)
def process_CrearImagen (self, path, get_params, post_params, server):
logger.debug ('in process_CrearImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.debug ('type(post_params) "{}"'.format (type (post_params)))
return self._long_running_job ('CrearImagen', self.do_CrearImagen, args=(post_params,))
def process_CrearImagenBasica (self, path, get_params, post_params, server):
logger.debug ('in process_CrearImagenBasica, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.warning ('this method has been removed')
raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' })
def process_CrearSoftIncremental (self, path, get_params, post_params, server):
logger.debug ('in process_CrearSoftIncremental, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.warning ('this method has been removed')
raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' })
def process_RestaurarImagen (self, path, get_params, post_params, server):
logger.debug ('in process_RestaurarImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.debug ('type(post_params) "{}"'.format (type (post_params)))
return self._long_running_job ('RestaurarImagen', self.do_RestaurarImagen, args=(post_params,))
def process_RestaurarImagenBasica (self, path, get_params, post_params, server):
logger.debug ('in process_RestaurarImagenBasica, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.warning ('this method has been removed')
raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' })
def process_RestaurarSoftIncremental (self, path, get_params, post_params, server):
logger.warning ('in process_RestaurarSoftIncremental')
logger.debug ('in process_RestaurarSoftIncremental, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.warning ('this method has been removed')
raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' })
def process_Configurar (self, path, get_params, post_params, server):
logger.debug ('in process_Configurar, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
return self._long_running_job ('Configurar', self.do_Configurar, args=(post_params,))
def process_InventarioHardware (self, path, get_params, post_params, server):
logger.debug ('in process_InventarioHardware, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
return self._long_running_job ('InventarioHardware', self.do_InventarioHardware, args=(post_params,))
def process_InventarioSoftware (self, path, get_params, post_params, server):
logger.debug ('in process_InventarioSoftware, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
return self._long_running_job ('InventarioSoftware', self.do_InventarioSoftware, args=(post_params,))
## curl --insecure -X POST --data '{"job_id":"foo"}' https://192.168.2.199:8000/CloningEngine/KillJob
def process_KillJob (self, path, get_params, post_params, server):
logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
jid = post_params['job_id']
r = self.killer (jid)
logger.debug (f'r bef ({r})')
r.update ({ 'nfn':'RESPUESTA_KillJob', 'job':jid })
logger.debug (f'r aft ({r})')
return r

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014 Virtual Cable S.L.
# Copyright (c) 2024 Qindel Formación y Servicios S.L.
# Copyright (c) 2024-2025 Qindel Formación y Servicios S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -41,7 +41,6 @@ import subprocess
from pathlib import Path
from urllib.parse import unquote
#from opengnsys import operations
from opengnsys.log import logger
from opengnsys.workers import ogLiveWorker
@ -69,7 +68,6 @@ def check_secret (fnc):
class ogAdmClientWorker (ogLiveWorker):
name = 'ogAdmClient' # Module name
#interface = None # Bound interface for OpenGnsys (el otro modulo lo usa para obtener .ip y .mac
REST = None # REST object
def onDeactivation (self):
@ -77,6 +75,8 @@ class ogAdmClientWorker (ogLiveWorker):
Sends OGAgent stopping notification to OpenGnsys server
"""
logger.debug ('onDeactivation')
self.REST.sendMessage ('ogAdmClient/stopped', {'mac': self.mac, 'ip': self.IPlocal, 'idcentro': self.idcentro, 'idaula': self.idaula,
'idordenador': self.idordenador, 'nombreordenador': self.nombreordenador})
#def processClientMessage (self, message, data):
# logger.debug ('Got OpenGnsys message from client: {}, data {}'.format (message, data))
@ -124,18 +124,46 @@ class ogAdmClientWorker (ogLiveWorker):
# threading.Thread (target=pwoff).start()
# return {'op': 'launched'}
## process_* are invoked from opengnsys/httpserver.py:99 "data = module.processServerMessage (path, get_params, post_params, self)" (via opengnsys/workers/server_worker.py)
## process_client_* are invoked from opengnsys/service.py:123 "v.processClientMessage (message, json.loads (data))" (via opengnsys/workers/server_worker.py)
def InventariandoSoftware (self, dsk, par, nfn):
sft_src = f'/tmp/CSft-{self.IPlocal}-{par}'
try:
self.interfaceAdmin (nfn, [dsk, par, sft_src])
herror = 0
except:
herror = 1
if herror:
logger.warning ('Error al ejecutar el comando')
b64 = ''
self.muestraMensaje (20)
else:
if not os.path.exists (sft_src):
raise Exception (f'interfaceAdmin({nfn}) returned success but did not create file ({sft_src})')
sft_src_contents = Path (sft_src).read_bytes()
b64 = base64.b64encode (sft_src_contents).decode ('utf-8')
self.muestraMensaje (19)
cmd = {
'nfn': 'RESPUESTA_InventarioSoftware',
'dsk': dsk, ## not in the original C code, around ogAdmClient.c:1944
'par': par,
'contents': b64,
}
return self.respuestaEjecucionComando (cmd, herror, 0)
def ejecutaArchivo (self,fn):
logger.debug ('fn ({})'.format (fn))
@ -279,7 +307,7 @@ class ogAdmClientWorker (ogLiveWorker):
#}
def onActivation (self):
super().onActivation (run_monitoring_thread=True)
super().onActivation()
logger.info ('Inicio de sesion')
logger.info ('Abriendo sesión en el servidor de Administración')
if (not self.inclusionCliente()):
@ -309,20 +337,24 @@ class ogAdmClientWorker (ogLiveWorker):
@check_secret
def process_status (self, path, get_params, post_params, server):
logger.debug ('in process_status, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
cfg = self.LeeConfiguracion()
full_config = 'full-config' in post_params and post_params['full-config']
thr_status = {}
for k in self.thread_list:
thr_status[k] = {
'running': self.thread_list[k]['running'],
'result': self.thread_list[k]['result'],
}
return {
ret = {
'nfn': 'RESPUESTA_status',
'mac': self.mac,
'st': 'OGL',
'ip': self.IPlocal,
'cfg': self.cfg2obj (cfg),
'threads': thr_status,
}
if full_config:
cfg = self.LeeConfiguracion()
ret['cfg'] = self.cfg2obj (cfg)
return ret
@check_secret
def process_popup (self, path, get_params, post_params, server):
@ -332,6 +364,222 @@ class ogAdmClientWorker (ogLiveWorker):
## type(post_params) "<class 'dict'>"
return {'debug':'test'}
def do_CrearImagen (self, post_params):
for k in ['dsk', 'par', 'cpt', 'idi', 'nci', 'ipr', 'nfn', 'ids']:
if k not in post_params:
logger.error (f'required parameter ({k}) not in POST params')
return {}
dsk = post_params['dsk'] ## Disco
par = post_params['par'] ## Número de partición
cpt = post_params['cpt'] ## Código de la partición
idi = post_params['idi'] ## Identificador de la imagen
nci = post_params['nci'] ## Nombre canónico de la imagen
ipr = post_params['ipr'] ## Ip del repositorio
nfn = post_params['nfn']
ids = post_params['ids']
self.muestraMensaje (7)
try:
res = self.InventariandoSoftware (dsk, par, 'InventarioSoftware') ## Crea inventario Software previamente
except:
logger.warning ('Error al ejecutar el comando')
return {}
if res['contents']:
self.muestraMensaje (2)
inv_sft = res['contents']
try:
self.interfaceAdmin (nfn, [dsk, par, nci, ipr])
self.muestraMensaje (9)
herror = 0
except:
logger.warning ('Error al ejecutar el comando')
self.muestraMensaje (10)
herror = 1
else:
logger.warning ('Error al ejecutar el comando')
inv_sft = ''
self.muestraMenu()
cmd = {
'nfn': 'RESPUESTA_CrearImagen',
'idi': idi, ## Identificador de la imagen
'dsk': dsk, ## Número de disco
'par': par, ## Número de partición de donde se creó
'cpt': cpt, ## Tipo o código de partición
'ipr': ipr, ## Ip del repositorio donde se alojó
'inv_sft': inv_sft,
}
return self.respuestaEjecucionComando (cmd, herror, ids)
def do_RestaurarImagen (self, post_params):
for k in ['dsk', 'par', 'idi', 'ipr', 'nci', 'ifs', 'ptc', 'nfn', 'ids']:
if k not in post_params:
logger.error (f'required parameter ({k}) not in POST params')
return {}
dsk = post_params['dsk']
par = post_params['par']
idi = post_params['idi']
ipr = post_params['ipr']
nci = post_params['nci']
ifs = post_params['ifs']
ptc = post_params['ptc'] ## Protocolo de clonación: Unicast, Multicast, Torrent
nfn = post_params['nfn']
ids = post_params['ids']
self.muestraMensaje (3)
try:
## the ptc.split() is useless right now, since interfaceAdmin() does ' '.join(params) in order to spawn a shell
## however we're going to need it in the future (when everything gets translated into python), plus it's harmless now. So let's do it
#self.interfaceAdmin (nfn, [dsk, par, nci, ipr, ptc])
self.interfaceAdmin (nfn, [dsk, par, nci, ipr] + ptc.split())
self.muestraMensaje (11)
herror = 0
except:
logger.warning ('Error al ejecutar el comando')
self.muestraMensaje (12)
herror = 1
cfg = self.LeeConfiguracion()
if not cfg:
logger.warning ('No se ha podido recuperar la configuración de las particiones del disco')
self.muestraMenu()
cmd = {
'nfn': 'RESPUESTA_RestaurarImagen',
'idi': idi, ## Identificador de la imagen
'dsk': dsk, ## Número de disco
'par': par, ## Número de partición
'ifs': ifs, ## Identificador del perfil software
'cfg': self.cfg2obj(cfg), ## Configuración de discos
}
return self.respuestaEjecucionComando (cmd, herror, ids)
def do_Configurar (self, post_params):
for k in ['nfn', 'dsk', 'cfg', 'ids']:
if k not in post_params:
logger.error (f'required parameter ({k}) not in POST params')
return {}
nfn = post_params['nfn']
dsk = post_params['dsk']
cfg = post_params['cfg']
ids = post_params['ids']
self.muestraMensaje (4)
params = []
disk_info = cfg.pop (0)
logger.debug (f'disk_info ({disk_info})')
for k in ['dis', 'che', 'tch']:
params.append (f'{k}={disk_info[k]}')
disk_info_str = '*'.join (params)
partitions = []
for entry in cfg:
logger.debug (f'entry ({entry})')
params = []
for k in ['par', 'cpt', 'sfi', 'tam', 'ope']:
params.append (f'{k}={entry[k]}')
partitions.append ('*'.join (params))
part_info_str = '%'.join (partitions)
cfg_str = f'{disk_info_str}!{part_info_str}%'
try:
self.interfaceAdmin (nfn, ['ignored', cfg_str])
self.muestraMensaje (14)
herror = 0
except:
logger.warning ('Error al ejecutar el comando')
self.muestraMensaje (13)
herror = 1
cfg = self.LeeConfiguracion()
if not cfg:
logger.warning ('No se ha podido recuperar la configuración de las particiones del disco')
return {}
cmd = {
'nfn': 'RESPUESTA_Configurar',
'cfg': self.cfg2obj (cfg),
}
self.muestraMenu()
return self.respuestaEjecucionComando (cmd, herror, ids)
def do_InventarioHardware (self, post_params):
for k in ['nfn', 'ids']:
if k not in post_params:
logger.error (f'required parameter ({k}) not in POST params')
return {}
nfn = post_params['nfn']
ids = post_params['ids']
self.muestraMensaje (6)
hrdsrc = f'/tmp/Chrd-{self.IPlocal}' ## Nombre que tendra el archivo de inventario
hrddst = f'/tmp/Shrd-{self.IPlocal}' ## Nombre que tendra el archivo en el Servidor
try:
self.interfaceAdmin (nfn, [hrdsrc])
hrdsrc_contents = Path (hrdsrc).read_bytes()
logger.debug (f'hrdsrc_contents 1 ({hrdsrc_contents})')
herror = 0
except:
logger.warning ('Error al ejecutar el comando')
self.muestraMensaje (18)
herror = 1
if herror:
hrddst = ''
else:
logger.debug (f'hrdsrc_contents 2 ({hrdsrc_contents})')
## Envía fichero de inventario al servidor
res = self.enviaMensajeServidor ('recibeArchivo', { 'nfl': hrddst, 'contents': base64.b64encode (hrdsrc_contents).decode ('utf-8') })
logger.debug (res)
if not res:
logger.error ('Ha ocurrido algún problema al enviar un archivo por la red')
herror = 12 ## Error de envío de fichero por la red
self.muestraMensaje (17)
## Envia respuesta de ejecución de la función de interface
cmd = {
'nfn': 'RESPUESTA_InventarioHardware',
'hrd': hrddst,
}
self.muestraMenu()
return self.respuestaEjecucionComando (cmd, herror, ids)
def do_InventarioSoftware (self, post_params):
for k in ['nfn', 'dsk', 'par', 'ids']:
if k not in post_params:
logger.error (f'required parameter ({k}) not in POST params')
return {}
nfn = post_params['nfn']
dsk = post_params['dsk']
par = post_params['par']
ids = post_params['ids']
self.muestraMensaje (7)
try:
cmd = self.InventariandoSoftware (dsk, par, 'InventarioSoftware')
herror = 0
except:
logger.warning ('Error al ejecutar el comando')
cmd = { 'nfn': 'RESPUESTA_InventarioSoftware' }
herror = 1
self.muestraMenu()
return self.respuestaEjecucionComando (cmd, herror, ids)
def do_Actualizar (self, post_params):
self.muestraMensaje (1)
#if !comandosPendientes: error 84 'Ha ocurrido algún problema al reiniciar la sesión del cliente'
@ -504,6 +752,8 @@ class ogAdmClientWorker (ogLiveWorker):
self.muestraMenu()
return self.respuestaEjecucionComando (cmd, herror, ids)
def process_Actualizar (self, path, get_params, post_params, server):
logger.debug ('in process_Actualizar, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
return self._long_running_job ('Actualizar', self.do_Actualizar, args=(post_params,))
@ -562,7 +812,49 @@ class ogAdmClientWorker (ogLiveWorker):
logger.debug ('in process_EjecutaComandosPendientes, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
return {'true':'true'} ## ogAdmClient.c:2138
## curl --insecure -X POST --data '{"job_id":"foo"}' https://192.168.2.199:8000/ogAdmClient/KillJob
def process_CrearImagen (self, path, get_params, post_params, server):
logger.debug ('in process_CrearImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.debug ('type(post_params) "{}"'.format (type (post_params)))
return self._long_running_job ('CrearImagen', self.do_CrearImagen, args=(post_params,))
#def process_CrearImagenBasica (self, path, get_params, post_params, server):
# logger.debug ('in process_CrearImagenBasica, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
# logger.warning ('this method has been removed')
# raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' })
#def process_CrearSoftIncremental (self, path, get_params, post_params, server):
# logger.debug ('in process_CrearSoftIncremental, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
# logger.warning ('this method has been removed')
# raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' })
def process_RestaurarImagen (self, path, get_params, post_params, server):
logger.debug ('in process_RestaurarImagen, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
logger.debug ('type(post_params) "{}"'.format (type (post_params)))
return self._long_running_job ('RestaurarImagen', self.do_RestaurarImagen, args=(post_params,))
#def process_RestaurarImagenBasica (self, path, get_params, post_params, server):
# logger.debug ('in process_RestaurarImagenBasica, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
# logger.warning ('this method has been removed')
# raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' })
#def process_RestaurarSoftIncremental (self, path, get_params, post_params, server):
# logger.warning ('in process_RestaurarSoftIncremental')
# logger.debug ('in process_RestaurarSoftIncremental, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
# logger.warning ('this method has been removed')
# raise Exception ({ '_httpcode': 404, '_msg': 'This method has been removed' })
def process_Configurar (self, path, get_params, post_params, server):
logger.debug ('in process_Configurar, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
return self._long_running_job ('Configurar', self.do_Configurar, args=(post_params,))
def process_InventarioHardware (self, path, get_params, post_params, server):
logger.debug ('in process_InventarioHardware, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
return self._long_running_job ('InventarioHardware', self.do_InventarioHardware, args=(post_params,))
def process_InventarioSoftware (self, path, get_params, post_params, server):
logger.debug ('in process_InventarioSoftware, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
return self._long_running_job ('InventarioSoftware', self.do_InventarioSoftware, args=(post_params,))
def process_KillJob (self, path, get_params, post_params, server):
logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
jid = post_params['job_id']

View File

@ -31,6 +31,7 @@
# pylint: disable=unused-wildcard-import,wildcard-import
import os
import re
import time
import random
import subprocess
@ -49,15 +50,16 @@ class ThreadWithResult (threading.Thread):
self.result = None
if self._target is not None:
## the first arg in self._args is the queue
self.q = self._args[0]
self._args = self._args[1:]
self.pid_q = self._args[0]
self.stdout_q = self._args[1]
self._args = self._args[2:]
try:
self.result = self._target (*self._args, **self._kwargs)
except Exception as e:
self.result = { 'res': 2, 'der': f'got exception: ({e})' } ## res=2 as defined in ogAdmClient.c:2048
finally:
# Avoid a refcycle if the thread is running a function with an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs, self.q
del self._target, self._args, self._kwargs, self.pid_q, self.stdout_q
class ogLiveWorker(ServerWorker):
thread_list = {}
@ -143,7 +145,6 @@ class ogLiveWorker(ServerWorker):
]
def notifier (self, job_id, result):
logger.debug (f'notifier() called, job_id ({job_id}) result ({result})')
result['job_id'] = job_id
self.REST.sendMessage ('clients/status/webhook', result)
@ -193,6 +194,14 @@ class ogLiveWorker(ServerWorker):
return { 'res':success, 'der':msg }
def _extract_progress (self, job_id, ary=[]):
progress = None
for i in ary:
if m := re.search (r'^\[([0-9]+)\]', i): ## look for strings like '[10]', '[60]'
logger.debug (f"matched regex, m.groups ({m.groups()})")
progress = float (m.groups()[0]) / 100
return progress
def mon (self):
while True:
with self.thread_lock:
@ -201,12 +210,21 @@ class ogLiveWorker(ServerWorker):
if 'thread' not in elem: continue
logger.debug (f'considering thread ({k})')
if self.q:
if not self.q.empty():
elem['child_pid'] = self.q.get()
if self.pid_q:
if not self.pid_q.empty():
elem['child_pid'] = self.pid_q.get()
logger.debug (f'queue not empty, got pid ({elem["child_pid"]})')
else:
logger.debug (f'queue empty')
if self.stdout_q:
partial = ''
while not self.stdout_q.empty():
partial += self.stdout_q.get()
lines = partial.splitlines()
if len (lines):
p = self._extract_progress (k, lines)
if p:
m = { "job_id": k, "progress": p }
self.REST.sendMessage ('clients/status/webhook', { "job_id": k, "progress": p })
elem['thread'].join (0.05)
if not elem['thread'].is_alive():
@ -221,8 +239,10 @@ class ogLiveWorker(ServerWorker):
def interfaceAdmin (self, method, parametros=[]):
exe = '{}/{}'.format (self.pathinterface, method)
## for development only. Will be removed when the referenced bash code (/opt/opengnsys/lib/engine/bin/*.lib) is translated into python
devel_bash_prefix = '''
LANG = os.environ.get ('LANG', 'en_GB.UTF-8').replace ('UTF_8', 'UTF-8')
devel_bash_prefix = f'''
PATH=/opt/opengnsys/scripts/:$PATH;
source /opt/opengnsys/etc/lang.{LANG}.conf;
for I in /opt/opengnsys/lib/engine/bin/*.lib; do source $I; done;
for i in $(declare -F |cut -f3 -d" "); do export -f $i; done;
'''
@ -234,15 +254,21 @@ class ogLiveWorker(ServerWorker):
logger.debug ('subprocess.run ("{}", capture_output=True)'.format (proc))
p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if self.q:
self.q.put (p.pid)
if self.pid_q:
self.pid_q.put (p.pid)
else:
## esto sucede por ejemplo cuando arranca el agente, que estamos en interfaceAdmin() en el mismo hilo, sin _long_running_job ni hilo separado
logger.debug ('no queue--not writing any PID to it')
sout = serr = ''
while p.poll() is None:
for l in iter (p.stdout.readline, b''): sout += l.decode ('utf-8', 'ignore')
for l in iter (p.stderr.readline, b''): serr += l.decode ('utf-8', 'ignore')
for l in iter (p.stdout.readline, b''):
partial = l.decode ('utf-8', 'ignore')
if self.stdout_q: self.stdout_q.put (partial)
sout += partial
for l in iter (p.stderr.readline, b''):
partial = l.decode ('utf-8', 'ignore')
serr += partial
time.sleep (1)
sout = sout.strip()
serr = serr.strip()
@ -361,7 +387,7 @@ class ogLiveWorker(ServerWorker):
return obj
def onActivation (self, run_monitoring_thread):
def onActivation (self):
if not os.path.exists ('/scripts/oginit'):
## no estamos en oglive, este modulo no debe cargarse
## esta lógica la saco de src/opengnsys/linux/operations.py, donde hay un if similar
@ -376,7 +402,9 @@ class ogLiveWorker(ServerWorker):
self.idproautoexec = None
self.idcentro = None ## Identificador del centro
self.idaula = None ## Identificador del aula
self.q = None ## for passing PIDs around
self.pid_q = None ## for passing PIDs around
self.stdout_q = None ## for passing stdout
self.progress_jobs = {}
ogcore_scheme = os.environ.get ('OGAGENTCFG_OGCORE_SCHEME', 'https')
ogcore_ip = os.environ.get ('OGAGENTCFG_OGCORE_IP', '192.168.2.1')
@ -407,8 +435,7 @@ class ogLiveWorker(ServerWorker):
if not self.tomaMAClocal():
raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
if run_monitoring_thread: ## should be true for exactly one ogLiveWorker
threading.Thread (name='monitoring_thread', target=self.mon, daemon=True).start()
threading.Thread (name='monitoring_thread', target=self.mon, daemon=True).start()
def _long_running_job (self, name, f, args):
any_job_running = False
@ -422,9 +449,10 @@ class ogLiveWorker(ServerWorker):
job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8)))
import queue
self.q = queue.Queue() ## a single queue works for us because we never have more than one long_running_job at the same time
self.pid_q = queue.Queue() ## a single queue works for us because we never have more than one long_running_job at the same time
self.stdout_q = queue.Queue()
self.thread_list[job_id] = {
'thread': ThreadWithResult (target=f, args=(self.q,) + args),
'thread': ThreadWithResult (target=f, args=(self.pid_q, self.stdout_q) + args),
'starttime': time.time(),
'child_pid': None,
'running': True,
@ -434,10 +462,10 @@ class ogLiveWorker(ServerWorker):
return { 'job_id': job_id }
## para matar threads tengo lo siguiente:
## - aqui en _long_running_job meto una cola en self.q
## - (self.q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'q'")
## - aqui en _long_running_job meto una cola en self.pid_q
## - (self.pid_q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'pid_q'")
## - en el thread_list también tengo un child_pid para almacenar el pid de los procesos hijos
## - al crear el ThreadWithResult le paso la cola, y luego en run() la recojo y la meto en el self.q del thread
## - al crear el ThreadWithResult le paso la cola, y luego en run() la recojo y la meto en el self.pid_q del thread
## - en interfaceAdmin() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue
## - en mon() recojo pids de la queue y los meto en thread_list 'child_pid'
## - algunas funciones llaman a interfaceAdmin más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando
@ -448,6 +476,11 @@ class ogLiveWorker(ServerWorker):
##
## $ curl --insecure -X POST --data '{"nfn":"EjecutarScript","scp":"cd /usr; sleep 30; pwd; ls","ids":"0"}' https://192.168.2.199:8000/ogAdmClient/EjecutarScript
## {"job_id": "EjecutarScript-333feb3f"}
## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/CloningEngine/KillJob
## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/ogAdmClient/KillJob
##
## funciona bien, excepto que el PID no muere xD, ni siquiera haciendo subprocess.run('kill')
## para mostrar el progreso de los jobs reutilizo la misma infra
## una cola self.stdout_q
## en interfaceAdmin escribo la stdout parcial que ya venia recogiendo
## mon() lo recoge y le hace un POST a ogcore