refs #1108 kill subprocesses in oglive

ogcore1
Natalia Serrano 2024-11-29 10:24:15 +01:00
parent be1fd7d624
commit 69be238f9f
5 changed files with 48 additions and 35 deletions

View File

@ -1,3 +1,9 @@
ogagent (1.4.5-1) stable; urgency=medium
* Kill long running jobs in oglive
-- OpenGnsys developers <info@opengnsys.es> Fri, 29 Nov 2024 10:22:36 +0100
ogagent (1.4.5~pre8-1) stable; urgency=medium
* Add Configurar() to the CloningEngine module
@ -31,7 +37,7 @@ ogagent (1.4.5~pre4-1) stable; urgency=medium
ogagent (1.4.5~pre3-1) stable; urgency=medium
* Kill long running jobs in oglive
* Kill long running jobs in oglive (not-yet-working draft)
-- OpenGnsys developers <info@opengnsys.es> Wed, 06 Nov 2024 14:11:32 +0100

View File

@ -1 +1 @@
1.4.5-pre8
1.4.5

View File

@ -349,4 +349,7 @@ class CloningEngineWorker (ogLiveWorker):
logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
jid = post_params['job_id']
r = self.killer (jid)
logger.debug (f'r bef ({r})')
r.update ({ 'nfn':'RESPUESTA_KillJob', 'job':jid })
logger.debug (f'r aft ({r})')
return r

View File

@ -567,4 +567,7 @@ class ogAdmClientWorker (ogLiveWorker):
logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server))
jid = post_params['job_id']
r = self.killer (jid)
logger.debug (f'r bef ({r})')
r.update ({ 'nfn':'RESPUESTA_KillJob', 'job':jid })
logger.debug (f'r aft ({r})')
return r

View File

