refs #1108 add WIP for killing subprocesses

ogcore1
Natalia Serrano 2024-11-15 11:41:12 +01:00
parent e2fcf02222
commit 87a5258de5
5 changed files with 131 additions and 18 deletions

View File

@ -1,3 +1,9 @@
ogagent (1.4.5~pre3-1) stable; urgency=medium
* Kill long running jobs in oglive
-- OpenGnsys developers <info@opengnsys.es> Wed, 06 Nov 2024 14:11:32 +0100
ogagent (1.4.5~pre2-1) stable; urgency=medium
* Remove race condition due to several monitoring threads

View File

@ -1 +1 @@
1.4.5-pre1
1.4.5-pre3

View File

@ -322,3 +322,10 @@ class CloningEngineWorker (ogLiveWorker):
def process_InventarioSoftware (self, path, get_params, post_params, server):
logger.debug ('in process_InventarioSoftware, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
return self._long_running_job ('InventarioSoftware', self.do_InventarioSoftware, args=(post_params,))
## curl --insecure -X POST --data '{"job_id":"foo"}' https://192.168.2.199:8000/CloningEngine/KillJob
def process_KillJob (self, path, get_params, post_params, server):
logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
jid = post_params['job_id']
r = self.killer (jid)
return r

View File

@ -561,3 +561,10 @@ class ogAdmClientWorker (ogLiveWorker):
def process_EjecutaComandosPendientes (self, path, get_params, post_params, server):
logger.debug ('in process_EjecutaComandosPendientes, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
return {'true':'true'} ## ogAdmClient.c:2138
## curl --insecure -X POST --data '{"job_id":"foo"}' https://192.168.2.199:8000/ogAdmClient/KillJob
def process_KillJob (self, path, get_params, post_params, server):
logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
jid = post_params['job_id']
r = self.killer (jid)
return r

View File

@ -35,6 +35,7 @@ import time
import random
import subprocess
import threading
import signal
from configparser import NoOptionError
from opengnsys import REST
@ -57,6 +58,7 @@ class ThreadWithResult (threading.Thread):
class ogLiveWorker(ServerWorker):
thread_list = {}
thread_lock = threading.Lock()
tbErroresScripts = [
"Se han generado errores desconocidos. No se puede continuar la ejecución de este módulo", ## 0
@ -142,20 +144,65 @@ class ogLiveWorker(ServerWorker):
result['job_id'] = job_id
res = self.REST.sendMessage ('clients/status/webhook', result)
def killer (self, job_id):
logger.debug (f'killer() called, job_id ({job_id})')
if job_id not in self.thread_list: return { 'res': 2, 'der': 'Unknown job' }
with self.thread_lock:
if 'thread' not in self.thread_list[job_id]: return { 'res': 2, 'der': 'Job is not running' }
t = self.thread_list[job_id]['thread']
pid = self.thread_list[job_id]['child_pid']
logger.debug (f'pid ({pid})')
try_times = 8
sig = signal.SIGTERM
while True:
t.join (0.05)
if not t.is_alive():
logger.debug (f'thread exited, yay!')
## limpieza
self.q = None
self.thread_list[job_id]['child_pid'] = None
break
if pid:
if os.path.exists (f'/proc/{pid}'):
logger.debug (f'would os.kill pid ({pid})')
## si el proceso se muere justo aquí, no pasa nada: la señal va al vacío
#os.kill (pid, sig)
else:
logger.debug (f'pid ({pid}) is gone, nothing to kill...')
self.thread_list[job_id]['child_pid'] = None
else:
logger.debug (f'oops no tenemos a quien matar')
if not try_times: break
if 4 == try_times: sig = signal.SIGKILL ## change signal after a few tries
try_times -= 1
time.sleep (0.4)
## y si no lo hemos conseguido, qué??
return {'job_id':job_id}
def mon (self):
while True:
#print ('mon(): iterating')
for k in self.thread_list:
elem = self.thread_list[k]
if 'thread' not in elem: continue
logger.debug (f'considering thread ({k})')
elem['thread'].join (0.05)
if not elem['thread'].is_alive():
logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
elem['running'] = False
elem['result'] = elem['thread'].result
del elem['thread']
self.notifier (k, elem['result'])
with self.thread_lock:
for k in self.thread_list:
elem = self.thread_list[k]
if 'thread' not in elem: continue
logger.debug (f'considering thread ({k})')
if self.q:
if not q.empty():
elem['child_pid'] = q.get()
logger.debug (f'queue not empty, got pid ({elem["child_pid"]})')
else:
logger.debug (f'queue empty')
elem['thread'].join (0.05)
if not elem['thread'].is_alive():
logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
elem['running'] = False
elem['result'] = elem['thread'].result
del elem['thread']
self.notifier (k, elem['result'])
time.sleep (1)
@ -173,22 +220,36 @@ class ogLiveWorker(ServerWorker):
else:
proc = ['bash', '-c', '{} {}'.format (devel_bash_prefix, exe)]
logger.debug ('subprocess.run ("{}", capture_output=True)'.format (proc))
p = subprocess.run (proc, capture_output=True)
#p = subprocess.run (proc, capture_output=True)
p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if self.q:
self.q.put (p.pid)
else:
logger.debug ('oops, queremos escribir el PID del hijo a la cola pero no hay cola')
sout = serr = ''
while p.poll() is None:
for l in iter (p.stdout.readline, b''): sout += l.decode ('utf-8', 'ignore')
for l in iter (p.stderr.readline, b''): serr += l.decode ('utf-8', 'ignore')
time.sleep (1)
sout = sout.strip()
serr = serr.strip()
## DEBUG
logger.info (f'stdout follows:')
for l in p.stdout.strip().decode ('utf-8').splitlines():
for l in sout.splitlines():
logger.info (f' {l}')
logger.info (f'stderr follows:')
for l in p.stderr.strip().decode ('utf-8').splitlines():
for l in serr.splitlines():
logger.info (f' {l}')
## /DEBUG
if 0 != p.returncode:
cmd_txt = ' '.join (proc)
logger.error (f'command ({cmd_txt}) failed, stderr follows:')
for l in p.stderr.strip().decode ('utf-8').splitlines():
for l in serr.splitlines():
logger.error (f' {l}')
raise Exception (f'command ({cmd_txt}) failed, see log for details')
return p.stdout.strip().decode ('utf-8')
return sout
def tomaIPlocal (self):
try:
@ -303,6 +364,7 @@ class ogLiveWorker(ServerWorker):
self.idproautoexec = None
self.idcentro = None ## Identificador del centro
self.idaula = None ## Identificador del aula
self.q = None ## for passing PIDs around
try:
url = self.service.config.get (self.name, 'remote')
@ -336,11 +398,42 @@ class ogLiveWorker(ServerWorker):
return { 'job_id': None, 'message': 'some job is already running, refusing to launch another one' }
job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8)))
import queue
self.q = queue.Queue() ## a single queue works for us because we never have more than one long_running_job at the same time
self.thread_list[job_id] = {
'thread': ThreadWithResult (target=f, args=args),
'starttime': time.time(),
'child_pid': None,
'running': True,
'result': None
}
self.thread_list[job_id]['thread'].start()
return { 'job_id': job_id }
## para matar threads tengo lo siguiente:
## - aqui en _long_running_job meto una cola en self.q
## - (self.q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'q'")
## - en el thread_list también tengo un child_pid para almacenar el pid de los procesos hijos
## - en interfaceAdm() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue
## - en mon() recojo pids de la queue y los meto en thread_list 'child_pid'
## - algunas funciones llaman a interfaceAdm más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando
## - por ejemplo EjecutarScript llama a interfaceAdm() y luego llama a LeeConfiguracion() el cual llama a interfaceAdm() otra vez
## - y cuando nos llamen a KillJob, terminamos en killer() el cual coge el 'child_pid' y zas
## - pero a lo mejor el child ya terminó
## - o a lo mejor el KillJob nos llegó demasiado pronto y todavía no hubo ningún child
##
## está sin probar. Simplemente probé que el agente arranca (o sea, que no lo rompí con estos cambios)
## versión 1.4.5-pre3, desplegada en entornos de desarrollo y funciona bien
## la idea sería mandarle un EjecutarScript 'sleep 30' y luego un KillJob
##
## $ curl --insecure -X POST --data '{"nfn":"EjecutarScript","scp":"cd /usr; sleep 30; pwd; ls","ids":"0"}' https://192.168.2.199:8000/ogAdmClient/EjecutarScript
## {"job_id": "EjecutarScript-333feb3f"}
## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/CloningEngine/KillJob
##
## el KillJob de primeras no va a hacer nada (la llamada a os.kill() está comentada)
## entonces sería probar primero que el flujo va como espero:
## - que primero salga "would kill pid" en el log
## - y pasados unos segundos, al llamar a KillJob otra vez, salga "pid is gone"
## o en otra prueba, metiéndole un time.sleep() a piñón en interfaceAdm() antes de lanzar un hijo
## - que salga "oops no tenemos a quien matar" en el log
## y ya con esto comprobado, descomentar el os.kill() y hacer pruebas reales