source: ogAgent-Git/src/opengnsys/workers/oglive_worker.py @ c39b253

maintls 4.0.0
Last change on this file since c39b253 was 5c2fbc3, checked in by Natalia Serrano <natalia.serrano@…>, 2 weeks ago

refs #1886 make log a bit cleaner

  • Property mode set to 100644
File size: 23.2 KB
Line 
1# -*- coding: utf-8 -*-
2#
3# Copyright (c) 2024 Qindel Formación y Servicios S.L.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without modification,
7# are permitted provided that the following conditions are met:
8#
9#    * Redistributions of source code must retain the above copyright notice,
10#      this list of conditions and the following disclaimer.
11#    * Redistributions in binary form must reproduce the above copyright notice,
12#      this list of conditions and the following disclaimer in the documentation
13#      and/or other materials provided with the distribution.
14#    * Neither the name of Virtual Cable S.L. nor the names of its contributors
15#      may be used to endorse or promote products derived from this software
16#      without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28"""
29@author: Natalia Serrano, nserrano at qindel dot com
30"""
31# pylint: disable=unused-wildcard-import,wildcard-import
32
33import os
34import re
35import time
36import random
37import subprocess
38import threading
39import signal
40
41from configparser import NoOptionError
42from opengnsys import REST
43from opengnsys.log import logger
44from .server_worker import ServerWorker
45
46## https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread
47class ThreadWithResult (threading.Thread):
48    def run (self):
49        try:
50            self.result = None
51            if self._target is not None:
52                ## the first arg in self._args is the queue
53                self.pid_q    = self._args[0]
54                self.stdout_q = self._args[1]
55                self._args    = self._args[2:]
56                try:
57                    self.result = self._target (*self._args, **self._kwargs)
58                except Exception as e:
59                    self.result = { 'res': 2, 'der': f'got exception: ({e})' }    ## res=2 as defined in ogAdmClient.c:2048
60        finally:
61            # Avoid a refcycle if the thread is running a function with an argument that has a member that points to the thread.
62            del self._target, self._args, self._kwargs, self.pid_q, self.stdout_q
63
64class ogLiveWorker(ServerWorker):
65    thread_list = {}
66    thread_lock = threading.Lock()
67
68    tbErroresScripts = [
69        "Se han generado errores desconocidos. No se puede continuar la ejecución de este módulo",        ## 0
70                "001-Formato de ejecución incorrecto.",
71                "002-Fichero o dispositivo no encontrado",
72                "003-Error en partición de disco",
73                "004-Partición o fichero bloqueado",
74                "005-Error al crear o restaurar una imagen",
75                "006-Sin sistema operativo",
76                "007-Programa o función BOOLEAN no ejecutable",
77                "008-Error en la creación del archivo de eco para consola remota",
78                "009-Error en la lectura del archivo temporal de intercambio",
79                "010-Error al ejecutar la llamada a la interface de administración",
80                "011-La información retornada por la interface de administración excede de la longitud permitida",
81                "012-Error en el envío de fichero por la red",
82                "013-Error en la creación del proceso hijo",
83                "014-Error de escritura en destino",
84                "015-Sin Cache en el Cliente",
85                "016-No hay espacio en la cache para almacenar fichero-imagen",
86                "017-Error al Reducir el Sistema Archivos",
87                "018-Error al Expandir el Sistema Archivos",
88                "019-Valor fuera de rango o no válido.",
89                "020-Sistema de archivos desconocido o no se puede montar",
90                "021-Error en partición de caché local",
91                "022-El disco indicado no contiene una particion GPT",
92                "023-Error no definido",
93                "024-Error no definido",
94                "025-Error no definido",
95                "026-Error no definido",
96                "027-Error no definido",
97                "028-Error no definido",
98                "029-Error no definido",
99                "030-Error al restaurar imagen - Imagen mas grande que particion",
100                "031-Error al realizar el comando updateCache",
101                "032-Error al formatear",
102                "033-Archivo de imagen corrupto o de otra versión de partclone",
103                "034-Error no definido",
104                "035-Error no definido",
105                "036-Error no definido",
106                "037-Error no definido",
107                "038-Error no definido",
108                "039-Error no definido",
109                "040-Error imprevisto no definido",
110                "041-Error no definido",
111                "042-Error no definido",
112                "043-Error no definido",
113                "044-Error no definido",
114                "045-Error no definido",
115                "046-Error no definido",
116                "047-Error no definido",
117                "048-Error no definido",
118                "049-Error no definido",
119                "050-Error en la generación de sintaxis de transferenica unicast",
120                "051-Error en envio UNICAST de una particion",
121                "052-Error en envio UNICAST de un fichero",
122                "053-Error en la recepcion UNICAST de una particion",
123                "054-Error en la recepcion UNICAST de un fichero",
124                "055-Error en la generacion de sintaxis de transferenica Multicast",
125                "056-Error en envio MULTICAST de un fichero",
126                "057-Error en la recepcion MULTICAST de un fichero",
127                "058-Error en envio MULTICAST de una particion",
128                "059-Error en la recepcion MULTICAST de una particion",
129                "060-Error en la conexion de una sesion UNICAST|MULTICAST con el MASTER",
130                "061-Error no definido",
131                "062-Error no definido",
132                "063-Error no definido",
133                "064-Error no definido",
134                "065-Error no definido",
135                "066-Error no definido",
136                "067-Error no definido",
137                "068-Error no definido",
138                "069-Error no definido",
139                "070-Error al montar una imagen sincronizada.",
140                "071-Imagen no sincronizable (es monolitica).",
141                "072-Error al desmontar la imagen.",
142                "073-No se detectan diferencias entre la imagen basica y la particion.",
143                "074-Error al sincronizar, puede afectar la creacion/restauracion de la imagen.",
144                "Error desconocido",
145        ]
146
147    def notifier (self, job_id, result):
148        result['job_id'] = job_id
149        self.REST.sendMessage ('clients/status/webhook', result)
150
151    def killer (self, job_id):
152        logger.debug (f'killer() called, job_id ({job_id})')
153        if job_id not in self.thread_list: return { 'res': 2, 'der': 'Unknown job' }
154
155        with self.thread_lock:
156            if 'thread' not in self.thread_list[job_id]: return { 'res': 2, 'der': 'Job is not running' }
157            t   = self.thread_list[job_id]['thread']
158            pid = self.thread_list[job_id]['child_pid']
159            logger.debug (f'pid ({pid})')
160            try_times = 8
161            sig = signal.SIGTERM
162            msg = f'could not kill pid ({pid}) after ({try_times}) tries'
163            success = 2    ## mimic cmd['res'] in respuestaEjecucionComando(): "1" means success, "2" means failed
164            while True:
165                t.join (0.05)
166                if not t.is_alive():
167                    msg = 'job terminated'
168                    success = 1
169                    logger.debug (msg)
170                    self.thread_list[job_id]['child_pid'] = None
171                    break
172                ## race condition: if the subprocess finishes just here, then we already checked that t.is_alive() is true, but os.path.exists(/proc/pid) will be false below. msg will be 'nothing to kill'.
173                ## this is fine in the first iteration of the loop, before we send any signals. In the rest of iterations, after some signals were sent, msg should be 'job terminated' instead.
174                if pid:
175                    if os.path.exists (f'/proc/{pid}'):
176                        logger.debug (f'sending signal ({sig}) to pid ({pid})')
177                        ## if the process finishes just here, nothing happens: the signal is sent to the void
178                        os.kill (pid, sig)
179                        #subprocess.run (['kill', '--signal', str(sig), str(pid)])
180                    else:
181                        msg = f'pid ({pid}) is gone, nothing to kill'
182                        success = 1
183                        logger.debug (msg)
184                        self.thread_list[job_id]['child_pid'] = None
185                        break
186                else:
187                    msg = 'no PID to kill'
188                    logger.debug (msg)
189
190                if not try_times: break
191                if 4 == try_times: sig = signal.SIGKILL   ## change signal after a few tries
192                try_times -= 1
193                time.sleep (0.4)
194
195        return { 'res':success, 'der':msg }
196
197    def _extract_progress (self, job_id, ary=[]):
198        progress = None
199        for i in ary:
200            if m := re.search (r'^\[([0-9]+)\]', i):     ## look for strings like '[10]', '[60]'
201                #logger.debug (f"matched regex, m.groups ({m.groups()})")
202                progress = float (m.groups()[0]) / 100
203        return progress
204
205    ## monitors child threads, waits for them to finish
206    ## pings ogcore
207    def mon (self):
208        n = 0
209        while True:
210            with self.thread_lock:
211                for k in self.thread_list:
212                    elem = self.thread_list[k]
213                    if 'thread' not in elem: continue
214                    #logger.debug (f'considering thread ({k})')
215
216                    if self.pid_q:
217                        if not self.pid_q.empty():
218                            elem['child_pid'] = self.pid_q.get()
219                            logger.debug (f'queue not empty, got pid ({elem["child_pid"]})')
220
221                    if self.stdout_q:
222                        partial = ''
223                        while not self.stdout_q.empty():
224                            partial += self.stdout_q.get()
225                        lines = partial.splitlines()
226                        if len (lines):
227                            p = self._extract_progress (k, lines)
228                            if p:
229                                m = { "job_id": k, "progress": p }
230                                self.REST.sendMessage ('clients/status/webhook', { "job_id": k, "progress": p })
231
232                    elem['thread'].join (0.05)
233                    if not elem['thread'].is_alive():
234                        logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
235                        elem['running'] = False
236                        elem['result'] = elem['thread'].result
237                        del elem['thread']
238                        self.notifier (k, elem['result'])
239
240            time.sleep (1)
241            n += 1
242            if not n % 10:
243                alive_threads = []
244                for k in self.thread_list:
245                    elem = self.thread_list[k]
246                    if 'thread' not in elem: continue
247                    alive_threads.append (k)
248                if alive_threads:
249                    s = ','.join (alive_threads)
250                    logger.debug (f'alive threads: {s}')
251
252                body = {
253                    'iph': self.IPlocal,
254                    'timestamp': int (time.time()),
255                }
256                #logger.debug (f'about to send ping ({body})')
257                self.REST.sendMessage ('clients/status/webhook', body)
258
259    def interfaceAdmin (self, method, parametros=[]):
260        if method in ['Apagar', 'CambiarAcceso', 'Configurar', 'CrearImagen', 'EjecutarScript', 'getConfiguration', 'getIpAddress', 'IniciarSesion', 'InventarioHardware', 'InventarioSoftware', 'Reiniciar', 'RestaurarImagen']:
261            ## python
262            logger.debug (f'({method}) is a python method')
263            exe = '{}/{}.py'.format (self.pathinterface, method)
264            proc = [exe]+parametros
265        else:  ## ConsolaRemota procesaCache
266            ## bash
267            logger.debug (f'({method}) is a bash method')
268            exe = '{}/{}'.format (self.pathinterface, method)
269
270            LANG = os.environ.get ('LANG', 'en_GB.UTF-8').replace ('UTF_8', 'UTF-8')
271            devel_bash_prefix = f'''
272                PATH=/opt/opengnsys/scripts/:$PATH;
273                source /opt/opengnsys/etc/lang.{LANG}.conf;
274                for I in /opt/opengnsys/lib/engine/bin/*.lib; do source $I; done;
275                for i in $(declare -F |cut -f3 -d" "); do export -f $i; done;
276            '''
277
278            if parametros:
279                proc = ['bash', '-c', '{} {} {}'.format (devel_bash_prefix, exe, ' '.join (parametros))]
280            else:
281                proc = ['bash', '-c', '{} {}'.format (devel_bash_prefix, exe)]
282
283        logger.debug ('subprocess.run ("{}")'.format (' '.join (proc)))
284        p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
285        if self.pid_q:
286            self.pid_q.put (p.pid)
287        else:
288            ## esto sucede por ejemplo cuando arranca el agente, que estamos en interfaceAdmin() en el mismo hilo, sin _long_running_job ni hilo separado
289            #logger.debug ('no queue--not writing any PID to it')
290            pass
291
292        sout = serr = ''
293        while p.poll() is None:
294            for l in iter (p.stdout.readline, b''):
295                partial = l.decode ('utf-8', 'ignore')
296                if self.stdout_q: self.stdout_q.put (partial)
297                sout += partial
298            for l in iter (p.stderr.readline, b''):
299                partial = l.decode ('utf-8', 'ignore')
300                serr += partial
301            time.sleep (1)
302        sout = sout.strip()
303        serr = serr.strip()
304
305        ## DEBUG
306        logger.debug (f'stdout follows:')
307        for l in sout.splitlines():
308            logger.debug (f'  {l}')
309        #logger.debug (f'stderr follows:')
310        #for l in serr.splitlines():
311        #    logger.debug (f'  {l}')
312        ## /DEBUG
313        if 0 != p.returncode:
314            cmd_txt = ' '.join (proc)
315            logger.error (f'command ({cmd_txt}) failed, stderr follows:')
316            for l in serr.splitlines():
317                logger.error (f'  {l}')
318            raise Exception (f'command ({cmd_txt}) failed, see log for details')
319        return sout
320
321    def tomaIPlocal (self):
322        try:
323            self.IPlocal = self.interfaceAdmin ('getIpAddress')
324        except Exception as e:
325            logger.error (e)
326            logger.error ('No se ha podido recuperar la dirección IP del cliente')
327            return False
328        logger.info ('local IP is "{}"'.format (self.IPlocal))
329        return True
330
331    def tomaMAClocal (self):
332        ## tomaIPlocal() calls interfaceAdm('getIpAddress')
333        ## getIpAddress runs 'ip addr show' and returns the IP address of every network interface except "lo"
334        ## (ie. breaks badly if there's more than one network interface)
335        ## let's make the same assumptions here
336        mac = subprocess.run (["ip -json link show |jq -r '.[] |select (.ifname != \"lo\") |.address'"], shell=True, capture_output=True, text=True)
337        self.mac = mac.stdout.strip()
338        return True
339
340    def enviaMensajeServidor (self, path, obj={}):
341        obj['iph'] = self.IPlocal          ## Ip del ordenador
342        obj['ido'] = self.idordenador      ## Identificador del ordenador
343        obj['npc'] = self.nombreordenador  ## Nombre del ordenador
344        obj['idc'] = self.idcentro         ## Identificador del centro
345        obj['ida'] = self.idaula           ## Identificador del aula
346
347        res = self.REST.sendMessage ('/'.join ([self.name, path]), obj)
348
349        if (type (res) is not dict):
350            logger.error (f'response is not a dict ({res})')
351            return False
352
353        return res
354
355    ## en C, esto envia una trama de respuesta al servidor. Devuelve un boolean
356    ## en python, simplemente termina de construir la respuesta y la devuelve; no envía nada por la red. El caller la usa en return() para enviar implícitamente la respuesta
357    def respuestaEjecucionComando (self, cmd, herror, ids=None):
358        if ids:                 ## Existe seguimiento
359            cmd['ids'] = ids    ## Añade identificador de la sesión
360
361        if 0 == herror:  ## el comando terminó con resultado satisfactorio
362            cmd['res'] = 1
363            cmd['der'] = ''
364        else:            ## el comando tuvo algún error
365            cmd['res'] = 2
366            cmd['der'] = self.tbErroresScripts[herror]
367
368        return cmd
369
370    def cargaPaginaWeb (self, url=None):
371        if (not url): url = self.urlMenu
372        os.system ('pkill -9 browser')
373
374        p = subprocess.Popen (['/usr/bin/browser', '-qws', url])
375        try:
376            p.wait (2)       ## if the process dies before 2 seconds...
377            logger.error ('Error al ejecutar browser, return code "{}"'.format (p.returncode))
378            return False
379        except subprocess.TimeoutExpired:
380            pass
381
382        return True
383
384    def muestraMenu (self):
385        self.cargaPaginaWeb()
386
387    def muestraMensaje (self, idx):
388        self.cargaPaginaWeb (f'{self.urlMsg}?idx={idx}')
389
390    def LeeConfiguracion (self):
391        try:
392            parametroscfg = self.interfaceAdmin ('getConfiguration')   ## Configuración de los Sistemas Operativos del cliente
393        except Exception as e:
394            logger.error (e)
395            logger.error ('No se ha podido recuperar la dirección IP del cliente')
396            return None
397        #logger.debug ('parametroscfg ({})'.format (parametroscfg))
398        return parametroscfg
399
400    def cfg2obj (self, cfg):
401        obj = []
402        ptrPar = cfg.split ('\n')
403        for line in ptrPar:
404            elem = {}
405            ptrCfg = line.split ('\t')
406
407            for item in ptrCfg:
408                if '=' not in item:
409                    logger.warning (f'invalid item ({item})')
410                    continue
411                k, v = item.split ('=', maxsplit=1)
412                elem[k] = v
413
414            obj.append (elem)
415
416        return obj
417
418    def onActivation (self):
419        if not os.path.exists ('/scripts/oginit'):
420            ## no estamos en oglive, este modulo no debe cargarse
421            ## esta lógica la saco de src/opengnsys/linux/operations.py, donde hay un if similar
422            raise Exception ('Refusing to load within an operating system')
423
424        self.pathinterface   = None
425        self.IPlocal         = None     ## Ip del ordenador
426        self.mac             = None     ## MAC del ordenador
427        self.idordenador     = None     ## Identificador del ordenador
428        self.nombreordenador = None     ## Nombre del ordenador
429        self.cache           = None
430        self.idproautoexec   = None
431        self.idcentro        = None     ## Identificador del centro
432        self.idaula          = None     ## Identificador del aula
433        self.pid_q           = None     ## for passing PIDs around
434        self.stdout_q        = None     ## for passing stdout
435        self.progress_jobs   = {}
436
437        ogcore_scheme   = os.environ.get ('OGAGENTCFG_OGCORE_SCHEME',  'https')
438        ogcore_ip       = os.environ.get ('OGAGENTCFG_OGCORE_IP',      '192.168.2.1')
439        ogcore_port     = os.environ.get ('OGAGENTCFG_OGCORE_PORT',    '8443')
440        urlmenu_scheme  = os.environ.get ('OGAGENTCFG_URLMENU_SCHEME', 'https')
441        urlmenu_ip      = os.environ.get ('OGAGENTCFG_URLMENU_IP',     '192.168.2.1')
442        urlmenu_port    = os.environ.get ('OGAGENTCFG_URLMENU_PORT',   '8443')
443        ogcore_ip_port  = ':'.join (map (str, filter (None, [ogcore_ip,  ogcore_port ])))
444        urlmenu_ip_port = ':'.join (map (str, filter (None, [urlmenu_ip, urlmenu_port])))
445        try:
446            url                = self.service.config.get (self.name, 'remote')
447            loglevel           = self.service.config.get (self.name, 'log')
448            self.pathinterface = self.service.config.get (self.name, 'pathinterface')
449            self.urlMenu       = self.service.config.get (self.name, 'urlMenu')
450            self.urlMsg        = self.service.config.get (self.name, 'urlMsg')
451
452            url          = url.format          (ogcore_scheme,  ogcore_ip_port)
453            self.urlMenu = self.urlMenu.format (urlmenu_scheme, urlmenu_ip_port)
454        except NoOptionError as e:
455            logger.error ("Configuration error: {}".format (e))
456            raise e
457        logger.setLevel (loglevel)
458        self.REST = REST (url)
459
460        if not self.tomaIPlocal():
461            raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
462
463        if not self.tomaMAClocal():
464            raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
465
466        threading.Thread (name='monitoring_thread', target=self.mon, daemon=True).start()
467
468    def _long_running_job (self, name, f, args):
469        any_job_running = False
470        for k in self.thread_list:
471            if self.thread_list[k]['running']:
472                any_job_running = True
473                break
474        if any_job_running:
475            logger.info ('some job is already running, refusing to launch another one')
476            return { 'job_id': None, 'message': 'some job is already running, refusing to launch another one' }
477
478        job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8)))
479        import queue
480        self.pid_q    = queue.Queue()   ## a single queue works for us because we never have more than one long_running_job at the same time
481        self.stdout_q = queue.Queue()
482        self.thread_list[job_id] = {
483            'thread': ThreadWithResult (target=f, args=(self.pid_q, self.stdout_q) + args),
484            'starttime': time.time(),
485            'child_pid': None,
486            'running': True,
487            'result': None
488        }
489        self.thread_list[job_id]['thread'].start()
490        return { 'job_id': job_id }
491
492## para matar threads tengo lo siguiente:
493## - aqui en _long_running_job meto una cola en self.pid_q
494## - (self.pid_q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'pid_q'")
495## - en el thread_list también tengo un child_pid para almacenar el pid de los procesos hijos
496## - al crear el ThreadWithResult le paso la cola, y luego en run() la recojo y la meto en el self.pid_q del thread
497## - en interfaceAdmin() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue
498## - en mon() recojo pids de la queue y los meto en thread_list 'child_pid'
499##   - algunas funciones llaman a interfaceAdmin más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando
500##     - por ejemplo EjecutarScript llama a interfaceAdmin() y luego llama a LeeConfiguracion() el cual llama a interfaceAdmin() otra vez
501## - y cuando nos llamen a KillJob, terminamos en killer() el cual coge el 'child_pid' y zas
502##   - pero a lo mejor el child ya terminó
503##   - o a lo mejor el KillJob nos llegó demasiado pronto y todavía no hubo ningún child
504##
505## $ curl --insecure -X POST --data '{"nfn":"EjecutarScript","scp":"cd /usr; sleep 30; pwd; ls","ids":"0"}' https://192.168.2.199:8000/ogAdmClient/EjecutarScript
506## {"job_id": "EjecutarScript-333feb3f"}
507## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/ogAdmClient/KillJob
508##
509## funciona bien, excepto que el PID no muere xD, ni siquiera haciendo subprocess.run('kill')
510
511## para mostrar el progreso de los jobs reutilizo la misma infra
512## una cola self.stdout_q
513## en interfaceAdmin escribo la stdout parcial que ya venia recogiendo
514## mon() lo recoge y le hace un POST a ogcore
Note: See TracBrowser for help on using the repository browser.