@ -48,13 +48,16 @@ class ThreadWithResult (threading.Thread):
try:
self.result = None
if self._target is not None:
## the first arg in self._args is the queue
self.q = self._args[0]
self._args = self._args[1:]
try:
self.result = self._target (*self._args, **self._kwargs)
except Exception as e:
self.result = { 'res': 2, 'der': f'got exception: ({e})' } ## res=2 as defined in ogAdmClient.c:2048
finally:
# Avoid a refcycle if the thread is running a function with an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs
del self._target, self._args, self._kwargs, self.q
class ogLiveWorker(ServerWorker):
thread_list = {}
@ -150,36 +153,45 @@ class ogLiveWorker(ServerWorker):
with self.thread_lock:
if 'thread' not in self.thread_list[job_id]: return { 'res': 2, 'der': 'Job is not running' }
t = self.thread_list[job_id]['thread']
t = self.thread_list[job_id]['thread']
pid = self.thread_list[job_id]['child_pid']
logger.debug (f'pid ({pid})')
try_times = 8
sig = signal.SIGTERM
msg = f'could not kill pid ({pid}) after ({try_times}) tries'
success = 2 ## mimic cmd['res'] in respuestaEjecucionComando(): "1" means success, "2" means failed
while True:
t.join (0.05)
if not t.is_alive():
logger.debug (f'thread exited, yay!')
## limpieza
self.q = None
msg = 'job terminated'
success = 1
logger.debug (msg)
self.thread_list[job_id]['child_pid'] = None
break
## race condition: if the subprocess finishes just here, then we already checked that t.is_alive() is true, but os.path.exists(/proc/pid) will be false below. msg will be 'nothing to kill'.
## this is fine in the first iteration of the loop, before we send any signals. In the rest of iterations, after some signals were sent, msg should be 'job terminated' instead.
if pid:
if os.path.exists (f'/proc/{pid}'):
logger.debug (f'would os.kill pid ({pid})')
## si el proceso se muere justo aquí, no pasa nada: la señal va al vacío
#os.kill (pid, sig)
logger.debug (f'sending signal ({sig}) to pid ({pid})')
## if the process finishes just here, nothing happens: the signal is sent to the void
os.kill (pid, sig)
#subprocess.run (['kill', '--signal', str(sig), str(pid)])
else:
logger.debug (f'pid ({pid}) is gone, nothing to kill...')
msg = f'pid ({pid}) is gone, nothing to kill'
success = 1
logger.debug (msg)
self.thread_list[job_id]['child_pid'] = None
break
else:
logger.debug (f'oops no tenemos a quien matar')
msg = 'no PID to kill'
logger.debug (msg)
if not try_times: break
if 4 == try_times: sig = signal.SIGKILL ## change signal after a few tries
try_times -= 1
time.sleep (0.4)
## y si no lo hemos conseguido, qué??
return {'job_id':job_id}
return { 'res':success, 'der':msg }
def mon (self):
while True:
@ -221,13 +233,12 @@ class ogLiveWorker(ServerWorker):
proc = ['bash', '-c', '{} {}'.format (devel_bash_prefix, exe)]
logger.debug ('subprocess.run ("{}", capture_output=True)'.format (proc))
#p = subprocess.run (proc, capture_output=True)
p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if self.q:
self.q.put (p.pid)
#else:
# ## sale este mensaje en el log, y no se por que
# logger.debug ('oops, queremos escribir el PID del hijo a la cola pero no hay cola')
else:
## esto sucede por ejemplo cuando arranca el agente, que estamos en interfaceAdmin() en el mismo hilo, sin _long_running_job ni hilo separado
logger.debug ('no queue--not writing any PID to it')
sout = serr = ''
while p.poll() is None:
for l in iter (p.stdout.readline, b''): sout += l.decode ('utf-8', 'ignore')
@ -299,7 +310,7 @@ class ogLiveWorker(ServerWorker):
cmd['der'] = ''
else: ## el comando tuvo algún error
cmd['res'] = 2
cmd['der'] = self.tbErroresScripts[herror] ## XXX
cmd['der'] = self.tbErroresScripts[herror]
return cmd
@ -413,7 +424,7 @@ class ogLiveWorker(ServerWorker):
import queue
self.q = queue.Queue() ## a single queue works for us because we never have more than one long_running_job at the same time
self.thread_list[job_id] = {
'thread': ThreadWithResult (target=f, args=args), ## tengo que pasar self.q aqui dentro de args?
'thread': ThreadWithResult (target=f, args=(self.q,) + args),
'starttime': time.time(),
'child_pid': None,
'running': True,
@ -426,27 +437,17 @@ class ogLiveWorker(ServerWorker):
## - aqui en _long_running_job meto una cola en self.q
## - (self.q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'q'")
## - en el thread_list también tengo un child_pid para almacenar el pid de los procesos hijos
## - en interfaceAdm() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue
## - al crear el ThreadWithResult le paso la cola, y luego en run() la recojo y la meto en el self.q del thread
## - en interfaceAdmin() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue
## - en mon() recojo pids de la queue y los meto en thread_list 'child_pid'
## - algunas funciones llaman a interfaceAdm más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando
## - por ejemplo EjecutarScript llama a interfaceAdm() y luego llama a LeeConfiguracion() el cual llama a interfaceAdm() otra vez
## - algunas funciones llaman a interfaceAdmin más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando
## - por ejemplo EjecutarScript llama a interfaceAdmin() y luego llama a LeeConfiguracion() el cual llama a interfaceAdmin() otra vez
## - y cuando nos llamen a KillJob, terminamos en killer() el cual coge el 'child_pid' y zas
## - pero a lo mejor el child ya terminó
## - o a lo mejor el KillJob nos llegó demasiado pronto y todavía no hubo ningún child
##
## está sin probar. Simplemente probé que el agente arranca (o sea, que no lo rompí con estos cambios)
## versión 1.4.5-pre3, desplegada en entornos de desarrollo y funciona bien
## (aunque sale el mensaje de "oops, queremos escribir el PID del hijo a la cola pero no hay cola", no sé por qué. Lo comento para que no despiste a la gente)
## la idea sería mandarle un EjecutarScript 'sleep 30' y luego un KillJob
##
## $ curl --insecure -X POST --data '{"nfn":"EjecutarScript","scp":"cd /usr; sleep 30; pwd; ls","ids":"0"}' https://192.168.2.199:8000/ogAdmClient/EjecutarScript
## {"job_id": "EjecutarScript-333feb3f"}
## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/CloningEngine/KillJob
##
## el KillJob de primeras no va a hacer nada (la llamada a os.kill() está comentada)
## entonces sería probar primero que el flujo va como espero:
## - que primero salga "would kill pid" en el log
## - y pasados unos segundos, al llamar a KillJob otra vez, salga "pid is gone"
## o en otra prueba, metiéndole un time.sleep() a piñón en interfaceAdm() antes de lanzar un hijo
## - que salga "oops no tenemos a quien matar" en el log
## y ya con esto comprobado, descomentar el os.kill() y hacer pruebas reales
## funciona bien, excepto que el PID no muere xD, ni siquiera haciendo subprocess.run('kill')