ogcore1 #14
|
@ -1,3 +1,66 @@
|
||||||
|
ogagent (1.4.6-1) UNRELEASED; urgency=medium
|
||||||
|
|
||||||
|
* Point to the new menu browser
|
||||||
|
|
||||||
|
-- OpenGnsys developers <info@opengnsys.es> Tue, 14 Jan 2025 12:00:24 +0100
|
||||||
|
|
||||||
|
ogagent (1.4.5-1) stable; urgency=medium
|
||||||
|
|
||||||
|
* Kill long running jobs in oglive
|
||||||
|
|
||||||
|
-- OpenGnsys developers <info@opengnsys.es> Fri, 29 Nov 2024 10:22:36 +0100
|
||||||
|
|
||||||
|
ogagent (1.4.5~pre8-1) stable; urgency=medium
|
||||||
|
|
||||||
|
* Add Configurar() to the CloningEngine module
|
||||||
|
|
||||||
|
-- OpenGnsys developers <info@opengnsys.es> Wed, 27 Nov 2024 20:02:42 +0100
|
||||||
|
|
||||||
|
ogagent (1.4.5~pre7-1) stable; urgency=medium
|
||||||
|
|
||||||
|
* Use old browser again
|
||||||
|
|
||||||
|
-- OpenGnsys developers <info@opengnsys.es> Wed, 20 Nov 2024 14:24:44 +0100
|
||||||
|
|
||||||
|
ogagent (1.4.5~pre6-1) stable; urgency=medium
|
||||||
|
|
||||||
|
* Do not use envvars for the operating-system module
|
||||||
|
|
||||||
|
-- OpenGnsys developers <info@opengnsys.es> Wed, 20 Nov 2024 13:45:21 +0100
|
||||||
|
|
||||||
|
ogagent (1.4.5~pre5-1) stable; urgency=medium
|
||||||
|
|
||||||
|
* Avoid some KeyErrors
|
||||||
|
|
||||||
|
-- OpenGnsys developers <info@opengnsys.es> Mon, 18 Nov 2024 12:14:27 +0100
|
||||||
|
|
||||||
|
ogagent (1.4.5~pre4-1) stable; urgency=medium
|
||||||
|
|
||||||
|
* Don't die when ogcore returns HTTP 4xx or 5xx
|
||||||
|
* Get ogcore IP and port from the environment
|
||||||
|
|
||||||
|
-- OpenGnsys developers <info@opengnsys.es> Fri, 15 Nov 2024 11:43:01 +0100
|
||||||
|
|
||||||
|
ogagent (1.4.5~pre3-1) stable; urgency=medium
|
||||||
|
|
||||||
|
* Kill long running jobs in oglive (not-yet-working draft)
|
||||||
|
|
||||||
|
-- OpenGnsys developers <info@opengnsys.es> Wed, 06 Nov 2024 14:11:32 +0100
|
||||||
|
|
||||||
|
ogagent (1.4.5~pre2-1) stable; urgency=medium
|
||||||
|
|
||||||
|
* Remove race condition due to several monitoring threads
|
||||||
|
* Include job_id in asynchronous responses
|
||||||
|
* Remove vim swapfiles from the package contents
|
||||||
|
|
||||||
|
-- OpenGnsys developers <info@opengnsys.es> Wed, 06 Nov 2024 13:24:03 +0100
|
||||||
|
|
||||||
|
ogagent (1.4.5~pre1-1) stable; urgency=medium
|
||||||
|
|
||||||
|
* CrearImagen: return inventory inline
|
||||||
|
|
||||||
|
-- OpenGnsys developers <info@opengnsys.es> Wed, 06 Nov 2024 12:41:14 +0100
|
||||||
|
|
||||||
ogagent (1.4.4-1) stable; urgency=medium
|
ogagent (1.4.4-1) stable; urgency=medium
|
||||||
|
|
||||||
* Use logger.debug() to prevent the windows agent from dying
|
* Use logger.debug() to prevent the windows agent from dying
|
||||||
|
|
|
@ -22,6 +22,7 @@ install: build
|
||||||
dh_prep
|
dh_prep
|
||||||
dh_installdirs
|
dh_installdirs
|
||||||
$(MAKE) DESTDIR=$(CURDIR)/debian/ogagent install-ogagent
|
$(MAKE) DESTDIR=$(CURDIR)/debian/ogagent install-ogagent
|
||||||
|
find $(CURDIR) -name '*.swp' -exec rm -f '{}' ';'
|
||||||
binary-arch: build install
|
binary-arch: build install
|
||||||
# emptyness
|
# emptyness
|
||||||
binary-indep: build install
|
binary-indep: build install
|
||||||
|
|
|
@ -178,7 +178,7 @@ def oac_recibe_archivo():
|
||||||
logging.info(f'dec ({dec})')
|
logging.info(f'dec ({dec})')
|
||||||
return jsonify({'anything':'anything'}) ## if we return {}, then we trigger "if not {}" which happens to be true
|
return jsonify({'anything':'anything'}) ## if we return {}, then we trigger "if not {}" which happens to be true
|
||||||
|
|
||||||
@app.route('/opengnsys/rest/ogAdmClient/callback', methods=['POST'])
|
@app.route('/opengnsys/rest/clients/status/webhook', methods=['POST'])
|
||||||
def oac_callback():
|
def oac_callback():
|
||||||
logging.info(f'{request.get_json()}')
|
logging.info(f'{request.get_json()}')
|
||||||
return jsonify({'anything':'anything'})
|
return jsonify({'anything':'anything'})
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
1.4.4
|
1.4.6
|
||||||
|
|
|
@ -23,15 +23,15 @@ log=DEBUG
|
||||||
[ogAdmClient]
|
[ogAdmClient]
|
||||||
#path=test_modules/server,more_modules/server
|
#path=test_modules/server,more_modules/server
|
||||||
|
|
||||||
remote=https://192.168.2.1/opengnsys/rest
|
remote={}://{}/opengnsys/rest
|
||||||
log=DEBUG
|
log=DEBUG
|
||||||
pathinterface=/opt/opengnsys/interfaceAdm
|
pathinterface=/opt/opengnsys/interfaceAdm
|
||||||
urlMenu=https://192.168.2.1/opengnsys/varios/menubrowser.php
|
urlMenu={}://{}/menu-browser
|
||||||
urlMsg=http://localhost/cgi-bin/httpd-log.sh
|
urlMsg=http://localhost/cgi-bin/httpd-log.sh
|
||||||
|
|
||||||
[CloningEngine]
|
[CloningEngine]
|
||||||
remote=https://192.168.2.1/opengnsys/rest
|
remote={}://{}/opengnsys/rest
|
||||||
log=DEBUG
|
log=DEBUG
|
||||||
pathinterface=/opt/opengnsys/interfaceAdm
|
pathinterface=/opt/opengnsys/interfaceAdm
|
||||||
urlMenu=https://192.168.2.1/opengnsys/varios/menubrowser.php
|
urlMenu={}://{}/menu-browser
|
||||||
urlMsg=http://localhost/cgi-bin/httpd-log.sh
|
urlMsg=http://localhost/cgi-bin/httpd-log.sh
|
||||||
|
|
|
@ -168,4 +168,9 @@ class REST(object):
|
||||||
url = self._getUrl(msg)
|
url = self._getUrl(msg)
|
||||||
logger.debug('Requesting {}'.format(url))
|
logger.debug('Requesting {}'.format(url))
|
||||||
|
|
||||||
return self._request(url, data)
|
try:
|
||||||
|
res = self._request(url, data)
|
||||||
|
return res
|
||||||
|
except:
|
||||||
|
logger.exception()
|
||||||
|
return None
|
||||||
|
|
|
@ -42,13 +42,13 @@ class CloningEngineWorker (ogLiveWorker):
|
||||||
REST = None # REST object
|
REST = None # REST object
|
||||||
|
|
||||||
def onActivation (self):
|
def onActivation (self):
|
||||||
super().onActivation()
|
super().onActivation (run_monitoring_thread=False)
|
||||||
logger.info ('onActivation ok')
|
logger.info ('onActivation ok')
|
||||||
|
|
||||||
def onDeactivation (self):
|
def onDeactivation (self):
|
||||||
logger.debug ('onDeactivation')
|
logger.debug ('onDeactivation')
|
||||||
|
|
||||||
def InventariandoSoftware (self, dsk, par, sw, nfn):
|
def InventariandoSoftware (self, dsk, par, nfn):
|
||||||
sft_src = f'/tmp/CSft-{self.IPlocal}-{par}'
|
sft_src = f'/tmp/CSft-{self.IPlocal}-{par}'
|
||||||
try:
|
try:
|
||||||
self.interfaceAdmin (nfn, [dsk, par, sft_src])
|
self.interfaceAdmin (nfn, [dsk, par, sft_src])
|
||||||
|
@ -58,31 +58,23 @@ class CloningEngineWorker (ogLiveWorker):
|
||||||
|
|
||||||
if herror:
|
if herror:
|
||||||
logger.warning ('Error al ejecutar el comando')
|
logger.warning ('Error al ejecutar el comando')
|
||||||
|
b64 = ''
|
||||||
self.muestraMensaje (20)
|
self.muestraMensaje (20)
|
||||||
else:
|
else:
|
||||||
if not os.path.exists (sft_src):
|
if not os.path.exists (sft_src):
|
||||||
raise Exception (f'interfaceAdmin({nfn}) returned success but did not create file ({sft_src})')
|
raise Exception (f'interfaceAdmin({nfn}) returned success but did not create file ({sft_src})')
|
||||||
sft_src_contents = Path (sft_src).read_bytes()
|
sft_src_contents = Path (sft_src).read_bytes()
|
||||||
|
|
||||||
## Envía fichero de inventario al servidor
|
b64 = base64.b64encode (sft_src_contents).decode ('utf-8')
|
||||||
sft_dst = f'/tmp/Ssft-{self.IPlocal}-{par}' ## Nombre que tendra el archivo en el Servidor
|
|
||||||
logger.debug ('sending recibeArchivo to server')
|
|
||||||
res = self.enviaMensajeServidor ('recibeArchivo', { 'nfl': sft_dst, 'contents': base64.b64encode (sft_src_contents).decode ('utf-8') })
|
|
||||||
logger.debug (res)
|
|
||||||
if not res:
|
|
||||||
herror = 12 ## Error de envío de fichero por la red
|
|
||||||
raise Exception ('Ha ocurrido algún problema al enviar un archivo por la red')
|
|
||||||
self.muestraMensaje (19)
|
self.muestraMensaje (19)
|
||||||
|
|
||||||
if not sw:
|
cmd = {
|
||||||
cmd = {
|
'nfn': 'RESPUESTA_InventarioSoftware',
|
||||||
'nfn': 'RESPUESTA_InventarioSoftware',
|
'dsk': dsk, ## not in the original C code, around ogAdmClient.c:1944
|
||||||
'par': par,
|
'par': par,
|
||||||
'sft': sft_dst,
|
'contents': b64,
|
||||||
}
|
}
|
||||||
return self.respuestaEjecucionComando (cmd, herror, 0)
|
return self.respuestaEjecucionComando (cmd, herror, 0)
|
||||||
|
|
||||||
return {'true':'true'} ## XXX
|
|
||||||
|
|
||||||
def do_CrearImagen (self, post_params):
|
def do_CrearImagen (self, post_params):
|
||||||
for k in ['dsk', 'par', 'cpt', 'idi', 'nci', 'ipr', 'nfn', 'ids']:
|
for k in ['dsk', 'par', 'cpt', 'idi', 'nci', 'ipr', 'nfn', 'ids']:
|
||||||
|
@ -102,13 +94,14 @@ class CloningEngineWorker (ogLiveWorker):
|
||||||
self.muestraMensaje (7)
|
self.muestraMensaje (7)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
res = self.InventariandoSoftware (dsk, par, False, 'InventarioSoftware') ## Crea inventario Software previamente
|
res = self.InventariandoSoftware (dsk, par, 'InventarioSoftware') ## Crea inventario Software previamente
|
||||||
except:
|
except:
|
||||||
logger.warning ('Error al ejecutar el comando')
|
logger.warning ('Error al ejecutar el comando')
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
if res:
|
if res['contents']:
|
||||||
self.muestraMensaje (2)
|
self.muestraMensaje (2)
|
||||||
|
inv_sft = res['contents']
|
||||||
try:
|
try:
|
||||||
self.interfaceAdmin (nfn, [dsk, par, nci, ipr])
|
self.interfaceAdmin (nfn, [dsk, par, nci, ipr])
|
||||||
self.muestraMensaje (9)
|
self.muestraMensaje (9)
|
||||||
|
@ -119,16 +112,18 @@ class CloningEngineWorker (ogLiveWorker):
|
||||||
herror = 1
|
herror = 1
|
||||||
else:
|
else:
|
||||||
logger.warning ('Error al ejecutar el comando')
|
logger.warning ('Error al ejecutar el comando')
|
||||||
|
inv_sft = ''
|
||||||
|
|
||||||
self.muestraMenu()
|
self.muestraMenu()
|
||||||
|
|
||||||
cmd = {
|
cmd = {
|
||||||
'nfn': 'RESPUESTA_CrearImagen',
|
'nfn': 'RESPUESTA_CrearImagen',
|
||||||
'idi': idi, ## Identificador de la imagen
|
'idi': idi, ## Identificador de la imagen
|
||||||
'dsk': dsk, ## Número de disco
|
'dsk': dsk, ## Número de disco
|
||||||
'par': par, ## Número de partición de donde se creó
|
'par': par, ## Número de partición de donde se creó
|
||||||
'cpt': cpt, ## Tipo o código de partición
|
'cpt': cpt, ## Tipo o código de partición
|
||||||
'ipr': ipr, ## Ip del repositorio donde se alojó
|
'ipr': ipr, ## Ip del repositorio donde se alojó
|
||||||
|
'inv_sft': inv_sft,
|
||||||
}
|
}
|
||||||
return self.respuestaEjecucionComando (cmd, herror, ids)
|
return self.respuestaEjecucionComando (cmd, herror, ids)
|
||||||
|
|
||||||
|
@ -151,7 +146,10 @@ class CloningEngineWorker (ogLiveWorker):
|
||||||
self.muestraMensaje (3)
|
self.muestraMensaje (3)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.interfaceAdmin (nfn, [dsk, par, nci, ipr, ptc])
|
## the ptc.split() is useless right now, since interfaceAdmin() does ' '.join(params) in order to spawn a shell
|
||||||
|
## however we're going to need it in the future (when everything gets translated into python), plus it's harmless now. So let's do it
|
||||||
|
#self.interfaceAdmin (nfn, [dsk, par, nci, ipr, ptc])
|
||||||
|
self.interfaceAdmin (nfn, [dsk, par, nci, ipr] + ptc.split())
|
||||||
self.muestraMensaje (11)
|
self.muestraMensaje (11)
|
||||||
herror = 0
|
herror = 0
|
||||||
except:
|
except:
|
||||||
|
@ -192,13 +190,31 @@ class CloningEngineWorker (ogLiveWorker):
|
||||||
|
|
||||||
nfn = post_params['nfn']
|
nfn = post_params['nfn']
|
||||||
dsk = post_params['dsk']
|
dsk = post_params['dsk']
|
||||||
cfg = post_params['cfg'].replace('\n','$').replace('\t','#')
|
cfg = post_params['cfg']
|
||||||
ids = post_params['ids']
|
ids = post_params['ids']
|
||||||
|
|
||||||
self.muestraMensaje (4)
|
self.muestraMensaje (4)
|
||||||
|
|
||||||
|
params = []
|
||||||
|
disk_info = cfg.pop (0)
|
||||||
|
logger.debug (f'disk_info ({disk_info})')
|
||||||
|
for k in ['dis', 'che', 'tch']:
|
||||||
|
params.append (f'{k}={disk_info[k]}')
|
||||||
|
disk_info_str = '*'.join (params)
|
||||||
|
|
||||||
|
partitions = []
|
||||||
|
for entry in cfg:
|
||||||
|
logger.debug (f'entry ({entry})')
|
||||||
|
params = []
|
||||||
|
for k in ['par', 'cpt', 'sfi', 'tam', 'ope']:
|
||||||
|
params.append (f'{k}={entry[k]}')
|
||||||
|
partitions.append ('*'.join (params))
|
||||||
|
part_info_str = '%'.join (partitions)
|
||||||
|
|
||||||
|
cfg_str = f'{disk_info_str}!{part_info_str}%'
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.interfaceAdmin (nfn, [dsk, cfg])
|
self.interfaceAdmin (nfn, ['ignored', cfg_str])
|
||||||
self.muestraMensaje (14)
|
self.muestraMensaje (14)
|
||||||
herror = 0
|
herror = 0
|
||||||
except:
|
except:
|
||||||
|
@ -275,16 +291,14 @@ class CloningEngineWorker (ogLiveWorker):
|
||||||
self.muestraMensaje (7)
|
self.muestraMensaje (7)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.InventariandoSoftware (dsk, par, True, 'InventarioSoftware')
|
cmd = self.InventariandoSoftware (dsk, par, 'InventarioSoftware')
|
||||||
herror = 0
|
herror = 0
|
||||||
except:
|
except:
|
||||||
logger.warning ('Error al ejecutar el comando')
|
logger.warning ('Error al ejecutar el comando')
|
||||||
|
cmd = { 'nfn': 'RESPUESTA_InventarioSoftware' }
|
||||||
herror = 1
|
herror = 1
|
||||||
|
|
||||||
self.muestraMenu()
|
self.muestraMenu()
|
||||||
cmd = {
|
|
||||||
'nfn': 'RESPUESTA_InventarioSoftware',
|
|
||||||
}
|
|
||||||
return self.respuestaEjecucionComando (cmd, herror, ids)
|
return self.respuestaEjecucionComando (cmd, herror, ids)
|
||||||
|
|
||||||
def process_CrearImagen (self, path, get_params, post_params, server):
|
def process_CrearImagen (self, path, get_params, post_params, server):
|
||||||
|
@ -329,3 +343,13 @@ class CloningEngineWorker (ogLiveWorker):
|
||||||
def process_InventarioSoftware (self, path, get_params, post_params, server):
|
def process_InventarioSoftware (self, path, get_params, post_params, server):
|
||||||
logger.debug ('in process_InventarioSoftware, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
|
logger.debug ('in process_InventarioSoftware, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
|
||||||
return self._long_running_job ('InventarioSoftware', self.do_InventarioSoftware, args=(post_params,))
|
return self._long_running_job ('InventarioSoftware', self.do_InventarioSoftware, args=(post_params,))
|
||||||
|
|
||||||
|
## curl --insecure -X POST --data '{"job_id":"foo"}' https://192.168.2.199:8000/CloningEngine/KillJob
|
||||||
|
def process_KillJob (self, path, get_params, post_params, server):
    """Handle a KillJob request: try to stop the job named in the POST body.

    post_params is expected to carry a 'job_id' key. The job id is handed
    to self.killer() and the dict it returns is extended with the response
    name ('nfn') and the job id ('job') before being returned.

    If 'job_id' is missing (or there is no POST body at all) a failure dict
    is returned instead of letting a KeyError/TypeError escape; it uses the
    same 'res'/'der' keys that killer() uses for its own failures.
    """
    logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))

    jid = (post_params or {}).get ('job_id')
    if jid is None:
        ## res=2 means "failed", mirroring what killer() returns for unknown jobs
        return { 'res': 2, 'der': 'Missing job_id', 'nfn': 'RESPUESTA_KillJob', 'job': None }

    r = self.killer (jid)
    logger.debug (f'r bef ({r})')
    r.update ({ 'nfn':'RESPUESTA_KillJob', 'job':jid })
    logger.debug (f'r aft ({r})')
    return r
