# -*- coding: utf-8 -*- # # Copyright (c) 2024 Qindel Formación y Servicios S.L. # All rights reserved. # # Redistribution and use in source and binary forms, with or without modification, # are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # * Neither the name of Virtual Cable S.L. nor the names of its contributors # may be used to endorse or promote products derived from this software # without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
"""
@author: Natalia Serrano, nserrano at qindel dot com
"""
# pylint: disable=unused-wildcard-import,wildcard-import

import os
import queue
import random
import re
import signal
import subprocess
import threading
import time
from configparser import NoOptionError

from opengnsys import REST
from opengnsys.log import logger

from .server_worker import ServerWorker


## https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread
class ThreadWithResult(threading.Thread):
    """Thread that stores the return value of its target in ``self.result``.

    The first two positional arguments given to the thread are taken to be the
    PID queue and the stdout queue; they are removed from the argument list
    before the target is invoked. If the target raises, ``self.result`` is set
    to a ``{'res': 2, 'der': ...}`` dict instead of propagating the exception.
    """

    def run(self):
        ## Pre-set every attribute that the 'finally' clause deletes, so that the
        ## cleanup cannot raise AttributeError (no target, or fewer than two args).
        self.result = None
        self.pid_q = None
        self.stdout_q = None
        try:
            if self._target is not None:
                ## the first two args in self._args are the queues
                self.pid_q = self._args[0]
                self.stdout_q = self._args[1]
                self._args = self._args[2:]
                try:
                    self.result = self._target(*self._args, **self._kwargs)
                except Exception as e:
                    ## res=2 as defined in ogAdmClient.c:2048
                    self.result = {'res': 2, 'der': f'got exception: ({e})'}
        finally:
            # Avoid a refcycle if the thread is running a function with an
            # argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs, self.pid_q, self.stdout_q


