From 69be238f9fd45e9d0a8305ea0241d08ac2e28cfd Mon Sep 17 00:00:00 2001 From: Natalia Serrano Date: Fri, 29 Nov 2024 10:24:15 +0100 Subject: [PATCH] refs #1108 kill subprocesses in oglive --- linux/debian/changelog | 8 ++- src/VERSION | 2 +- .../modules/server/CloningEngine/__init__.py | 3 + .../modules/server/ogAdmClient/__init__.py | 3 + src/opengnsys/workers/oglive_worker.py | 67 ++++++++++--------- 5 files changed, 48 insertions(+), 35 deletions(-) diff --git a/linux/debian/changelog b/linux/debian/changelog index 1030976..b84d33d 100644 --- a/linux/debian/changelog +++ b/linux/debian/changelog @@ -1,3 +1,9 @@ +ogagent (1.4.5-1) stable; urgency=medium + + * Kill long running jobs in oglive + + -- OpenGnsys developers Fri, 29 Nov 2024 10:22:36 +0100 + ogagent (1.4.5~pre8-1) stable; urgency=medium * Add Configurar() to the CloningEngine module @@ -31,7 +37,7 @@ ogagent (1.4.5~pre4-1) stable; urgency=medium ogagent (1.4.5~pre3-1) stable; urgency=medium - * Kill long running jobs in oglive + * Kill long running jobs in oglive (not-yet-working draft) -- OpenGnsys developers Wed, 06 Nov 2024 14:11:32 +0100 diff --git a/src/VERSION b/src/VERSION index d051a1b..e516bb9 100644 --- a/src/VERSION +++ b/src/VERSION @@ -1 +1 @@ -1.4.5-pre8 +1.4.5 diff --git a/src/opengnsys/modules/server/CloningEngine/__init__.py b/src/opengnsys/modules/server/CloningEngine/__init__.py index 957598d..67ce077 100644 --- a/src/opengnsys/modules/server/CloningEngine/__init__.py +++ b/src/opengnsys/modules/server/CloningEngine/__init__.py @@ -349,4 +349,7 @@ class CloningEngineWorker (ogLiveWorker): logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) jid = post_params['job_id'] r = self.killer (jid) + logger.debug (f'r bef ({r})') + r.update ({ 'nfn':'RESPUESTA_KillJob', 'job':jid }) + logger.debug (f'r aft ({r})') return r diff --git a/src/opengnsys/modules/server/ogAdmClient/__init__.py b/src/opengnsys/modules/server/ogAdmClient/__init__.py index f08fc16..1cf33a5 100644 --- a/src/opengnsys/modules/server/ogAdmClient/__init__.py +++ b/src/opengnsys/modules/server/ogAdmClient/__init__.py @@ -567,4 +567,7 @@ class ogAdmClientWorker (ogLiveWorker): logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) jid = post_params['job_id'] r = self.killer (jid) + logger.debug (f'r bef ({r})') + r.update ({ 'nfn':'RESPUESTA_KillJob', 'job':jid }) + logger.debug (f'r aft ({r})') return r diff --git a/src/opengnsys/workers/oglive_worker.py b/src/opengnsys/workers/oglive_worker.py index 5eaae01..faaeaf5 100644 --- a/src/opengnsys/workers/oglive_worker.py +++ b/src/opengnsys/workers/oglive_worker.py @@ -48,13 +48,16 @@ class ThreadWithResult (threading.Thread): try: self.result = None if self._target is not None: + ## the first arg in self._args is the queue + self.q = self._args[0] + self._args = self._args[1:] try: self.result = self._target (*self._args, **self._kwargs) except Exception as e: self.result = { 'res': 2, 'der': f'got exception: ({e})' } ## res=2 as defined in ogAdmClient.c:2048 finally: # Avoid a refcycle if the thread is running a function with an argument that has a member that points to the thread. - del self._target, self._args, self._kwargs + del self._target, self._args, self._kwargs, self.q class ogLiveWorker(ServerWorker): thread_list = {} @@ -150,36 +153,45 @@ class ogLiveWorker(ServerWorker): with self.thread_lock: if 'thread' not in self.thread_list[job_id]: return { 'res': 2, 'der': 'Job is not running' } - t = self.thread_list[job_id]['thread'] + t = self.thread_list[job_id]['thread'] pid = self.thread_list[job_id]['child_pid'] logger.debug (f'pid ({pid})') try_times = 8 sig = signal.SIGTERM + msg = f'could not kill pid ({pid}) after ({try_times}) tries' + success = 2 ## mimic cmd['res'] in respuestaEjecucionComando(): "1" means success, "2" means failed while True: t.join (0.05) if not t.is_alive(): - logger.debug (f'thread exited, yay!') - ## limpieza - self.q = None + msg = 'job terminated' + success = 1 + logger.debug (msg) self.thread_list[job_id]['child_pid'] = None break + ## race condition: if the subprocess finishes just here, then we already checked that t.is_alive() is true, but os.path.exists(/proc/pid) will be false below. msg will be 'nothing to kill'. + ## this is fine in the first iteration of the loop, before we send any signals. In the rest of iterations, after some signals were sent, msg should be 'job terminated' instead. if pid: if os.path.exists (f'/proc/{pid}'): - logger.debug (f'would os.kill pid ({pid})') - ## si el proceso se muere justo aquí, no pasa nada: la señal va al vacío - #os.kill (pid, sig) + logger.debug (f'sending signal ({sig}) to pid ({pid})') + ## if the process finishes just here, nothing happens: the signal is sent to the void + os.kill (pid, sig) + #subprocess.run (['kill', '--signal', str(sig), str(pid)]) else: - logger.debug (f'pid ({pid}) is gone, nothing to kill...') + msg = f'pid ({pid}) is gone, nothing to kill' + success = 1 + logger.debug (msg) self.thread_list[job_id]['child_pid'] = None + break else: - logger.debug (f'oops no tenemos a quien matar') + msg = 'no PID to kill' + logger.debug (msg) + if not try_times: break if 4 == try_times: sig = signal.SIGKILL ## change signal after a few tries try_times -= 1 time.sleep (0.4) - ## y si no lo hemos conseguido, qué?? - return {'job_id':job_id} + return { 'res':success, 'der':msg } def mon (self): while True: @@ -221,13 +233,12 @@ class ogLiveWorker(ServerWorker): proc = ['bash', '-c', '{} {}'.format (devel_bash_prefix, exe)] logger.debug ('subprocess.run ("{}", capture_output=True)'.format (proc)) - #p = subprocess.run (proc, capture_output=True) p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if self.q: self.q.put (p.pid) - #else: - # ## sale este mensaje en el log, y no se por que - # logger.debug ('oops, queremos escribir el PID del hijo a la cola pero no hay cola') + else: + ## esto sucede por ejemplo cuando arranca el agente, que estamos en interfaceAdmin() en el mismo hilo, sin _long_running_job ni hilo separado + logger.debug ('no queue--not writing any PID to it') sout = serr = '' while p.poll() is None: for l in iter (p.stdout.readline, b''): sout += l.decode ('utf-8', 'ignore') @@ -299,7 +310,7 @@ class ogLiveWorker(ServerWorker): cmd['der'] = '' else: ## el comando tuvo algún error cmd['res'] = 2 - cmd['der'] = self.tbErroresScripts[herror] ## XXX + cmd['der'] = self.tbErroresScripts[herror] return cmd @@ -413,7 +424,7 @@ class ogLiveWorker(ServerWorker): import queue self.q = queue.Queue() ## a single queue works for us because we never have more than one long_running_job at the same time self.thread_list[job_id] = { - 'thread': ThreadWithResult (target=f, args=args), ## tengo que pasar self.q aqui dentro de args? + 'thread': ThreadWithResult (target=f, args=(self.q,) + args), 'starttime': time.time(), 'child_pid': None, 'running': True, @@ -426,27 +437,17 @@ class ogLiveWorker(ServerWorker): ## - aqui en _long_running_job meto una cola en self.q ## - (self.q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'q'") ## - en el thread_list también tengo un child_pid para almacenar el pid de los procesos hijos -## - en interfaceAdm() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue +## - al crear el ThreadWithResult le paso la cola, y luego en run() la recojo y la meto en el self.q del thread +## - en interfaceAdmin() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue ## - en mon() recojo pids de la queue y los meto en thread_list 'child_pid' -## - algunas funciones llaman a interfaceAdm más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando -## - por ejemplo EjecutarScript llama a interfaceAdm() y luego llama a LeeConfiguracion() el cual llama a interfaceAdm() otra vez +## - algunas funciones llaman a interfaceAdmin más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando +## - por ejemplo EjecutarScript llama a interfaceAdmin() y luego llama a LeeConfiguracion() el cual llama a interfaceAdmin() otra vez ## - y cuando nos llamen a KillJob, terminamos en killer() el cual coge el 'child_pid' y zas ## - pero a lo mejor el child ya terminó ## - o a lo mejor el KillJob nos llegó demasiado pronto y todavía no hubo ningún child ## -## está sin probar. Simplemente probé que el agente arranca (o sea, que no lo rompí con estos cambios) -## versión 1.4.5-pre3, desplegada en entornos de desarrollo y funciona bien -## (aunque sale el mensaje de "oops, queremos escribir el PID del hijo a la cola pero no hay cola", no sé por qué. Lo comento para que no despiste a la gente) -## la idea sería mandarle un EjecutarScript 'sleep 30' y luego un KillJob -## ## $ curl --insecure -X POST --data '{"nfn":"EjecutarScript","scp":"cd /usr; sleep 30; pwd; ls","ids":"0"}' https://192.168.2.199:8000/ogAdmClient/EjecutarScript ## {"job_id": "EjecutarScript-333feb3f"} ## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/CloningEngine/KillJob ## -## el KillJob de primeras no va a hacer nada (la llamada a os.kill() está comentada) -## entonces sería probar primero que el flujo va como espero: -## - que primero salga "would kill pid" en el log -## - y pasados unos segundos, al llamar a KillJob otra vez, salga "pid is gone" -## o en otra prueba, metiéndole un time.sleep() a piñón en interfaceAdm() antes de lanzar un hijo -## - que salga "oops no tenemos a quien matar" en el log -## y ya con esto comprobado, descomentar el os.kill() y hacer pruebas reales +## funciona bien, excepto que el PID no muere xD, ni siquiera haciendo subprocess.run('kill')