|
||||||
|
|
|
@ -279,7 +279,7 @@ class ogAdmClientWorker (ogLiveWorker):
|
||||||
#}
|
#}
|
||||||
|
|
||||||
def onActivation (self):
|
def onActivation (self):
|
||||||
super().onActivation()
|
super().onActivation (run_monitoring_thread=True)
|
||||||
logger.info ('Inicio de sesion')
|
logger.info ('Inicio de sesion')
|
||||||
logger.info ('Abriendo sesión en el servidor de Administración')
|
logger.info ('Abriendo sesión en el servidor de Administración')
|
||||||
if (not self.inclusionCliente()):
|
if (not self.inclusionCliente()):
|
||||||
|
@ -561,3 +561,13 @@ class ogAdmClientWorker (ogLiveWorker):
|
||||||
def process_EjecutaComandosPendientes (self, path, get_params, post_params, server):
|
def process_EjecutaComandosPendientes (self, path, get_params, post_params, server):
|
||||||
logger.debug ('in process_EjecutaComandosPendientes, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
|
logger.debug ('in process_EjecutaComandosPendientes, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
|
||||||
return {'true':'true'} ## ogAdmClient.c:2138
|
return {'true':'true'} ## ogAdmClient.c:2138
|
||||||
|
|
||||||
|
## curl --insecure -X POST --data '{"job_id":"foo"}' https://192.168.2.199:8000/ogAdmClient/KillJob
|
||||||
|
def process_KillJob (self, path, get_params, post_params, server):
    """Handle a KillJob request: try to stop the job named in the POST body.

    post_params is expected to carry a 'job_id' key. The job id is handed
    to self.killer() and the dict it returns is extended with the response
    name ('nfn') and the job id ('job') before being returned.

    If 'job_id' is missing (or there is no POST body at all) a failure dict
    is returned instead of letting a KeyError/TypeError escape; it uses the
    same 'res'/'der' keys that killer() uses for its own failures.
    """
    logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))

    jid = (post_params or {}).get ('job_id')
    if jid is None:
        ## res=2 means "failed", mirroring what killer() returns for unknown jobs
        return { 'res': 2, 'der': 'Missing job_id', 'nfn': 'RESPUESTA_KillJob', 'job': None }

    r = self.killer (jid)
    logger.debug (f'r bef ({r})')
    r.update ({ 'nfn':'RESPUESTA_KillJob', 'job':jid })
    logger.debug (f'r aft ({r})')
    return r