class ogLiveWorker(ServerWorker):
    """Base worker for agents running inside ogLive.

    Runs the scripts found under ``pathinterface``, keeps track of at most one
    long running job at a time in ``thread_list`` and reports job status and
    progress through ``self.REST``.
    """

    ## job_id -> {'thread', 'starttime', 'child_pid', 'running', 'result'}
    thread_list = {}
    ## guards thread_list
    thread_lock = threading.Lock()

    ## Error messages indexed by script exit code; the last entry is the generic fallback
    tbErroresScripts = [
        "Se han generado errores desconocidos. No se puede continuar la ejecución de este módulo",  ## 0
        "001-Formato de ejecución incorrecto.",
        "002-Fichero o dispositivo no encontrado",
        "003-Error en partición de disco",
        "004-Partición o fichero bloqueado",
        "005-Error al crear o restaurar una imagen",
        "006-Sin sistema operativo",
        "007-Programa o función BOOLEAN no ejecutable",
        "008-Error en la creación del archivo de eco para consola remota",
        "009-Error en la lectura del archivo temporal de intercambio",
        "010-Error al ejecutar la llamada a la interface de administración",
        "011-La información retornada por la interface de administración excede de la longitud permitida",
        "012-Error en el envío de fichero por la red",
        "013-Error en la creación del proceso hijo",
        "014-Error de escritura en destino",
        "015-Sin Cache en el Cliente",
        "016-No hay espacio en la cache para almacenar fichero-imagen",
        "017-Error al Reducir el Sistema Archivos",
        "018-Error al Expandir el Sistema Archivos",
        "019-Valor fuera de rango o no válido.",
        "020-Sistema de archivos desconocido o no se puede montar",
        "021-Error en partición de caché local",
        "022-El disco indicado no contiene una particion GPT",
        "023-Error no definido",
        "024-Error no definido",
        "025-Error no definido",
        "026-Error no definido",
        "027-Error no definido",
        "028-Error no definido",
        "029-Error no definido",
        "030-Error al restaurar imagen - Imagen mas grande que particion",
        "031-Error al realizar el comando updateCache",
        "032-Error al formatear",
        "033-Archivo de imagen corrupto o de otra versión de partclone",
        "034-Error no definido",
        "035-Error no definido",
        "036-Error no definido",
        "037-Error no definido",
        "038-Error no definido",
        "039-Error no definido",
        "040-Error imprevisto no definido",
        "041-Error no definido",
        "042-Error no definido",
        "043-Error no definido",
        "044-Error no definido",
        "045-Error no definido",
        "046-Error no definido",
        "047-Error no definido",
        "048-Error no definido",
        "049-Error no definido",
        "050-Error en la generación de sintaxis de transferenica unicast",
        "051-Error en envio UNICAST de una particion",
        "052-Error en envio UNICAST de un fichero",
        "053-Error en la recepcion UNICAST de una particion",
        "054-Error en la recepcion UNICAST de un fichero",
        "055-Error en la generacion de sintaxis de transferenica Multicast",
        "056-Error en envio MULTICAST de un fichero",
        "057-Error en la recepcion MULTICAST de un fichero",
        "058-Error en envio MULTICAST de una particion",
        "059-Error en la recepcion MULTICAST de una particion",
        "060-Error en la conexion de una sesion UNICAST|MULTICAST con el MASTER",
        "061-Error no definido",
        "062-Error no definido",
        "063-Error no definido",
        "064-Error no definido",
        "065-Error no definido",
        "066-Error no definido",
        "067-Error no definido",
        "068-Error no definido",
        "069-Error no definido",
        "070-Error al montar una imagen sincronizada.",
        "071-Imagen no sincronizable (es monolitica).",
        "072-Error al desmontar la imagen.",
        "073-No se detectan diferencias entre la imagen basica y la particion.",
        "074-Error al sincronizar, puede afectar la creacion/restauracion de la imagen.",
        "Error desconocido",
    ]

    def notifier(self, job_id, result):
        """Tag *result* with *job_id* (in place) and post it to the status webhook."""
        result['job_id'] = job_id
        self.REST.sendMessage('clients/status/webhook', result)

    def killer(self, job_id):
        """Try to terminate the child process of job *job_id*.

        Sends SIGTERM to the recorded child PID, switching to SIGKILL after a
        few attempts, until the job thread finishes, the PID disappears or the
        attempts are exhausted.

        Returns a dict ``{'res': 1|2, 'der': message}`` where 1 means success
        and 2 means failure (same convention as respuestaEjecucionComando()).
        """
        logger.debug(f'killer() called, job_id ({job_id})')
        if job_id not in self.thread_list:
            return {'res': 2, 'der': 'Unknown job'}

        with self.thread_lock:
            job = self.thread_list[job_id]
            if 'thread' not in job:
                return {'res': 2, 'der': 'Job is not running'}
            t = job['thread']
            pid = job['child_pid']
            logger.debug(f'pid ({pid})')
            try_times = 8
            sig = signal.SIGTERM
            msg = f'could not kill pid ({pid}) after ({try_times}) tries'
            success = 2  ## mimic cmd['res'] in respuestaEjecucionComando(): "1" means success, "2" means failed

            ## NOTE(review): only the direct child is signalled; processes spawned
            ## by that child are not targeted here.
            while True:
                t.join(0.05)
                if not t.is_alive():
                    msg = 'job terminated'
                    success = 1
                    logger.debug(msg)
                    job['child_pid'] = None
                    break

                ## race condition: if the subprocess finishes just here, then we already checked
                ## that t.is_alive() is true, but the PID will be gone below and msg will be
                ## 'nothing to kill'. This is fine in the first iteration of the loop, before we
                ## send any signals. In the rest of iterations, after some signals were sent,
                ## msg should be 'job terminated' instead.
                if pid:
                    pid_gone = not os.path.exists(f'/proc/{pid}')
                    if not pid_gone:
                        logger.debug(f'sending signal ({sig}) to pid ({pid})')
                        try:
                            os.kill(pid, sig)
                        except ProcessLookupError:
                            ## the process finished between the /proc check and the signal
                            pid_gone = True
                    if pid_gone:
                        msg = f'pid ({pid}) is gone, nothing to kill'
                        success = 1
                        logger.debug(msg)
                        job['child_pid'] = None
                        break
                else:
                    msg = 'no PID to kill'
                    logger.debug(msg)

                if not try_times:
                    break
                if 4 == try_times:
                    sig = signal.SIGKILL  ## change signal after a few tries
                try_times -= 1
                time.sleep(0.4)

        return {'res': success, 'der': msg}

    def _extract_progress(self, job_id, ary=None):
        """Return the last progress marker found in *ary* as a 0..1 fraction.

        A progress marker is a line starting with an integer in square
        brackets, like '[10]' or '[60]'. Returns None when no line matches.
        *job_id* is accepted for interface compatibility and is not used.
        """
        progress = None
        for line in ary or ():
            if m := re.search(r'^\[([0-9]+)\]', line):
                progress = float(m.group(1)) / 100
        return progress

    ## monitors child threads, waits for them to finish
    ## pings ogcore
    def mon(self):
        """Monitoring loop, meant to run forever in its own thread.

        Once per second: collects child PIDs and partial stdout from the
        queues, posts progress updates, and notifies the result of finished
        threads. Every tenth iteration it also posts a ping with the local IP
        and a timestamp.
        """
        n = 0
        while True:
            with self.thread_lock:
                ## iterate over a snapshot: other threads may add jobs meanwhile
                for k, elem in list(self.thread_list.items()):
                    if 'thread' not in elem:
                        continue

                    if self.pid_q and not self.pid_q.empty():
                        elem['child_pid'] = self.pid_q.get()
                        logger.debug(f'queue not empty, got pid ({elem["child_pid"]})')

                    if self.stdout_q:
                        chunks = []
                        while not self.stdout_q.empty():
                            chunks.append(self.stdout_q.get())
                        lines = ''.join(chunks).splitlines()
                        if lines:
                            p = self._extract_progress(k, lines)
                            if p:
                                self.REST.sendMessage('clients/status/webhook', {"job_id": k, "progress": p})

                    elem['thread'].join(0.05)
                    if not elem['thread'].is_alive():
                        logger.debug(f'is no longer alive, k ({k}) thread ({elem["thread"]})')
                        elem['running'] = False
                        elem['result'] = elem['thread'].result
                        del elem['thread']
                        self.notifier(k, elem['result'])

            time.sleep(1)
            n += 1
            if not n % 10:
                alive_threads = [k for k, elem in list(self.thread_list.items()) if 'thread' in elem]
                if alive_threads:
                    s = ','.join(alive_threads)
                    logger.debug(f'alive threads: {s}')

                body = {
                    'iph': self.IPlocal,
                    'timestamp': int(time.time()),
                }
                self.REST.sendMessage('clients/status/webhook', body)

    def interfaceAdmin(self, method, parametros=None):
        """Run the interface script *method* and return its stripped stdout.

        Methods in the known list are run as ``<pathinterface>/<method>.py``;
        anything else is run through bash after sourcing the OpenGnsys engine
        libraries. The child PID is written to ``self.pid_q`` and every stdout
        line to ``self.stdout_q`` when those queues are set.

        Raises Exception when the command exits with a non-zero return code
        (stderr is logged).
        """
        parametros = [] if parametros is None else list(parametros)

        python_methods = (
            'Apagar', 'CambiarAcceso', 'Configurar', 'CrearImagen', 'EjecutarScript',
            'getConfiguration', 'getIpAddress', 'IniciarSesion', 'InventarioHardware',
            'InventarioSoftware', 'Reiniciar', 'RestaurarImagen',
        )
        if method in python_methods:
            ## python
            logger.debug(f'({method}) is a python method')
            exe = '{}/{}.py'.format(self.pathinterface, method)
            proc = [exe] + parametros
        else:  ## ConsolaRemota procesaCache
            ## bash
            logger.debug(f'({method}) is a bash method')
            exe = '{}/{}'.format(self.pathinterface, method)
            LANG = os.environ.get('LANG', 'en_GB.UTF-8').replace('UTF_8', 'UTF-8')
            devel_bash_prefix = (
                ' PATH=/opt/opengnsys/scripts/:$PATH;'
                f' source /opt/opengnsys/etc/lang.{LANG}.conf;'
                ' for I in /opt/opengnsys/lib/engine/bin/*.lib; do source $I; done;'
                ' for i in $(declare -F |cut -f3 -d" "); do export -f $i; done; '
            )
            ## NOTE(review): parameters are interpolated unquoted into the shell
            ## command line; confirm callers never pass untrusted values here.
            if parametros:
                proc = ['bash', '-c', '{} {} {}'.format(devel_bash_prefix, exe, ' '.join(parametros))]
            else:
                proc = ['bash', '-c', '{} {}'.format(devel_bash_prefix, exe)]
        logger.debug('subprocess.run ("{}")'.format(' '.join(proc)))

        stdout_chunks = []
        stderr_chunks = []

        def drain_stderr(stream):
            for raw in iter(stream.readline, b''):
                stderr_chunks.append(raw.decode('utf-8', 'ignore'))

        ## The context manager closes both pipes and reaps the child on exit
        with subprocess.Popen(proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
            ## there is no queue e.g. when the agent starts up and interfaceAdmin() is
            ## called in the same thread, without _long_running_job or a separate thread
            if self.pid_q:
                self.pid_q.put(p.pid)

            ## stderr is drained concurrently: reading one pipe to EOF before touching
            ## the other blocks forever once the child fills the unread pipe
            stderr_reader = threading.Thread(target=drain_stderr, args=(p.stderr,), daemon=True)
            stderr_reader.start()

            ## reading until EOF also collects the output of a child that already exited
            for raw in iter(p.stdout.readline, b''):
                partial = raw.decode('utf-8', 'ignore')
                if self.stdout_q:
                    self.stdout_q.put(partial)
                stdout_chunks.append(partial)

            stderr_reader.join()
            p.wait()

        sout = ''.join(stdout_chunks).strip()
        serr = ''.join(stderr_chunks).strip()

        ## DEBUG
        logger.debug(f'stdout follows:')
        for l in sout.splitlines():
            logger.debug(f' {l}')
        ## /DEBUG

        if 0 != p.returncode:
            cmd_txt = ' '.join(proc)
            logger.error(f'command ({cmd_txt}) failed, stderr follows:')
            for l in serr.splitlines():
                logger.error(f' {l}')
            raise Exception(f'command ({cmd_txt}) failed, see log for details')
        return sout

    def tomaIPlocal(self):
        """Store the output of the 'getIpAddress' script in self.IPlocal.

        Returns True on success, False (after logging) if the script fails.
        """
        try:
            self.IPlocal = self.interfaceAdmin('getIpAddress')
        except Exception as e:
            logger.error(e)
            logger.error('No se ha podido recuperar la dirección IP del cliente')
            return False
        logger.info('local IP is "{}"'.format(self.IPlocal))
        return True

    def tomaMAClocal(self):
        """Store the MAC address of the non-loopback interface(s) in self.mac.

        Returns True on success, False (after logging) when the command fails
        or yields no address.
        """
        ## tomaIPlocal() calls interfaceAdm('getIpAddress')
        ## getIpAddress runs 'ip addr show' and returns the IP address of every network interface except "lo"
        ## (ie. breaks badly if there's more than one network interface)
        ## let's make the same assumptions here
        mac = subprocess.run(
            "ip -json link show |jq -r '.[] |select (.ifname != \"lo\") |.address'",
            shell=True, capture_output=True, text=True,
        )
        self.mac = mac.stdout.strip()
        if 0 != mac.returncode or not self.mac:
            ## the caller checks the return value, so failures must be reported
            logger.error(mac.stderr.strip())
            logger.error('No se ha podido recuperar la dirección MAC del cliente')
            return False
        return True

    def enviaMensajeServidor(self, path, obj=None):
        """Add the client identification fields to *obj* and send it.

        *obj* is updated in place when given; a fresh dict is used otherwise.
        The message is sent to '<self.name>/<path>'.

        Returns the response dict, or False if the response is not a dict.
        """
        if obj is None:
            obj = {}
        obj['iph'] = self.IPlocal          ## computer IP
        obj['ido'] = self.idordenador      ## computer identifier
        obj['npc'] = self.nombreordenador  ## computer name
        obj['idc'] = self.idcentro         ## centre identifier
        obj['ida'] = self.idaula           ## classroom identifier

        res = self.REST.sendMessage('/'.join([self.name, path]), obj)

        if not isinstance(res, dict):
            logger.error(f'response is not a dict ({res})')
            return False

        return res

    ## in C, this sends a response frame to the server and returns a boolean
    ## in python, it only finishes building the response and returns it; nothing is sent over
    ## the network. The caller uses it in return() to implicitly send the response
    def respuestaEjecucionComando(self, cmd, herror, ids=None):
        """Fill in the result fields of *cmd* (in place) and return it.

        ``cmd['res']`` is 1 with an empty ``cmd['der']`` when *herror* is 0;
        otherwise it is 2 and ``cmd['der']`` holds the matching entry of
        tbErroresScripts, or its last (generic) entry when *herror* is not a
        valid index. ``cmd['ids']`` is set when *ids* is truthy.
        """
        if ids:                  ## the command is being tracked
            cmd['ids'] = ids     ## add the session identifier

        if 0 == herror:  ## the command finished successfully
            cmd['res'] = 1
            cmd['der'] = ''
        else:            ## the command had some error
            cmd['res'] = 2
            if isinstance(herror, int) and 0 < herror < len(self.tbErroresScripts):
                cmd['der'] = self.tbErroresScripts[herror]
            else:
                ## unknown code: use the generic message instead of raising IndexError
                cmd['der'] = self.tbErroresScripts[-1]

        return cmd

    def cargaPaginaWeb(self, url=None):
        """Kill any running browser and launch a new one showing *url*.

        *url* defaults to self.urlMenu. Returns False if the browser cannot be
        started or exits within 2 seconds, True otherwise.
        """
        if not url:
            url = self.urlMenu
        os.system('pkill -9 browser')

        try:
            p = subprocess.Popen(['/usr/bin/browser', '-qws', url])
        except OSError as e:
            logger.error('Error al ejecutar browser: {}'.format(e))
            return False

        try:
            p.wait(2)  ## if the process dies before 2 seconds...
        except subprocess.TimeoutExpired:
            return True

        logger.error('Error al ejecutar browser, return code "{}"'.format(p.returncode))
        return False

    def muestraMenu(self):
        """Show the menu page in the browser."""
        self.cargaPaginaWeb()

    def muestraMensaje(self, idx):
        """Show message number *idx* in the browser."""
        self.cargaPaginaWeb(f'{self.urlMsg}?idx={idx}')

    def LeeConfiguracion(self):
        """Return the output of the 'getConfiguration' script, or None on failure."""
        try:
            ## configuration of the operating systems of the client
            parametroscfg = self.interfaceAdmin('getConfiguration')
        except Exception as e:
            logger.error(e)
            logger.error('No se ha podido recuperar la configuración del cliente')
            return None
        return parametroscfg

    def cfg2obj(self, cfg):
        """Parse *cfg* into a list with one dict per line.

        Lines are separated by newlines and items within a line by tabs; each
        'key=value' item becomes a dict entry. Items without '=' are logged
        and skipped.
        """
        obj = []
        for line in cfg.split('\n'):
            elem = {}
            for item in line.split('\t'):
                if '=' not in item:
                    logger.warning(f'invalid item ({item})')
                    continue
                k, v = item.split('=', maxsplit=1)
                elem[k] = v
            obj.append(elem)
        return obj

    def onActivation(self):
        """Initialise the worker: state, configuration, REST client, IP/MAC, monitor thread.

        Raises Exception when not running inside ogLive or when the local IP
        or MAC cannot be obtained, and NoOptionError when a required
        configuration option is missing.
        """
        if not os.path.exists('/scripts/oginit'):
            ## we are not in oglive, this module must not be loaded
            ## this logic comes from src/opengnsys/linux/operations.py, which has a similar check
            raise Exception('Refusing to load within an operating system')

        self.pathinterface = None
        self.IPlocal = None          ## computer IP
        self.mac = None              ## computer MAC
        self.idordenador = None      ## computer identifier
        self.nombreordenador = None  ## computer name
        self.cache = None
        self.idproautoexec = None
        self.idcentro = None         ## centre identifier
        self.idaula = None           ## classroom identifier
        self.pid_q = None            ## for passing PIDs around
        self.stdout_q = None         ## for passing stdout
        self.progress_jobs = {}

        ogcore_scheme = os.environ.get('OGAGENTCFG_OGCORE_SCHEME', 'https')
        ogcore_ip = os.environ.get('OGAGENTCFG_OGCORE_IP', '192.168.2.1')
        ogcore_port = os.environ.get('OGAGENTCFG_OGCORE_PORT', '8443')
        urlmenu_scheme = os.environ.get('OGAGENTCFG_URLMENU_SCHEME', 'https')
        urlmenu_ip = os.environ.get('OGAGENTCFG_URLMENU_IP', '192.168.2.1')
        urlmenu_port = os.environ.get('OGAGENTCFG_URLMENU_PORT', '8443')
        ## empty components are dropped, so an empty port yields just the IP
        ogcore_ip_port = ':'.join(map(str, filter(None, [ogcore_ip, ogcore_port])))
        urlmenu_ip_port = ':'.join(map(str, filter(None, [urlmenu_ip, urlmenu_port])))
        try:
            url = self.service.config.get(self.name, 'remote')
            loglevel = self.service.config.get(self.name, 'log')
            self.pathinterface = self.service.config.get(self.name, 'pathinterface')
            self.urlMenu = self.service.config.get(self.name, 'urlMenu')
            self.urlMsg = self.service.config.get(self.name, 'urlMsg')

            url = url.format(ogcore_scheme, ogcore_ip_port)
            self.urlMenu = self.urlMenu.format(urlmenu_scheme, urlmenu_ip_port)
        except NoOptionError as e:
            logger.error("Configuration error: {}".format(e))
            raise
        logger.setLevel(loglevel)
        self.REST = REST(url)

        if not self.tomaIPlocal():
            raise Exception('Se han generado errores. No se puede continuar la ejecución de este módulo')

        if not self.tomaMAClocal():
            raise Exception('Se han generado errores. No se puede continuar la ejecución de este módulo')

        threading.Thread(name='monitoring_thread', target=self.mon, daemon=True).start()

    def _long_running_job(self, name, f, args):
        """Run ``f(*args)`` in a background thread, unless a job is already running.

        Returns ``{'job_id': <id>}`` for the new job, or
        ``{'job_id': None, 'message': ...}`` when another job is still running.
        """
        ## check and insertion happen under the lock so that two concurrent
        ## requests cannot both start a job
        with self.thread_lock:
            if any(job['running'] for job in self.thread_list.values()):
                logger.info('some job is already running, refusing to launch another one')
                return {'job_id': None, 'message': 'some job is already running, refusing to launch another one'}

            job_id = '{}-{}'.format(name, ''.join(random.choice('0123456789abcdef') for _ in range(8)))
            ## a single queue works for us because we never have more than one long_running_job at the same time
            self.pid_q = queue.Queue()
            self.stdout_q = queue.Queue()
            self.thread_list[job_id] = {
                'thread': ThreadWithResult(target=f, args=(self.pid_q, self.stdout_q) + tuple(args)),
                'starttime': time.time(),
                'child_pid': None,
                'running': True,
                'result': None,
            }
            self.thread_list[job_id]['thread'].start()
        return {'job_id': job_id}

    ## To kill threads, this is the mechanism:
    ##  - here in _long_running_job a queue is stored in self.pid_q
    ##  - (self.pid_q was initialised to None when the object was set up, to avoid the "object has no 'pid_q'" error)
    ##  - thread_list also holds a 'child_pid' to store the PID of the child processes
    ##  - the queue is passed to ThreadWithResult on creation, and run() stores it in the thread's self.pid_q
    ##  - in interfaceAdmin(), upon subprocess.Popen(), the PID is written to the queue
    ##  - in mon() PIDs are read from the queue and stored in thread_list 'child_pid'
    ##    - some functions call interfaceAdmin more than once and write more than one PID to the queue;
    ##      mon() keeps collecting them and updating
    ##    - for example EjecutarScript calls interfaceAdmin() and then LeeConfiguracion(), which calls
    ##      interfaceAdmin() again
    ##  - and when KillJob is called, we end up in killer(), which takes the 'child_pid' and kills it
    ##    - but the child may have finished already
    ##    - or KillJob may have arrived too early, before there was any child
    ##
    ## $ curl --insecure -X POST --data '{"nfn":"EjecutarScript","scp":"cd /usr; sleep 30; pwd; ls","ids":"0"}' https://192.168.2.199:8000/ogAdmClient/EjecutarScript
    ## {"job_id": "EjecutarScript-333feb3f"}
    ## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/ogAdmClient/KillJob
    ##
    ## works fine, except that the PID does not die, not even with subprocess.run('kill')

    ## To show the progress of the jobs the same infrastructure is reused:
    ##  - a queue, self.stdout_q
    ##  - interfaceAdmin writes to it the partial stdout it was already collecting
    ##  - mon() picks it up and POSTs it to ogcore