ogcore1 #14
|
@ -1,3 +1,66 @@
|
|||
ogagent (1.4.6-1) UNRELEASED; urgency=medium
|
||||
|
||||
* Point to the new menu browser
|
||||
|
||||
-- OpenGnsys developers <info@opengnsys.es> Tue, 14 Jan 2025 12:00:24 +0100
|
||||
|
||||
ogagent (1.4.5-1) stable; urgency=medium
|
||||
|
||||
* Kill long running jobs in oglive
|
||||
|
||||
-- OpenGnsys developers <info@opengnsys.es> Fri, 29 Nov 2024 10:22:36 +0100
|
||||
|
||||
ogagent (1.4.5~pre8-1) stable; urgency=medium
|
||||
|
||||
* Add Configurar() to the CloningEngine module
|
||||
|
||||
-- OpenGnsys developers <info@opengnsys.es> Wed, 27 Nov 2024 20:02:42 +0100
|
||||
|
||||
ogagent (1.4.5~pre7-1) stable; urgency=medium
|
||||
|
||||
* Use old browser again
|
||||
|
||||
-- OpenGnsys developers <info@opengnsys.es> Wed, 20 Nov 2024 14:24:44 +0100
|
||||
|
||||
ogagent (1.4.5~pre6-1) stable; urgency=medium
|
||||
|
||||
* Do not use envvars for the operating-system module
|
||||
|
||||
-- OpenGnsys developers <info@opengnsys.es> Wed, 20 Nov 2024 13:45:21 +0100
|
||||
|
||||
ogagent (1.4.5~pre5-1) stable; urgency=medium
|
||||
|
||||
* Avoid some KeyErrors
|
||||
|
||||
-- OpenGnsys developers <info@opengnsys.es> Mon, 18 Nov 2024 12:14:27 +0100
|
||||
|
||||
ogagent (1.4.5~pre4-1) stable; urgency=medium
|
||||
|
||||
* Don't die when ogcore returns HTTP 4xx or 5xx
|
||||
* Get ogcore IP and port from the environment
|
||||
|
||||
-- OpenGnsys developers <info@opengnsys.es> Fri, 15 Nov 2024 11:43:01 +0100
|
||||
|
||||
ogagent (1.4.5~pre3-1) stable; urgency=medium
|
||||
|
||||
* Kill long running jobs in oglive (not-yet-working draft)
|
||||
|
||||
-- OpenGnsys developers <info@opengnsys.es> Wed, 06 Nov 2024 14:11:32 +0100
|
||||
|
||||
ogagent (1.4.5~pre2-1) stable; urgency=medium
|
||||
|
||||
* Remove race condition due to several monitoring threads
|
||||
* Include job_id in asynchronous responses
|
||||
* Remove vim swapfiles from the package contents
|
||||
|
||||
-- OpenGnsys developers <info@opengnsys.es> Wed, 06 Nov 2024 13:24:03 +0100
|
||||
|
||||
ogagent (1.4.5~pre1-1) stable; urgency=medium
|
||||
|
||||
* CrearImagen: return inventory inline
|
||||
|
||||
-- OpenGnsys developers <info@opengnsys.es> Wed, 06 Nov 2024 12:41:14 +0100
|
||||
|
||||
ogagent (1.4.4-1) stable; urgency=medium
|
||||
|
||||
* Use logger.debug() to prevent the windows agent from dying
|
||||
|
|
|
@ -22,6 +22,7 @@ install: build
|
|||
dh_prep
|
||||
dh_installdirs
|
||||
$(MAKE) DESTDIR=$(CURDIR)/debian/ogagent install-ogagent
|
||||
find $(CURDIR) -name '*.swp' -exec rm -f '{}' ';'
|
||||
binary-arch: build install
|
||||
# emptyness
|
||||
binary-indep: build install
|
||||
|
|
|
@ -178,7 +178,7 @@ def oac_recibe_archivo():
|
|||
logging.info(f'dec ({dec})')
|
||||
return jsonify({'anything':'anything'}) ## if we return {}, then we trigger "if not {}" which happens to be true
|
||||
|
||||
@app.route('/opengnsys/rest/ogAdmClient/callback', methods=['POST'])
|
||||
@app.route('/opengnsys/rest/clients/status/webhook', methods=['POST'])
|
||||
def oac_callback():
|
||||
logging.info(f'{request.get_json()}')
|
||||
return jsonify({'anything':'anything'})
|
||||
|
|
|
@ -1 +1 @@
|
|||
1.4.4
|
||||
1.4.6
|
||||
|
|
|
@ -23,15 +23,15 @@ log=DEBUG
|
|||
[ogAdmClient]
|
||||
#path=test_modules/server,more_modules/server
|
||||
|
||||
remote=https://192.168.2.1/opengnsys/rest
|
||||
remote={}://{}/opengnsys/rest
|
||||
log=DEBUG
|
||||
pathinterface=/opt/opengnsys/interfaceAdm
|
||||
urlMenu=https://192.168.2.1/opengnsys/varios/menubrowser.php
|
||||
urlMenu={}://{}/menu-browser
|
||||
urlMsg=http://localhost/cgi-bin/httpd-log.sh
|
||||
|
||||
[CloningEngine]
|
||||
remote=https://192.168.2.1/opengnsys/rest
|
||||
remote={}://{}/opengnsys/rest
|
||||
log=DEBUG
|
||||
pathinterface=/opt/opengnsys/interfaceAdm
|
||||
urlMenu=https://192.168.2.1/opengnsys/varios/menubrowser.php
|
||||
urlMenu={}://{}/menu-browser
|
||||
urlMsg=http://localhost/cgi-bin/httpd-log.sh
|
||||
|
|
|
@ -168,4 +168,9 @@ class REST(object):
|
|||
url = self._getUrl(msg)
|
||||
logger.debug('Requesting {}'.format(url))
|
||||
|
||||
return self._request(url, data)
|
||||
try:
|
||||
res = self._request(url, data)
|
||||
return res
|
||||
except:
|
||||
logger.exception()
|
||||
return None
|
||||
|
|
|
@ -42,13 +42,13 @@ class CloningEngineWorker (ogLiveWorker):
|
|||
REST = None # REST object
|
||||
|
||||
def onActivation (self):
|
||||
super().onActivation()
|
||||
super().onActivation (run_monitoring_thread=False)
|
||||
logger.info ('onActivation ok')
|
||||
|
||||
def onDeactivation (self):
|
||||
logger.debug ('onDeactivation')
|
||||
|
||||
def InventariandoSoftware (self, dsk, par, sw, nfn):
|
||||
def InventariandoSoftware (self, dsk, par, nfn):
|
||||
sft_src = f'/tmp/CSft-{self.IPlocal}-{par}'
|
||||
try:
|
||||
self.interfaceAdmin (nfn, [dsk, par, sft_src])
|
||||
|
@ -58,31 +58,23 @@ class CloningEngineWorker (ogLiveWorker):
|
|||
|
||||
if herror:
|
||||
logger.warning ('Error al ejecutar el comando')
|
||||
b64 = ''
|
||||
self.muestraMensaje (20)
|
||||
else:
|
||||
if not os.path.exists (sft_src):
|
||||
raise Exception (f'interfaceAdmin({nfn}) returned success but did not create file ({sft_src})')
|
||||
sft_src_contents = Path (sft_src).read_bytes()
|
||||
|
||||
## Envía fichero de inventario al servidor
|
||||
sft_dst = f'/tmp/Ssft-{self.IPlocal}-{par}' ## Nombre que tendra el archivo en el Servidor
|
||||
logger.debug ('sending recibeArchivo to server')
|
||||
res = self.enviaMensajeServidor ('recibeArchivo', { 'nfl': sft_dst, 'contents': base64.b64encode (sft_src_contents).decode ('utf-8') })
|
||||
logger.debug (res)
|
||||
if not res:
|
||||
herror = 12 ## Error de envío de fichero por la red
|
||||
raise Exception ('Ha ocurrido algún problema al enviar un archivo por la red')
|
||||
b64 = base64.b64encode (sft_src_contents).decode ('utf-8')
|
||||
self.muestraMensaje (19)
|
||||
|
||||
if not sw:
|
||||
cmd = {
|
||||
'nfn': 'RESPUESTA_InventarioSoftware',
|
||||
'par': par,
|
||||
'sft': sft_dst,
|
||||
}
|
||||
return self.respuestaEjecucionComando (cmd, herror, 0)
|
||||
|
||||
return {'true':'true'} ## XXX
|
||||
cmd = {
|
||||
'nfn': 'RESPUESTA_InventarioSoftware',
|
||||
'dsk': dsk, ## not in the original C code, around ogAdmClient.c:1944
|
||||
'par': par,
|
||||
'contents': b64,
|
||||
}
|
||||
return self.respuestaEjecucionComando (cmd, herror, 0)
|
||||
|
||||
def do_CrearImagen (self, post_params):
|
||||
for k in ['dsk', 'par', 'cpt', 'idi', 'nci', 'ipr', 'nfn', 'ids']:
|
||||
|
@ -102,13 +94,14 @@ class CloningEngineWorker (ogLiveWorker):
|
|||
self.muestraMensaje (7)
|
||||
|
||||
try:
|
||||
res = self.InventariandoSoftware (dsk, par, False, 'InventarioSoftware') ## Crea inventario Software previamente
|
||||
res = self.InventariandoSoftware (dsk, par, 'InventarioSoftware') ## Crea inventario Software previamente
|
||||
except:
|
||||
logger.warning ('Error al ejecutar el comando')
|
||||
return {}
|
||||
|
||||
if res:
|
||||
if res['contents']:
|
||||
self.muestraMensaje (2)
|
||||
inv_sft = res['contents']
|
||||
try:
|
||||
self.interfaceAdmin (nfn, [dsk, par, nci, ipr])
|
||||
self.muestraMensaje (9)
|
||||
|
@ -119,16 +112,18 @@ class CloningEngineWorker (ogLiveWorker):
|
|||
herror = 1
|
||||
else:
|
||||
logger.warning ('Error al ejecutar el comando')
|
||||
inv_sft = ''
|
||||
|
||||
self.muestraMenu()
|
||||
|
||||
cmd = {
|
||||
'nfn': 'RESPUESTA_CrearImagen',
|
||||
'idi': idi, ## Identificador de la imagen
|
||||
'dsk': dsk, ## Número de disco
|
||||
'par': par, ## Número de partición de donde se creó
|
||||
'cpt': cpt, ## Tipo o código de partición
|
||||
'ipr': ipr, ## Ip del repositorio donde se alojó
|
||||
'nfn': 'RESPUESTA_CrearImagen',
|
||||
'idi': idi, ## Identificador de la imagen
|
||||
'dsk': dsk, ## Número de disco
|
||||
'par': par, ## Número de partición de donde se creó
|
||||
'cpt': cpt, ## Tipo o código de partición
|
||||
'ipr': ipr, ## Ip del repositorio donde se alojó
|
||||
'inv_sft': inv_sft,
|
||||
}
|
||||
return self.respuestaEjecucionComando (cmd, herror, ids)
|
||||
|
||||
|
@ -151,7 +146,10 @@ class CloningEngineWorker (ogLiveWorker):
|
|||
self.muestraMensaje (3)
|
||||
|
||||
try:
|
||||
self.interfaceAdmin (nfn, [dsk, par, nci, ipr, ptc])
|
||||
## the ptc.split() is useless right now, since interfaceAdmin() does ' '.join(params) in order to spawn a shell
|
||||
## however we're going to need it in the future (when everything gets translated into python), plus it's harmless now. So let's do it
|
||||
#self.interfaceAdmin (nfn, [dsk, par, nci, ipr, ptc])
|
||||
self.interfaceAdmin (nfn, [dsk, par, nci, ipr] + ptc.split())
|
||||
self.muestraMensaje (11)
|
||||
herror = 0
|
||||
except:
|
||||
|
@ -192,13 +190,31 @@ class CloningEngineWorker (ogLiveWorker):
|
|||
|
||||
nfn = post_params['nfn']
|
||||
dsk = post_params['dsk']
|
||||
cfg = post_params['cfg'].replace('\n','$').replace('\t','#')
|
||||
cfg = post_params['cfg']
|
||||
ids = post_params['ids']
|
||||
|
||||
self.muestraMensaje (4)
|
||||
|
||||
params = []
|
||||
disk_info = cfg.pop (0)
|
||||
logger.debug (f'disk_info ({disk_info})')
|
||||
for k in ['dis', 'che', 'tch']:
|
||||
params.append (f'{k}={disk_info[k]}')
|
||||
disk_info_str = '*'.join (params)
|
||||
|
||||
partitions = []
|
||||
for entry in cfg:
|
||||
logger.debug (f'entry ({entry})')
|
||||
params = []
|
||||
for k in ['par', 'cpt', 'sfi', 'tam', 'ope']:
|
||||
params.append (f'{k}={entry[k]}')
|
||||
partitions.append ('*'.join (params))
|
||||
part_info_str = '%'.join (partitions)
|
||||
|
||||
cfg_str = f'{disk_info_str}!{part_info_str}%'
|
||||
|
||||
try:
|
||||
self.interfaceAdmin (nfn, [dsk, cfg])
|
||||
self.interfaceAdmin (nfn, ['ignored', cfg_str])
|
||||
self.muestraMensaje (14)
|
||||
herror = 0
|
||||
except:
|
||||
|
@ -275,16 +291,14 @@ class CloningEngineWorker (ogLiveWorker):
|
|||
self.muestraMensaje (7)
|
||||
|
||||
try:
|
||||
self.InventariandoSoftware (dsk, par, True, 'InventarioSoftware')
|
||||
cmd = self.InventariandoSoftware (dsk, par, 'InventarioSoftware')
|
||||
herror = 0
|
||||
except:
|
||||
logger.warning ('Error al ejecutar el comando')
|
||||
cmd = { 'nfn': 'RESPUESTA_InventarioSoftware' }
|
||||
herror = 1
|
||||
|
||||
self.muestraMenu()
|
||||
cmd = {
|
||||
'nfn': 'RESPUESTA_InventarioSoftware',
|
||||
}
|
||||
return self.respuestaEjecucionComando (cmd, herror, ids)
|
||||
|
||||
def process_CrearImagen (self, path, get_params, post_params, server):
|
||||
|
@ -329,3 +343,13 @@ class CloningEngineWorker (ogLiveWorker):
|
|||
def process_InventarioSoftware (self, path, get_params, post_params, server):
|
||||
logger.debug ('in process_InventarioSoftware, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
|
||||
return self._long_running_job ('InventarioSoftware', self.do_InventarioSoftware, args=(post_params,))
|
||||
|
||||
## curl --insecure -X POST --data '{"job_id":"foo"}' https://192.168.2.199:8000/CloningEngine/KillJob
|
||||
def process_KillJob (self, path, get_params, post_params, server):
|
||||
logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
|
||||
jid = post_params['job_id']
|
||||
r = self.killer (jid)
|
||||
logger.debug (f'r bef ({r})')
|
||||
r.update ({ 'nfn':'RESPUESTA_KillJob', 'job':jid })
|
||||
logger.debug (f'r aft ({r})')
|
||||
return r
|
||||
|
|
|
@ -279,7 +279,7 @@ class ogAdmClientWorker (ogLiveWorker):
|
|||
#}
|
||||
|
||||
def onActivation (self):
|
||||
super().onActivation()
|
||||
super().onActivation (run_monitoring_thread=True)
|
||||
logger.info ('Inicio de sesion')
|
||||
logger.info ('Abriendo sesión en el servidor de Administración')
|
||||
if (not self.inclusionCliente()):
|
||||
|
@ -561,3 +561,13 @@ class ogAdmClientWorker (ogLiveWorker):
|
|||
def process_EjecutaComandosPendientes (self, path, get_params, post_params, server):
|
||||
logger.debug ('in process_EjecutaComandosPendientes, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
|
||||
return {'true':'true'} ## ogAdmClient.c:2138
|
||||
|
||||
## curl --insecure -X POST --data '{"job_id":"foo"}' https://192.168.2.199:8000/ogAdmClient/KillJob
|
||||
def process_KillJob (self, path, get_params, post_params, server):
|
||||
logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
|
||||
jid = post_params['job_id']
|
||||
r = self.killer (jid)
|
||||
logger.debug (f'r bef ({r})')
|
||||
r.update ({ 'nfn':'RESPUESTA_KillJob', 'job':jid })
|
||||
logger.debug (f'r aft ({r})')
|
||||
return r
|
||||
|
|
|
@ -35,6 +35,7 @@ import time
|
|||
import random
|
||||
import subprocess
|
||||
import threading
|
||||
import signal
|
||||
|
||||
from configparser import NoOptionError
|
||||
from opengnsys import REST
|
||||
|
@ -47,16 +48,20 @@ class ThreadWithResult (threading.Thread):
|
|||
try:
|
||||
self.result = None
|
||||
if self._target is not None:
|
||||
## the first arg in self._args is the queue
|
||||
self.q = self._args[0]
|
||||
self._args = self._args[1:]
|
||||
try:
|
||||
self.result = self._target (*self._args, **self._kwargs)
|
||||
except Exception as e:
|
||||
self.result = { 'res': 2, 'der': f'got exception: ({e})' } ## res=2 as defined in ogAdmClient.c:2048
|
||||
finally:
|
||||
# Avoid a refcycle if the thread is running a function with an argument that has a member that points to the thread.
|
||||
del self._target, self._args, self._kwargs
|
||||
del self._target, self._args, self._kwargs, self.q
|
||||
|
||||
class ogLiveWorker(ServerWorker):
|
||||
thread_list = {}
|
||||
thread_lock = threading.Lock()
|
||||
|
||||
tbErroresScripts = [
|
||||
"Se han generado errores desconocidos. No se puede continuar la ejecución de este módulo", ## 0
|
||||
|
@ -137,25 +142,79 @@ class ogLiveWorker(ServerWorker):
|
|||
"Error desconocido",
|
||||
]
|
||||
|
||||
def notifier (self, result):
|
||||
logger.debug (f'notifier() called, result ({result})')
|
||||
res = self.REST.sendMessage ('/clients/status/webhook', result)
|
||||
def notifier (self, job_id, result):
|
||||
logger.debug (f'notifier() called, job_id ({job_id}) result ({result})')
|
||||
result['job_id'] = job_id
|
||||
self.REST.sendMessage ('clients/status/webhook', result)
|
||||
|
||||
def killer (self, job_id):
|
||||
logger.debug (f'killer() called, job_id ({job_id})')
|
||||
if job_id not in self.thread_list: return { 'res': 2, 'der': 'Unknown job' }
|
||||
|
||||
with self.thread_lock:
|
||||
if 'thread' not in self.thread_list[job_id]: return { 'res': 2, 'der': 'Job is not running' }
|
||||
t = self.thread_list[job_id]['thread']
|
||||
pid = self.thread_list[job_id]['child_pid']
|
||||
logger.debug (f'pid ({pid})')
|
||||
try_times = 8
|
||||
sig = signal.SIGTERM
|
||||
msg = f'could not kill pid ({pid}) after ({try_times}) tries'
|
||||
success = 2 ## mimic cmd['res'] in respuestaEjecucionComando(): "1" means success, "2" means failed
|
||||
while True:
|
||||
t.join (0.05)
|
||||
if not t.is_alive():
|
||||
msg = 'job terminated'
|
||||
success = 1
|
||||
logger.debug (msg)
|
||||
self.thread_list[job_id]['child_pid'] = None
|
||||
break
|
||||
## race condition: if the subprocess finishes just here, then we already checked that t.is_alive() is true, but os.path.exists(/proc/pid) will be false below. msg will be 'nothing to kill'.
|
||||
## this is fine in the first iteration of the loop, before we send any signals. In the rest of iterations, after some signals were sent, msg should be 'job terminated' instead.
|
||||
if pid:
|
||||
if os.path.exists (f'/proc/{pid}'):
|
||||
logger.debug (f'sending signal ({sig}) to pid ({pid})')
|
||||
## if the process finishes just here, nothing happens: the signal is sent to the void
|
||||
os.kill (pid, sig)
|
||||
#subprocess.run (['kill', '--signal', str(sig), str(pid)])
|
||||
else:
|
||||
msg = f'pid ({pid}) is gone, nothing to kill'
|
||||
success = 1
|
||||
logger.debug (msg)
|
||||
self.thread_list[job_id]['child_pid'] = None
|
||||
break
|
||||
else:
|
||||
msg = 'no PID to kill'
|
||||
logger.debug (msg)
|
||||
|
||||
if not try_times: break
|
||||
if 4 == try_times: sig = signal.SIGKILL ## change signal after a few tries
|
||||
try_times -= 1
|
||||
time.sleep (0.4)
|
||||
|
||||
return { 'res':success, 'der':msg }
|
||||
|
||||
def mon (self):
|
||||
while True:
|
||||
#print ('mon(): iterating')
|
||||
for k in self.thread_list:
|
||||
elem = self.thread_list[k]
|
||||
if 'thread' not in elem: continue
|
||||
logger.debug (f'considering thread ({k})')
|
||||
try: elem['thread'].join (0.05)
|
||||
except RuntimeError: pass ## race condition: a thread is created and this code runs before it is start()ed
|
||||
if not elem['thread'].is_alive():
|
||||
logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
|
||||
elem['running'] = False
|
||||
elem['result'] = elem['thread'].result
|
||||
del elem['thread']
|
||||
self.notifier (elem['result'])
|
||||
with self.thread_lock:
|
||||
for k in self.thread_list:
|
||||
elem = self.thread_list[k]
|
||||
if 'thread' not in elem: continue
|
||||
logger.debug (f'considering thread ({k})')
|
||||
|
||||
if self.q:
|
||||
if not self.q.empty():
|
||||
elem['child_pid'] = self.q.get()
|
||||
logger.debug (f'queue not empty, got pid ({elem["child_pid"]})')
|
||||
else:
|
||||
logger.debug (f'queue empty')
|
||||
|
||||
elem['thread'].join (0.05)
|
||||
if not elem['thread'].is_alive():
|
||||
logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
|
||||
elem['running'] = False
|
||||
elem['result'] = elem['thread'].result
|
||||
del elem['thread']
|
||||
self.notifier (k, elem['result'])
|
||||
|
||||
time.sleep (1)
|
||||
|
||||
|
@ -169,26 +228,40 @@ class ogLiveWorker(ServerWorker):
|
|||
'''
|
||||
|
||||
if parametros:
|
||||
proc = ['bash', '-c', '{} bash -x {} {}'.format (devel_bash_prefix, exe, ' '.join (parametros))]
|
||||
proc = ['bash', '-c', '{} {} {}'.format (devel_bash_prefix, exe, ' '.join (parametros))]
|
||||
else:
|
||||
proc = ['bash', '-c', '{} bash -x {}'.format (devel_bash_prefix, exe)]
|
||||
proc = ['bash', '-c', '{} {}'.format (devel_bash_prefix, exe)]
|
||||
logger.debug ('subprocess.run ("{}", capture_output=True)'.format (proc))
|
||||
p = subprocess.run (proc, capture_output=True)
|
||||
|
||||
p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
if self.q:
|
||||
self.q.put (p.pid)
|
||||
else:
|
||||
## esto sucede por ejemplo cuando arranca el agente, que estamos en interfaceAdmin() en el mismo hilo, sin _long_running_job ni hilo separado
|
||||
logger.debug ('no queue--not writing any PID to it')
|
||||
sout = serr = ''
|
||||
while p.poll() is None:
|
||||
for l in iter (p.stdout.readline, b''): sout += l.decode ('utf-8', 'ignore')
|
||||
for l in iter (p.stderr.readline, b''): serr += l.decode ('utf-8', 'ignore')
|
||||
time.sleep (1)
|
||||
sout = sout.strip()
|
||||
serr = serr.strip()
|
||||
|
||||
## DEBUG
|
||||
logger.info (f'stdout follows:')
|
||||
for l in p.stdout.strip().decode ('utf-8').splitlines():
|
||||
for l in sout.splitlines():
|
||||
logger.info (f' {l}')
|
||||
logger.info (f'stderr follows:')
|
||||
for l in p.stderr.strip().decode ('utf-8').splitlines():
|
||||
for l in serr.splitlines():
|
||||
logger.info (f' {l}')
|
||||
## /DEBUG
|
||||
if 0 != p.returncode:
|
||||
cmd_txt = ' '.join (proc)
|
||||
logger.error (f'command ({cmd_txt}) failed, stderr follows:')
|
||||
for l in p.stderr.strip().decode ('utf-8').splitlines():
|
||||
for l in serr.splitlines():
|
||||
logger.error (f' {l}')
|
||||
raise Exception (f'command ({cmd_txt}) failed, see log for details')
|
||||
return p.stdout.strip().decode ('utf-8')
|
||||
return sout
|
||||
|
||||
def tomaIPlocal (self):
|
||||
try:
|
||||
|
@ -237,15 +310,15 @@ class ogLiveWorker(ServerWorker):
|
|||
cmd['der'] = ''
|
||||
else: ## el comando tuvo algún error
|
||||
cmd['res'] = 2
|
||||
cmd['der'] = self.tbErroresScripts[herror] ## XXX
|
||||
cmd['der'] = self.tbErroresScripts[herror]
|
||||
|
||||
return cmd
|
||||
|
||||
def cargaPaginaWeb (self, url=None):
|
||||
if (not url): url = self.urlMenu
|
||||
os.system ('pkill -9 OGBrowser')
|
||||
os.system ('pkill -9 browser')
|
||||
|
||||
p = subprocess.Popen (['/usr/bin/OGBrowser', '-qws', url])
|
||||
p = subprocess.Popen (['/usr/bin/browser', '-qws', url])
|
||||
try:
|
||||
p.wait (2) ## if the process dies before 2 seconds...
|
||||
logger.error ('Error al ejecutar la llamada a la interface de administración')
|
||||
|
@ -288,7 +361,7 @@ class ogLiveWorker(ServerWorker):
|
|||
|
||||
return obj
|
||||
|
||||
def onActivation (self):
|
||||
def onActivation (self, run_monitoring_thread):
|
||||
if not os.path.exists ('/scripts/oginit'):
|
||||
## no estamos en oglive, este modulo no debe cargarse
|
||||
## esta lógica la saco de src/opengnsys/linux/operations.py, donde hay un if similar
|
||||
|
@ -303,13 +376,25 @@ class ogLiveWorker(ServerWorker):
|
|||
self.idproautoexec = None
|
||||
self.idcentro = None ## Identificador del centro
|
||||
self.idaula = None ## Identificador del aula
|
||||
self.q = None ## for passing PIDs around
|
||||
|
||||
ogcore_scheme = os.environ.get ('OGAGENTCFG_OGCORE_SCHEME', 'https')
|
||||
ogcore_ip = os.environ.get ('OGAGENTCFG_OGCORE_IP', '192.168.2.1')
|
||||
ogcore_port = os.environ.get ('OGAGENTCFG_OGCORE_PORT', '8443')
|
||||
urlmenu_scheme = os.environ.get ('OGAGENTCFG_URLMENU_SCHEME', 'https')
|
||||
urlmenu_ip = os.environ.get ('OGAGENTCFG_URLMENU_IP', '192.168.2.1')
|
||||
urlmenu_port = os.environ.get ('OGAGENTCFG_URLMENU_PORT', '8443')
|
||||
ogcore_ip_port = ':'.join (map (str, filter (None, [ogcore_ip, ogcore_port ])))
|
||||
urlmenu_ip_port = ':'.join (map (str, filter (None, [urlmenu_ip, urlmenu_port])))
|
||||
try:
|
||||
url = self.service.config.get (self.name, 'remote')
|
||||
loglevel = self.service.config.get (self.name, 'log')
|
||||
self.pathinterface = self.service.config.get (self.name, 'pathinterface')
|
||||
self.urlMenu = self.service.config.get (self.name, 'urlMenu')
|
||||
self.urlMsg = self.service.config.get (self.name, 'urlMsg')
|
||||
|
||||
url = url.format (ogcore_scheme, ogcore_ip_port)
|
||||
self.urlMenu = self.urlMenu.format (urlmenu_scheme, urlmenu_ip_port)
|
||||
except NoOptionError as e:
|
||||
logger.error ("Configuration error: {}".format (e))
|
||||
raise e
|
||||
|
@ -322,7 +407,8 @@ class ogLiveWorker(ServerWorker):
|
|||
if not self.tomaMAClocal():
|
||||
raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
|
||||
|
||||
threading.Thread (name='monitoring_thread', target=self.mon, daemon=True).start()
|
||||
if run_monitoring_thread: ## should be true for exactly one ogLiveWorker
|
||||
threading.Thread (name='monitoring_thread', target=self.mon, daemon=True).start()
|
||||
|
||||
def _long_running_job (self, name, f, args):
|
||||
any_job_running = False
|
||||
|
@ -335,11 +421,33 @@ class ogLiveWorker(ServerWorker):
|
|||
return { 'job_id': None, 'message': 'some job is already running, refusing to launch another one' }
|
||||
|
||||
job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8)))
|
||||
import queue
|
||||
self.q = queue.Queue() ## a single queue works for us because we never have more than one long_running_job at the same time
|
||||
self.thread_list[job_id] = {
|
||||
'thread': ThreadWithResult (target=f, args=args),
|
||||
'thread': ThreadWithResult (target=f, args=(self.q,) + args),
|
||||
'starttime': time.time(),
|
||||
'child_pid': None,
|
||||
'running': True,
|
||||
'result': None
|
||||
}
|
||||
self.thread_list[job_id]['thread'].start()
|
||||
return { 'job_id': job_id }
|
||||
|
||||
## para matar threads tengo lo siguiente:
|
||||
## - aqui en _long_running_job meto una cola en self.q
|
||||
## - (self.q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'q'")
|
||||
## - en el thread_list también tengo un child_pid para almacenar el pid de los procesos hijos
|
||||
## - al crear el ThreadWithResult le paso la cola, y luego en run() la recojo y la meto en el self.q del thread
|
||||
## - en interfaceAdmin() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue
|
||||
## - en mon() recojo pids de la queue y los meto en thread_list 'child_pid'
|
||||
## - algunas funciones llaman a interfaceAdmin más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando
|
||||
## - por ejemplo EjecutarScript llama a interfaceAdmin() y luego llama a LeeConfiguracion() el cual llama a interfaceAdmin() otra vez
|
||||
## - y cuando nos llamen a KillJob, terminamos en killer() el cual coge el 'child_pid' y zas
|
||||
## - pero a lo mejor el child ya terminó
|
||||
## - o a lo mejor el KillJob nos llegó demasiado pronto y todavía no hubo ningún child
|
||||
##
|
||||
## $ curl --insecure -X POST --data '{"nfn":"EjecutarScript","scp":"cd /usr; sleep 30; pwd; ls","ids":"0"}' https://192.168.2.199:8000/ogAdmClient/EjecutarScript
|
||||
## {"job_id": "EjecutarScript-333feb3f"}
|
||||
## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/CloningEngine/KillJob
|
||||
##
|
||||
## funciona bien, excepto que el PID no muere xD, ni siquiera haciendo subprocess.run('kill')
|
||||
|
|
Loading…
Reference in New Issue