|
||||||
|
|
|
@ -35,6 +35,7 @@ import time
|
||||||
import random
|
import random
|
||||||
import subprocess
|
import subprocess
|
||||||
import threading
|
import threading
|
||||||
|
import signal
|
||||||
|
|
||||||
from configparser import NoOptionError
|
from configparser import NoOptionError
|
||||||
from opengnsys import REST
|
from opengnsys import REST
|
||||||
|
@ -47,16 +48,20 @@ class ThreadWithResult (threading.Thread):
|
||||||
try:
|
try:
|
||||||
self.result = None
|
self.result = None
|
||||||
if self._target is not None:
|
if self._target is not None:
|
||||||
|
## the first arg in self._args is the queue
|
||||||
|
self.q = self._args[0]
|
||||||
|
self._args = self._args[1:]
|
||||||
try:
|
try:
|
||||||
self.result = self._target (*self._args, **self._kwargs)
|
self.result = self._target (*self._args, **self._kwargs)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.result = { 'res': 2, 'der': f'got exception: ({e})' } ## res=2 as defined in ogAdmClient.c:2048
|
self.result = { 'res': 2, 'der': f'got exception: ({e})' } ## res=2 as defined in ogAdmClient.c:2048
|
||||||
finally:
|
finally:
|
||||||
# Avoid a refcycle if the thread is running a function with an argument that has a member that points to the thread.
|
# Avoid a refcycle if the thread is running a function with an argument that has a member that points to the thread.
|
||||||
del self._target, self._args, self._kwargs
|
del self._target, self._args, self._kwargs, self.q
|
||||||
|
|
||||||
class ogLiveWorker(ServerWorker):
|
class ogLiveWorker(ServerWorker):
|
||||||
thread_list = {}
|
thread_list = {}
|
||||||
|
thread_lock = threading.Lock()
|
||||||
|
|
||||||
tbErroresScripts = [
|
tbErroresScripts = [
|
||||||
"Se han generado errores desconocidos. No se puede continuar la ejecución de este módulo", ## 0
|
"Se han generado errores desconocidos. No se puede continuar la ejecución de este módulo", ## 0
|
||||||
|
@ -137,25 +142,79 @@ class ogLiveWorker(ServerWorker):
|
||||||
"Error desconocido",
|
"Error desconocido",
|
||||||
]
|
]
|
||||||
|
|
||||||
def notifier (self, result):
|
def notifier (self, job_id, result):
|
||||||
logger.debug (f'notifier() called, result ({result})')
|
logger.debug (f'notifier() called, job_id ({job_id}) result ({result})')
|
||||||
res = self.REST.sendMessage ('/clients/status/webhook', result)
|
result['job_id'] = job_id
|
||||||
|
self.REST.sendMessage ('clients/status/webhook', result)
|
||||||
|
|
||||||
|
def killer (self, job_id):
    """Try to terminate the child process of a long-running job.

    Looks job_id up in self.thread_list and, while holding self.thread_lock,
    repeatedly signals the recorded 'child_pid' (SIGTERM first, SIGKILL
    after a few tries) until the job's thread is no longer alive, the pid
    disappears, or the tries run out.

    Returns a dict { 'res': ..., 'der': ... } where 'res' mimics cmd['res']
    in respuestaEjecucionComando(): 1 means success, 2 means failed, and
    'der' is a human-readable explanation.
    """
    logger.debug (f'killer() called, job_id ({job_id})')
    if job_id not in self.thread_list: return { 'res': 2, 'der': 'Unknown job' }

    with self.thread_lock:
        job = self.thread_list[job_id]
        if 'thread' not in job: return { 'res': 2, 'der': 'Job is not running' }
        t = job['thread']
        pid = job['child_pid']
        logger.debug (f'pid ({pid})')

        try_times = 8
        sig = signal.SIGTERM
        msg = f'could not kill pid ({pid}) after ({try_times}) tries'
        success = 2     ## mimic cmd['res'] in respuestaEjecucionComando(): "1" means success, "2" means failed

        while True:
            t.join (0.05)
            if not t.is_alive():
                msg = 'job terminated'
                success = 1
                logger.debug (msg)
                job['child_pid'] = None
                break

            ## race condition: if the subprocess finishes just here, then we already checked that t.is_alive() is true, but the pid will be gone below. msg will be 'nothing to kill'.
            ## this is fine in the first iteration of the loop, before we send any signals. In the rest of iterations, after some signals were sent, msg should be 'job terminated' instead.
            if pid:
                pid_gone = not os.path.exists (f'/proc/{pid}')
                if not pid_gone:
                    logger.debug (f'sending signal ({sig}) to pid ({pid})')
                    try:
                        os.kill (pid, sig)
                    except ProcessLookupError:
                        ## the process went away between the /proc check and the kill: os.kill() raises instead of silently doing nothing
                        pid_gone = True
                if pid_gone:
                    msg = f'pid ({pid}) is gone, nothing to kill'
                    success = 1
                    logger.debug (msg)
                    job['child_pid'] = None
                    break
            else:
                msg = 'no PID to kill'
                logger.debug (msg)

            if not try_times: break
            if 4 == try_times: sig = signal.SIGKILL    ## change signal after a few tries
            try_times -= 1
            time.sleep (0.4)

    return { 'res':success, 'der':msg }
