diff --git a/linux/debian/changelog b/linux/debian/changelog index 6d9dfe2..09edfe7 100644 --- a/linux/debian/changelog +++ b/linux/debian/changelog @@ -1,3 +1,9 @@ +ogagent (1.4.5~pre3-1) stable; urgency=medium + + * Kill long running jobs in oglive + + -- OpenGnsys developers Wed, 06 Nov 2024 14:11:32 +0100 + ogagent (1.4.5~pre2-1) stable; urgency=medium * Remove race condition due to several monitoring threads diff --git a/src/VERSION b/src/VERSION index e167f12..bf08677 100644 --- a/src/VERSION +++ b/src/VERSION @@ -1 +1 @@ -1.4.5-pre1 +1.4.5-pre3 diff --git a/src/opengnsys/modules/server/CloningEngine/__init__.py b/src/opengnsys/modules/server/CloningEngine/__init__.py index 06b704f..8eed397 100644 --- a/src/opengnsys/modules/server/CloningEngine/__init__.py +++ b/src/opengnsys/modules/server/CloningEngine/__init__.py @@ -322,3 +322,10 @@ class CloningEngineWorker (ogLiveWorker): def process_InventarioSoftware (self, path, get_params, post_params, server): logger.debug ('in process_InventarioSoftware, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) return self._long_running_job ('InventarioSoftware', self.do_InventarioSoftware, args=(post_params,)) + + ## curl --insecure -X POST --data '{"job_id":"foo"}' https://192.168.2.199:8000/CloningEngine/KillJob + def process_KillJob (self, path, get_params, post_params, server): + logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) + jid = post_params['job_id'] + r = self.killer (jid) + return r diff --git a/src/opengnsys/modules/server/ogAdmClient/__init__.py b/src/opengnsys/modules/server/ogAdmClient/__init__.py index 286c9e5..f08fc16 100644 --- a/src/opengnsys/modules/server/ogAdmClient/__init__.py +++ b/src/opengnsys/modules/server/ogAdmClient/__init__.py @@ -561,3 +561,10 @@ class ogAdmClientWorker (ogLiveWorker): def process_EjecutaComandosPendientes (self, path, get_params, post_params, server): logger.debug ('in process_EjecutaComandosPendientes, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) return {'true':'true'} ## ogAdmClient.c:2138 + + ## curl --insecure -X POST --data '{"job_id":"foo"}' https://192.168.2.199:8000/ogAdmClient/KillJob + def process_KillJob (self, path, get_params, post_params, server): + logger.debug ('in process_KillJob, path "{}" get_params "{}" post_params "{}" server "{}"'.format (path, get_params, post_params, server)) + jid = post_params['job_id'] + r = self.killer (jid) + return r diff --git a/src/opengnsys/workers/oglive_worker.py b/src/opengnsys/workers/oglive_worker.py index ada7bc2..0634a22 100644 --- a/src/opengnsys/workers/oglive_worker.py +++ b/src/opengnsys/workers/oglive_worker.py @@ -35,6 +35,7 @@ import time import random import subprocess import threading +import signal from configparser import NoOptionError from opengnsys import REST @@ -57,6 +58,7 @@ class ThreadWithResult (threading.Thread): class ogLiveWorker(ServerWorker): thread_list = {} + thread_lock = threading.Lock() tbErroresScripts = [ "Se han generado errores desconocidos. No se puede continuar la ejecución de este módulo", ## 0 @@ -142,20 +144,65 @@ class ogLiveWorker(ServerWorker): result['job_id'] = job_id res = self.REST.sendMessage ('clients/status/webhook', result) + def killer (self, job_id): + logger.debug (f'killer() called, job_id ({job_id})') + if job_id not in self.thread_list: return { 'res': 2, 'der': 'Unknown job' } + + with self.thread_lock: + if 'thread' not in self.thread_list[job_id]: return { 'res': 2, 'der': 'Job is not running' } + t = self.thread_list[job_id]['thread'] + pid = self.thread_list[job_id]['child_pid'] + logger.debug (f'pid ({pid})') + try_times = 8 + sig = signal.SIGTERM + while True: + t.join (0.05) + if not t.is_alive(): + logger.debug (f'thread exited, yay!') + ## limpieza + self.q = None + self.thread_list[job_id]['child_pid'] = None + break + if pid: + if os.path.exists (f'/proc/{pid}'): + logger.debug (f'would os.kill pid ({pid})') + ## si el proceso se muere justo aquí, no pasa nada: la señal va al vacío + #os.kill (pid, sig) + else: + logger.debug (f'pid ({pid}) is gone, nothing to kill...') + self.thread_list[job_id]['child_pid'] = None + else: + logger.debug (f'oops no tenemos a quien matar') + if not try_times: break + if 4 == try_times: sig = signal.SIGKILL ## change signal after a few tries + try_times -= 1 + time.sleep (0.4) + + ## y si no lo hemos conseguido, qué?? + return {'job_id':job_id} + def mon (self): while True: - #print ('mon(): iterating') - for k in self.thread_list: - elem = self.thread_list[k] - if 'thread' not in elem: continue - logger.debug (f'considering thread ({k})') - elem['thread'].join (0.05) - if not elem['thread'].is_alive(): - logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})') - elem['running'] = False - elem['result'] = elem['thread'].result - del elem['thread'] - self.notifier (k, elem['result']) + with self.thread_lock: + for k in self.thread_list: + elem = self.thread_list[k] + if 'thread' not in elem: continue + logger.debug (f'considering thread ({k})') + + if self.q: + if not q.empty(): + elem['child_pid'] = q.get() + logger.debug (f'queue not empty, got pid ({elem["child_pid"]})') + else: + logger.debug (f'queue empty') + + elem['thread'].join (0.05) + if not elem['thread'].is_alive(): + logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})') + elem['running'] = False + elem['result'] = elem['thread'].result + del elem['thread'] + self.notifier (k, elem['result']) time.sleep (1) @@ -173,22 +220,36 @@ class ogLiveWorker(ServerWorker): else: proc = ['bash', '-c', '{} {}'.format (devel_bash_prefix, exe)] logger.debug ('subprocess.run ("{}", capture_output=True)'.format (proc)) - p = subprocess.run (proc, capture_output=True) + + #p = subprocess.run (proc, capture_output=True) + p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if self.q: + self.q.put (p.pid) + else: + logger.debug ('oops, queremos escribir el PID del hijo a la cola pero no hay cola') + sout = serr = '' + while p.poll() is None: + for l in iter (p.stdout.readline, b''): sout += l.decode ('utf-8', 'ignore') + for l in iter (p.stderr.readline, b''): serr += l.decode ('utf-8', 'ignore') + time.sleep (1) + sout = sout.strip() + serr = serr.strip() + ## DEBUG logger.info (f'stdout follows:') - for l in p.stdout.strip().decode ('utf-8').splitlines(): + for l in sout.splitlines(): logger.info (f' {l}') logger.info (f'stderr follows:') - for l in p.stderr.strip().decode ('utf-8').splitlines(): + for l in serr.splitlines(): logger.info (f' {l}') ## /DEBUG if 0 != p.returncode: cmd_txt = ' '.join (proc) logger.error (f'command ({cmd_txt}) failed, stderr follows:') - for l in p.stderr.strip().decode ('utf-8').splitlines(): + for l in serr.splitlines(): logger.error (f' {l}') raise Exception (f'command ({cmd_txt}) failed, see log for details') - return p.stdout.strip().decode ('utf-8') + return sout def tomaIPlocal (self): try: @@ -303,6 +364,7 @@ class ogLiveWorker(ServerWorker): self.idproautoexec = None self.idcentro = None ## Identificador del centro self.idaula = None ## Identificador del aula + self.q = None ## for passing PIDs around try: url = self.service.config.get (self.name, 'remote') @@ -336,11 +398,42 @@ class ogLiveWorker(ServerWorker): return { 'job_id': None, 'message': 'some job is already running, refusing to launch another one' } job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8))) + import queue + self.q = queue.Queue() ## a single queue works for us because we never have more than one long_running_job at the same time self.thread_list[job_id] = { 'thread': ThreadWithResult (target=f, args=args), 'starttime': time.time(), + 'child_pid': None, 'running': True, 'result': None } self.thread_list[job_id]['thread'].start() return { 'job_id': job_id } + +## para matar threads tengo lo siguiente: +## - aqui en _long_running_job meto una cola en self.q +## - (self.q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'q'") +## - en el thread_list también tengo un child_pid para almacenar el pid de los procesos hijos +## - en interfaceAdm() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue +## - en mon() recojo pids de la queue y los meto en thread_list 'child_pid' +## - algunas funciones llaman a interfaceAdm más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando +## - por ejemplo EjecutarScript llama a interfaceAdm() y luego llama a LeeConfiguracion() el cual llama a interfaceAdm() otra vez +## - y cuando nos llamen a KillJob, terminamos en killer() el cual coge el 'child_pid' y zas +## - pero a lo mejor el child ya terminó +## - o a lo mejor el KillJob nos llegó demasiado pronto y todavía no hubo ningún child +## +## está sin probar. Simplemente probé que el agente arranca (o sea, que no lo rompí con estos cambios) +## versión 1.4.5-pre3, desplegada en entornos de desarrollo y funciona bien +## la idea sería mandarle un EjecutarScript 'sleep 30' y luego un KillJob +## +## $ curl --insecure -X POST --data '{"nfn":"EjecutarScript","scp":"cd /usr; sleep 30; pwd; ls","ids":"0"}' https://192.168.2.199:8000/ogAdmClient/EjecutarScript +## {"job_id": "EjecutarScript-333feb3f"} +## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/CloningEngine/KillJob +## +## el KillJob de primeras no va a hacer nada (la llamada a os.kill() está comentada) +## entonces sería probar primero que el flujo va como espero: +## - que primero salga "would kill pid" en el log +## - y pasados unos segundos, al llamar a KillJob otra vez, salga "pid is gone" +## o en otra prueba, metiéndole un time.sleep() a piñón en interfaceAdm() antes de lanzar un hijo +## - que salga "oops no tenemos a quien matar" en el log +## y ya con esto comprobado, descomentar el os.kill() y hacer pruebas reales