|
||||||
|
|
||||||
def mon (self):
|
def mon (self):
|
||||||
while True:
|
while True:
|
||||||
#print ('mon(): iterating')
|
with self.thread_lock:
|
||||||
for k in self.thread_list:
|
for k in self.thread_list:
|
||||||
elem = self.thread_list[k]
|
elem = self.thread_list[k]
|
||||||
if 'thread' not in elem: continue
|
if 'thread' not in elem: continue
|
||||||
logger.debug (f'considering thread ({k})')
|
logger.debug (f'considering thread ({k})')
|
||||||
try: elem['thread'].join (0.05)
|
|
||||||
except RuntimeError: pass ## race condition: a thread is created and this code runs before it is start()ed
|
if self.q:
|
||||||
if not elem['thread'].is_alive():
|
if not self.q.empty():
|
||||||
logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
|
elem['child_pid'] = self.q.get()
|
||||||
elem['running'] = False
|
logger.debug (f'queue not empty, got pid ({elem["child_pid"]})')
|
||||||
elem['result'] = elem['thread'].result
|
else:
|
||||||
del elem['thread']
|
logger.debug (f'queue empty')
|
||||||
self.notifier (elem['result'])
|
|
||||||
|
elem['thread'].join (0.05)
|
||||||
|
if not elem['thread'].is_alive():
|
||||||
|
logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
|
||||||
|
elem['running'] = False
|
||||||
|
elem['result'] = elem['thread'].result
|
||||||
|
del elem['thread']
|
||||||
|
self.notifier (k, elem['result'])
|
||||||
|
|
||||||
time.sleep (1)
|
time.sleep (1)
|
||||||
|
|
||||||
|
@ -169,26 +228,40 @@ class ogLiveWorker(ServerWorker):
|
||||||
'''
|
'''
|
||||||
|
|
||||||
if parametros:
|
if parametros:
|
||||||
proc = ['bash', '-c', '{} bash -x {} {}'.format (devel_bash_prefix, exe, ' '.join (parametros))]
|
proc = ['bash', '-c', '{} {} {}'.format (devel_bash_prefix, exe, ' '.join (parametros))]
|
||||||
else:
|
else:
|
||||||
proc = ['bash', '-c', '{} bash -x {}'.format (devel_bash_prefix, exe)]
|
proc = ['bash', '-c', '{} {}'.format (devel_bash_prefix, exe)]
|
||||||
logger.debug ('subprocess.run ("{}", capture_output=True)'.format (proc))
|
logger.debug ('subprocess.run ("{}", capture_output=True)'.format (proc))
|
||||||
p = subprocess.run (proc, capture_output=True)
|
|
||||||
|
p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
|
if self.q:
|
||||||
|
self.q.put (p.pid)
|
||||||
|
else:
|
||||||
|
## esto sucede por ejemplo cuando arranca el agente, que estamos en interfaceAdmin() en el mismo hilo, sin _long_running_job ni hilo separado
|
||||||
|
logger.debug ('no queue--not writing any PID to it')
|
||||||
|
sout = serr = ''
|
||||||
|
while p.poll() is None:
|
||||||
|
for l in iter (p.stdout.readline, b''): sout += l.decode ('utf-8', 'ignore')
|
||||||
|
for l in iter (p.stderr.readline, b''): serr += l.decode ('utf-8', 'ignore')
|
||||||
|
time.sleep (1)
|
||||||
|
sout = sout.strip()
|
||||||
|
serr = serr.strip()
|
||||||
|
|
||||||
## DEBUG
|
## DEBUG
|
||||||
logger.info (f'stdout follows:')
|
logger.info (f'stdout follows:')
|
||||||
for l in p.stdout.strip().decode ('utf-8').splitlines():
|
for l in sout.splitlines():
|
||||||
logger.info (f' {l}')
|
logger.info (f' {l}')
|
||||||
logger.info (f'stderr follows:')
|
logger.info (f'stderr follows:')
|
||||||
for l in p.stderr.strip().decode ('utf-8').splitlines():
|
for l in serr.splitlines():
|
||||||
logger.info (f' {l}')
|
logger.info (f' {l}')
|
||||||
## /DEBUG
|
## /DEBUG
|
||||||
if 0 != p.returncode:
|
if 0 != p.returncode:
|
||||||
cmd_txt = ' '.join (proc)
|
cmd_txt = ' '.join (proc)
|
||||||
logger.error (f'command ({cmd_txt}) failed, stderr follows:')
|
logger.error (f'command ({cmd_txt}) failed, stderr follows:')
|
||||||
for l in p.stderr.strip().decode ('utf-8').splitlines():
|
for l in serr.splitlines():
|
||||||
logger.error (f' {l}')
|
logger.error (f' {l}')
|
||||||
raise Exception (f'command ({cmd_txt}) failed, see log for details')
|
raise Exception (f'command ({cmd_txt}) failed, see log for details')
|
||||||
return p.stdout.strip().decode ('utf-8')
|
return sout
|
||||||
|
|
||||||
def tomaIPlocal (self):
|
def tomaIPlocal (self):
|
||||||
try:
|
try:
|
||||||
|
@ -237,15 +310,15 @@ class ogLiveWorker(ServerWorker):
|
||||||
cmd['der'] = ''
|
cmd['der'] = ''
|
||||||
else: ## el comando tuvo algún error
|
else: ## el comando tuvo algún error
|
||||||
cmd['res'] = 2
|
cmd['res'] = 2
|
||||||
cmd['der'] = self.tbErroresScripts[herror] ## XXX
|
cmd['der'] = self.tbErroresScripts[herror]
|
||||||
|
|
||||||
return cmd
|
return cmd
|
||||||
|
|
||||||
def cargaPaginaWeb (self, url=None):
|
def cargaPaginaWeb (self, url=None):
|
||||||
if (not url): url = self.urlMenu
|
if (not url): url = self.urlMenu
|
||||||
os.system ('pkill -9 OGBrowser')
|
os.system ('pkill -9 browser')
|
||||||
|
|
||||||
p = subprocess.Popen (['/usr/bin/OGBrowser', '-qws', url])
|
p = subprocess.Popen (['/usr/bin/browser', '-qws', url])
|
||||||
try:
|
try:
|
||||||
p.wait (2) ## if the process dies before 2 seconds...
|
p.wait (2) ## if the process dies before 2 seconds...
|
||||||
logger.error ('Error al ejecutar la llamada a la interface de administración')
|
logger.error ('Error al ejecutar la llamada a la interface de administración')
|
||||||
|
@ -288,7 +361,7 @@ class ogLiveWorker(ServerWorker):
|
||||||
|
|
||||||
return obj
|
return obj
|
||||||
|
|
||||||
def onActivation (self):
|
def onActivation (self, run_monitoring_thread):
|
||||||
if not os.path.exists ('/scripts/oginit'):
|
if not os.path.exists ('/scripts/oginit'):
|
||||||
## no estamos en oglive, este modulo no debe cargarse
|
## no estamos en oglive, este modulo no debe cargarse
|
||||||
## esta lógica la saco de src/opengnsys/linux/operations.py, donde hay un if similar
|
## esta lógica la saco de src/opengnsys/linux/operations.py, donde hay un if similar
|
||||||
|
@ -303,13 +376,25 @@ class ogLiveWorker(ServerWorker):
|
||||||
self.idproautoexec = None
|
self.idproautoexec = None
|
||||||
self.idcentro = None ## Identificador del centro
|
self.idcentro = None ## Identificador del centro
|
||||||
self.idaula = None ## Identificador del aula
|
self.idaula = None ## Identificador del aula
|
||||||
|
self.q = None ## for passing PIDs around
|
||||||
|
|
||||||
|
ogcore_scheme = os.environ.get ('OGAGENTCFG_OGCORE_SCHEME', 'https')
|
||||||
|
ogcore_ip = os.environ.get ('OGAGENTCFG_OGCORE_IP', '192.168.2.1')
|
||||||
|
ogcore_port = os.environ.get ('OGAGENTCFG_OGCORE_PORT', '8443')
|
||||||
|
urlmenu_scheme = os.environ.get ('OGAGENTCFG_URLMENU_SCHEME', 'https')
|
||||||
|
urlmenu_ip = os.environ.get ('OGAGENTCFG_URLMENU_IP', '192.168.2.1')
|
||||||
|
urlmenu_port = os.environ.get ('OGAGENTCFG_URLMENU_PORT', '8443')
|
||||||
|
ogcore_ip_port = ':'.join (map (str, filter (None, [ogcore_ip, ogcore_port ])))
|
||||||
|
urlmenu_ip_port = ':'.join (map (str, filter (None, [urlmenu_ip, urlmenu_port])))
|
||||||
try:
|
try:
|
||||||
url = self.service.config.get (self.name, 'remote')
|
url = self.service.config.get (self.name, 'remote')
|
||||||
loglevel = self.service.config.get (self.name, 'log')
|
loglevel = self.service.config.get (self.name, 'log')
|
||||||
self.pathinterface = self.service.config.get (self.name, 'pathinterface')
|
self.pathinterface = self.service.config.get (self.name, 'pathinterface')
|
||||||
self.urlMenu = self.service.config.get (self.name, 'urlMenu')
|
self.urlMenu = self.service.config.get (self.name, 'urlMenu')
|
||||||
self.urlMsg = self.service.config.get (self.name, 'urlMsg')
|
self.urlMsg = self.service.config.get (self.name, 'urlMsg')
|
||||||
|
|
||||||
|
url = url.format (ogcore_scheme, ogcore_ip_port)
|
||||||
|
self.urlMenu = self.urlMenu.format (urlmenu_scheme, urlmenu_ip_port)
|
||||||
except NoOptionError as e:
|
except NoOptionError as e:
|
||||||
logger.error ("Configuration error: {}".format (e))
|
logger.error ("Configuration error: {}".format (e))
|
||||||
raise e
|
raise e
|
||||||
|
@ -322,7 +407,8 @@ class ogLiveWorker(ServerWorker):
|
||||||
if not self.tomaMAClocal():
|
if not self.tomaMAClocal():
|
||||||
raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
|
raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
|
||||||
|
|
||||||
threading.Thread (name='monitoring_thread', target=self.mon, daemon=True).start()
|
if run_monitoring_thread: ## should be true for exactly one ogLiveWorker
|
||||||
|
threading.Thread (name='monitoring_thread', target=self.mon, daemon=True).start()
|
||||||
|
|
||||||
def _long_running_job (self, name, f, args):
|
def _long_running_job (self, name, f, args):
|
||||||
any_job_running = False
|
any_job_running = False
|
||||||
|
@ -335,11 +421,33 @@ class ogLiveWorker(ServerWorker):
|
||||||
return { 'job_id': None, 'message': 'some job is already running, refusing to launch another one' }
|
return { 'job_id': None, 'message': 'some job is already running, refusing to launch another one' }
|
||||||
|
|
||||||
job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8)))
|
job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8)))
|
||||||
|
import queue
|
||||||
|
self.q = queue.Queue() ## a single queue works for us because we never have more than one long_running_job at the same time
|
||||||
self.thread_list[job_id] = {
|
self.thread_list[job_id] = {
|
||||||
'thread': ThreadWithResult (target=f, args=args),
|
'thread': ThreadWithResult (target=f, args=(self.q,) + args),
|
||||||
'starttime': time.time(),
|
'starttime': time.time(),
|
||||||
|
'child_pid': None,
|
||||||
'running': True,
|
'running': True,
|
||||||
'result': None
|
'result': None
|
||||||
}
|
}
|
||||||
self.thread_list[job_id]['thread'].start()
|
self.thread_list[job_id]['thread'].start()
|
||||||
return { 'job_id': job_id }
|
return { 'job_id': job_id }
|
||||||
|
|
||||||
|
## para matar threads tengo lo siguiente:
|
||||||
|
## - aqui en _long_running_job meto una cola en self.q
|
||||||
|
## - (self.q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'q'")
|
||||||
|
## - en el thread_list también tengo un child_pid para almacenar el pid de los procesos hijos
|
||||||
|
## - al crear el ThreadWithResult le paso la cola, y luego en run() la recojo y la meto en el self.q del thread
|
||||||
|
## - en interfaceAdmin() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue
|
||||||
|
## - en mon() recojo pids de la queue y los meto en thread_list 'child_pid'
|
||||||
|
## - algunas funciones llaman a interfaceAdmin más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando
|
||||||
|
## - por ejemplo EjecutarScript llama a interfaceAdmin() y luego llama a LeeConfiguracion() el cual llama a interfaceAdmin() otra vez
|
||||||
|
## - y cuando nos llamen a KillJob, terminamos en killer() el cual coge el 'child_pid' y zas
|
||||||
|
## - pero a lo mejor el child ya terminó
|
||||||
|
## - o a lo mejor el KillJob nos llegó demasiado pronto y todavía no hubo ningún child
|
||||||
|
##
|
||||||
|
## $ curl --insecure -X POST --data '{"nfn":"EjecutarScript","scp":"cd /usr; sleep 30; pwd; ls","ids":"0"}' https://192.168.2.199:8000/ogAdmClient/EjecutarScript
|
||||||
|
## {"job_id": "EjecutarScript-333feb3f"}
|
||||||
|
## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/CloningEngine/KillJob
|
||||||
|
##
|
||||||
|
## funciona bien, excepto que el PID no muere xD, ni siquiera haciendo subprocess.run('kill')
|
||||||
|
|
Loading…
Reference in New Issue