source: ogAgent-Git/src/opengnsys/workers/oglive_worker.py @ 0e113ec

browser-nuevodecorare-oglive-methodsexec-ogbrowserfix-urllog-sess-lenmainno-tlsoggitoggit-notlsoglogoglog2ping3ping4sched-tasktlstls-again 3.1.0
Last change on this file since 0e113ec was 70517e7, checked in by Natalia Serrano <natalia.serrano@…>, 8 weeks ago

refs #1850 actually send ping

  • Property mode set to 100644
File size: 23.3 KB
Line 
1# -*- coding: utf-8 -*-
2#
3# Copyright (c) 2024 Qindel Formación y Servicios S.L.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without modification,
7# are permitted provided that the following conditions are met:
8#
9#    * Redistributions of source code must retain the above copyright notice,
10#      this list of conditions and the following disclaimer.
11#    * Redistributions in binary form must reproduce the above copyright notice,
12#      this list of conditions and the following disclaimer in the documentation
13#      and/or other materials provided with the distribution.
14#    * Neither the name of Virtual Cable S.L. nor the names of its contributors
15#      may be used to endorse or promote products derived from this software
16#      without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28"""
29@author: Natalia Serrano, nserrano at qindel dot com
30"""
31# pylint: disable=unused-wildcard-import,wildcard-import
32
33import os
34import re
35import time
36import random
37import subprocess
38import threading
39import signal
40
41from configparser import NoOptionError
42from opengnsys import REST
43from opengnsys.log import logger
44from .server_worker import ServerWorker
45
46## https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread
47class ThreadWithResult (threading.Thread):
48    def run (self):
49        try:
50            self.result = None
51            if self._target is not None:
52                ## the first arg in self._args is the queue
53                self.pid_q    = self._args[0]
54                self.stdout_q = self._args[1]
55                self._args    = self._args[2:]
56                try:
57                    self.result = self._target (*self._args, **self._kwargs)
58                except Exception as e:
59                    self.result = { 'res': 2, 'der': f'got exception: ({e})' }    ## res=2 as defined in ogAdmClient.c:2048
60        finally:
61            # Avoid a refcycle if the thread is running a function with an argument that has a member that points to the thread.
62            del self._target, self._args, self._kwargs, self.pid_q, self.stdout_q
63
64class ogLiveWorker(ServerWorker):
65    thread_list = {}
66    thread_lock = threading.Lock()
67
68    tbErroresScripts = [
69        "Se han generado errores desconocidos. No se puede continuar la ejecución de este módulo",        ## 0
70                "001-Formato de ejecución incorrecto.",
71                "002-Fichero o dispositivo no encontrado",
72                "003-Error en partición de disco",
73                "004-Partición o fichero bloqueado",
74                "005-Error al crear o restaurar una imagen",
75                "006-Sin sistema operativo",
76                "007-Programa o función BOOLEAN no ejecutable",
77                "008-Error en la creación del archivo de eco para consola remota",
78                "009-Error en la lectura del archivo temporal de intercambio",
79                "010-Error al ejecutar la llamada a la interface de administración",
80                "011-La información retornada por la interface de administración excede de la longitud permitida",
81                "012-Error en el envío de fichero por la red",
82                "013-Error en la creación del proceso hijo",
83                "014-Error de escritura en destino",
84                "015-Sin Cache en el Cliente",
85                "016-No hay espacio en la cache para almacenar fichero-imagen",
86                "017-Error al Reducir el Sistema Archivos",
87                "018-Error al Expandir el Sistema Archivos",
88                "019-Valor fuera de rango o no válido.",
89                "020-Sistema de archivos desconocido o no se puede montar",
90                "021-Error en partición de caché local",
91                "022-El disco indicado no contiene una particion GPT",
92                "023-Error no definido",
93                "024-Error no definido",
94                "025-Error no definido",
95                "026-Error no definido",
96                "027-Error no definido",
97                "028-Error no definido",
98                "029-Error no definido",
99                "030-Error al restaurar imagen - Imagen mas grande que particion",
100                "031-Error al realizar el comando updateCache",
101                "032-Error al formatear",
102                "033-Archivo de imagen corrupto o de otra versión de partclone",
103                "034-Error no definido",
104                "035-Error no definido",
105                "036-Error no definido",
106                "037-Error no definido",
107                "038-Error no definido",
108                "039-Error no definido",
109                "040-Error imprevisto no definido",
110                "041-Error no definido",
111                "042-Error no definido",
112                "043-Error no definido",
113                "044-Error no definido",
114                "045-Error no definido",
115                "046-Error no definido",
116                "047-Error no definido",
117                "048-Error no definido",
118                "049-Error no definido",
119                "050-Error en la generación de sintaxis de transferenica unicast",
120                "051-Error en envio UNICAST de una particion",
121                "052-Error en envio UNICAST de un fichero",
122                "053-Error en la recepcion UNICAST de una particion",
123                "054-Error en la recepcion UNICAST de un fichero",
124                "055-Error en la generacion de sintaxis de transferenica Multicast",
125                "056-Error en envio MULTICAST de un fichero",
126                "057-Error en la recepcion MULTICAST de un fichero",
127                "058-Error en envio MULTICAST de una particion",
128                "059-Error en la recepcion MULTICAST de una particion",
129                "060-Error en la conexion de una sesion UNICAST|MULTICAST con el MASTER",
130                "061-Error no definido",
131                "062-Error no definido",
132                "063-Error no definido",
133                "064-Error no definido",
134                "065-Error no definido",
135                "066-Error no definido",
136                "067-Error no definido",
137                "068-Error no definido",
138                "069-Error no definido",
139                "070-Error al montar una imagen sincronizada.",
140                "071-Imagen no sincronizable (es monolitica).",
141                "072-Error al desmontar la imagen.",
142                "073-No se detectan diferencias entre la imagen basica y la particion.",
143                "074-Error al sincronizar, puede afectar la creacion/restauracion de la imagen.",
144                "Error desconocido",
145        ]
146
147    def notifier (self, job_id, result):
148        result['job_id'] = job_id
149        self.REST.sendMessage ('clients/status/webhook', result)
150
151    def killer (self, job_id):
152        logger.debug (f'killer() called, job_id ({job_id})')
153        if job_id not in self.thread_list: return { 'res': 2, 'der': 'Unknown job' }
154
155        with self.thread_lock:
156            if 'thread' not in self.thread_list[job_id]: return { 'res': 2, 'der': 'Job is not running' }
157            t   = self.thread_list[job_id]['thread']
158            pid = self.thread_list[job_id]['child_pid']
159            logger.debug (f'pid ({pid})')
160            try_times = 8
161            sig = signal.SIGTERM
162            msg = f'could not kill pid ({pid}) after ({try_times}) tries'
163            success = 2    ## mimic cmd['res'] in respuestaEjecucionComando(): "1" means success, "2" means failed
164            while True:
165                t.join (0.05)
166                if not t.is_alive():
167                    msg = 'job terminated'
168                    success = 1
169                    logger.debug (msg)
170                    self.thread_list[job_id]['child_pid'] = None
171                    break
172                ## race condition: if the subprocess finishes just here, then we already checked that t.is_alive() is true, but os.path.exists(/proc/pid) will be false below. msg will be 'nothing to kill'.
173                ## this is fine in the first iteration of the loop, before we send any signals. In the rest of iterations, after some signals were sent, msg should be 'job terminated' instead.
174                if pid:
175                    if os.path.exists (f'/proc/{pid}'):
176                        logger.debug (f'sending signal ({sig}) to pid ({pid})')
177                        ## if the process finishes just here, nothing happens: the signal is sent to the void
178                        os.kill (pid, sig)
179                        #subprocess.run (['kill', '--signal', str(sig), str(pid)])
180                    else:
181                        msg = f'pid ({pid}) is gone, nothing to kill'
182                        success = 1
183                        logger.debug (msg)
184                        self.thread_list[job_id]['child_pid'] = None
185                        break
186                else:
187                    msg = 'no PID to kill'
188                    logger.debug (msg)
189
190                if not try_times: break
191                if 4 == try_times: sig = signal.SIGKILL   ## change signal after a few tries
192                try_times -= 1
193                time.sleep (0.4)
194
195        return { 'res':success, 'der':msg }
196
197    def _extract_progress (self, job_id, ary=[]):
198        progress = None
199        for i in ary:
200            if m := re.search (r'^\[([0-9]+)\]', i):     ## look for strings like '[10]', '[60]'
201                logger.debug (f"matched regex, m.groups ({m.groups()})")
202                progress = float (m.groups()[0]) / 100
203        return progress
204
205    ## monitors child threads, waits for them to finish
206    ## pings ogcore
207    def mon (self):
208        n = 0
209        while True:
210            with self.thread_lock:
211                for k in self.thread_list:
212                    elem = self.thread_list[k]
213                    if 'thread' not in elem: continue
214                    logger.debug (f'considering thread ({k})')
215
216                    if self.pid_q:
217                        if not self.pid_q.empty():
218                            elem['child_pid'] = self.pid_q.get()
219                            logger.debug (f'queue not empty, got pid ({elem["child_pid"]})')
220
221                    if self.stdout_q:
222                        partial = ''
223                        while not self.stdout_q.empty():
224                            partial += self.stdout_q.get()
225                        lines = partial.splitlines()
226                        if len (lines):
227                            p = self._extract_progress (k, lines)
228                            if p:
229                                m = { "job_id": k, "progress": p }
230                                self.REST.sendMessage ('clients/status/webhook', { "job_id": k, "progress": p })
231
232                    elem['thread'].join (0.05)
233                    if not elem['thread'].is_alive():
234                        logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
235                        elem['running'] = False
236                        elem['result'] = elem['thread'].result
237                        del elem['thread']
238                        self.notifier (k, elem['result'])
239
240            time.sleep (1)
241            n += 1
242            if not n % 10:
243                body = {
244                    "iph": self.IPlocal,
245                    "ido": self.idordenador,
246                    "npc": self.nombreordenador,
247                    "idc": self.idcentro,
248                    "ida": self.idaula,
249                    "timestamp": int (time.time()),
250                }
251                logger.debug (f'about to send ping ({body})')
252                self.REST.sendMessage ('clients/status/webhook', body)
253
254    def interfaceAdmin (self, method, parametros=[]):
255        if method in ['Apagar', 'CambiarAcceso', 'Configurar', 'CrearImagen', 'EjecutarScript', 'getConfiguration', 'getIpAddress', 'IniciarSesion', 'InventarioHardware', 'InventarioSoftware', 'Reiniciar', 'RestaurarImagen']:
256            ## python
257            logger.debug (f'({method}) is a python method')
258            exe = '{}/{}.py'.format (self.pathinterface, method)
259            proc = [exe]+parametros
260        else:  ## ConsolaRemota procesaCache
261            ## bash
262            logger.debug (f'({method}) is a bash method')
263            exe = '{}/{}'.format (self.pathinterface, method)
264
265            LANG = os.environ.get ('LANG', 'en_GB.UTF-8').replace ('UTF_8', 'UTF-8')
266            devel_bash_prefix = f'''
267                PATH=/opt/opengnsys/scripts/:$PATH;
268                source /opt/opengnsys/etc/lang.{LANG}.conf;
269                for I in /opt/opengnsys/lib/engine/bin/*.lib; do source $I; done;
270                for i in $(declare -F |cut -f3 -d" "); do export -f $i; done;
271            '''
272
273            if parametros:
274                proc = ['bash', '-c', '{} {} {}'.format (devel_bash_prefix, exe, ' '.join (parametros))]
275            else:
276                proc = ['bash', '-c', '{} {}'.format (devel_bash_prefix, exe)]
277
278        logger.debug ('subprocess.run ("{}")'.format (' '.join (proc)))
279        p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
280        if self.pid_q:
281            self.pid_q.put (p.pid)
282        else:
283            ## esto sucede por ejemplo cuando arranca el agente, que estamos en interfaceAdmin() en el mismo hilo, sin _long_running_job ni hilo separado
284            logger.debug ('no queue--not writing any PID to it')
285
286        sout = serr = ''
287        while p.poll() is None:
288            for l in iter (p.stdout.readline, b''):
289                partial = l.decode ('utf-8', 'ignore')
290                if self.stdout_q: self.stdout_q.put (partial)
291                sout += partial
292            for l in iter (p.stderr.readline, b''):
293                partial = l.decode ('utf-8', 'ignore')
294                serr += partial
295            time.sleep (1)
296        sout = sout.strip()
297        serr = serr.strip()
298
299        ## DEBUG
300        logger.info (f'stdout follows:')
301        for l in sout.splitlines():
302            logger.info (f'  {l}')
303        logger.info (f'stderr follows:')
304        for l in serr.splitlines():
305            logger.info (f'  {l}')
306        ## /DEBUG
307        if 0 != p.returncode:
308            cmd_txt = ' '.join (proc)
309            logger.error (f'command ({cmd_txt}) failed, stderr follows:')
310            for l in serr.splitlines():
311                logger.error (f'  {l}')
312            raise Exception (f'command ({cmd_txt}) failed, see log for details')
313        return sout
314
315    def tomaIPlocal (self):
316        try:
317            self.IPlocal = self.interfaceAdmin ('getIpAddress')
318        except Exception as e:
319            logger.error (e)
320            logger.error ('No se ha podido recuperar la dirección IP del cliente')
321            return False
322        logger.info ('local IP is "{}"'.format (self.IPlocal))
323        return True
324
325    def tomaMAClocal (self):
326        ## tomaIPlocal() calls interfaceAdm('getIpAddress')
327        ## getIpAddress runs 'ip addr show' and returns the IP address of every network interface except "lo"
328        ## (ie. breaks badly if there's more than one network interface)
329        ## let's make the same assumptions here
330        mac = subprocess.run (["ip -json link show |jq -r '.[] |select (.ifname != \"lo\") |.address'"], shell=True, capture_output=True, text=True)
331        self.mac = mac.stdout.strip()
332        return True
333
334    def enviaMensajeServidor (self, path, obj={}):
335        obj['iph'] = self.IPlocal          ## Ip del ordenador
336        obj['ido'] = self.idordenador      ## Identificador del ordenador
337        obj['npc'] = self.nombreordenador  ## Nombre del ordenador
338        obj['idc'] = self.idcentro         ## Identificador del centro
339        obj['ida'] = self.idaula           ## Identificador del aula
340
341        res = self.REST.sendMessage ('/'.join ([self.name, path]), obj)
342
343        if (type (res) is not dict):
344            #logger.error ('No se ha podido establecer conexión con el Servidor de Administración')   ## Error de conexión con el servidor
345            logger.debug (f'res ({res})')
346            logger.error ('Error al enviar trama ***send() fallo')
347            return False
348
349        return res
350
351    ## en C, esto envia una trama de respuesta al servidor. Devuelve un boolean
352    ## en python, simplemente termina de construir la respuesta y la devuelve; no envía nada por la red. El caller la usa en return() para enviar implícitamente la respuesta
353    def respuestaEjecucionComando (self, cmd, herror, ids=None):
354        if ids:                 ## Existe seguimiento
355            cmd['ids'] = ids    ## Añade identificador de la sesión
356
357        if 0 == herror:  ## el comando terminó con resultado satisfactorio
358            cmd['res'] = 1
359            cmd['der'] = ''
360        else:            ## el comando tuvo algún error
361            cmd['res'] = 2
362            cmd['der'] = self.tbErroresScripts[herror]
363
364        return cmd
365
366    def cargaPaginaWeb (self, url=None):
367        if (not url): url = self.urlMenu
368        os.system ('pkill -9 browser')
369
370        p = subprocess.Popen (['/usr/bin/browser', '-qws', url])
371        try:
372            p.wait (2)       ## if the process dies before 2 seconds...
373            logger.error ('Error al ejecutar la llamada a la interface de administración')
374            logger.error ('Error en la creación del proceso hijo')
375            logger.error ('return code "{}"'.format (p.returncode))
376            return False
377        except subprocess.TimeoutExpired:
378            pass
379
380        return True
381
382    def muestraMenu (self):
383        self.cargaPaginaWeb()
384
385    def muestraMensaje (self, idx):
386        self.cargaPaginaWeb (f'{self.urlMsg}?idx={idx}')
387
388    def LeeConfiguracion (self):
389        try:
390            parametroscfg = self.interfaceAdmin ('getConfiguration')   ## Configuración de los Sistemas Operativos del cliente
391        except Exception as e:
392            logger.error (e)
393            logger.error ('No se ha podido recuperar la dirección IP del cliente')
394            return None
395        logger.debug ('parametroscfg ({})'.format (parametroscfg))
396        return parametroscfg
397
398    def cfg2obj (self, cfg):
399        obj = []
400        ptrPar = cfg.split ('\n')
401        for line in ptrPar:
402            elem = {}
403            ptrCfg = line.split ('\t')
404
405            for item in ptrCfg:
406                if '=' not in item:
407                    logger.warning (f'invalid item ({item})')
408                    continue
409                k, v = item.split ('=', maxsplit=1)
410                elem[k] = v
411
412            obj.append (elem)
413
414        return obj
415
416    def onActivation (self):
417        if not os.path.exists ('/scripts/oginit'):
418            ## no estamos en oglive, este modulo no debe cargarse
419            ## esta lógica la saco de src/opengnsys/linux/operations.py, donde hay un if similar
420            raise Exception ('Refusing to load within an operating system')
421
422        self.pathinterface   = None
423        self.IPlocal         = None     ## Ip del ordenador
424        self.mac             = None     ## MAC del ordenador
425        self.idordenador     = None     ## Identificador del ordenador
426        self.nombreordenador = None     ## Nombre del ordenador
427        self.cache           = None
428        self.idproautoexec   = None
429        self.idcentro        = None     ## Identificador del centro
430        self.idaula          = None     ## Identificador del aula
431        self.pid_q           = None     ## for passing PIDs around
432        self.stdout_q        = None     ## for passing stdout
433        self.progress_jobs   = {}
434
435        ogcore_scheme   = os.environ.get ('OGAGENTCFG_OGCORE_SCHEME',  'https')
436        ogcore_ip       = os.environ.get ('OGAGENTCFG_OGCORE_IP',      '192.168.2.1')
437        ogcore_port     = os.environ.get ('OGAGENTCFG_OGCORE_PORT',    '8443')
438        urlmenu_scheme  = os.environ.get ('OGAGENTCFG_URLMENU_SCHEME', 'https')
439        urlmenu_ip      = os.environ.get ('OGAGENTCFG_URLMENU_IP',     '192.168.2.1')
440        urlmenu_port    = os.environ.get ('OGAGENTCFG_URLMENU_PORT',   '8443')
441        ogcore_ip_port  = ':'.join (map (str, filter (None, [ogcore_ip,  ogcore_port ])))
442        urlmenu_ip_port = ':'.join (map (str, filter (None, [urlmenu_ip, urlmenu_port])))
443        try:
444            url                = self.service.config.get (self.name, 'remote')
445            loglevel           = self.service.config.get (self.name, 'log')
446            self.pathinterface = self.service.config.get (self.name, 'pathinterface')
447            self.urlMenu       = self.service.config.get (self.name, 'urlMenu')
448            self.urlMsg        = self.service.config.get (self.name, 'urlMsg')
449
450            url          = url.format          (ogcore_scheme,  ogcore_ip_port)
451            self.urlMenu = self.urlMenu.format (urlmenu_scheme, urlmenu_ip_port)
452        except NoOptionError as e:
453            logger.error ("Configuration error: {}".format (e))
454            raise e
455        logger.setLevel (loglevel)
456        self.REST = REST (url)
457
458        if not self.tomaIPlocal():
459            raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
460
461        if not self.tomaMAClocal():
462            raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
463
464        threading.Thread (name='monitoring_thread', target=self.mon, daemon=True).start()
465
466    def _long_running_job (self, name, f, args):
467        any_job_running = False
468        for k in self.thread_list:
469            if self.thread_list[k]['running']:
470                any_job_running = True
471                break
472        if any_job_running:
473            logger.info ('some job is already running, refusing to launch another one')
474            return { 'job_id': None, 'message': 'some job is already running, refusing to launch another one' }
475
476        job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8)))
477        import queue
478        self.pid_q    = queue.Queue()   ## a single queue works for us because we never have more than one long_running_job at the same time
479        self.stdout_q = queue.Queue()
480        self.thread_list[job_id] = {
481            'thread': ThreadWithResult (target=f, args=(self.pid_q, self.stdout_q) + args),
482            'starttime': time.time(),
483            'child_pid': None,
484            'running': True,
485            'result': None
486        }
487        self.thread_list[job_id]['thread'].start()
488        return { 'job_id': job_id }
489
490## para matar threads tengo lo siguiente:
491## - aqui en _long_running_job meto una cola en self.pid_q
492## - (self.pid_q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'pid_q'")
493## - en el thread_list también tengo un child_pid para almacenar el pid de los procesos hijos
494## - al crear el ThreadWithResult le paso la cola, y luego en run() la recojo y la meto en el self.pid_q del thread
495## - en interfaceAdmin() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue
496## - en mon() recojo pids de la queue y los meto en thread_list 'child_pid'
497##   - algunas funciones llaman a interfaceAdmin más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando
498##     - por ejemplo EjecutarScript llama a interfaceAdmin() y luego llama a LeeConfiguracion() el cual llama a interfaceAdmin() otra vez
499## - y cuando nos llamen a KillJob, terminamos en killer() el cual coge el 'child_pid' y zas
500##   - pero a lo mejor el child ya terminó
501##   - o a lo mejor el KillJob nos llegó demasiado pronto y todavía no hubo ningún child
502##
503## $ curl --insecure -X POST --data '{"nfn":"EjecutarScript","scp":"cd /usr; sleep 30; pwd; ls","ids":"0"}' https://192.168.2.199:8000/ogAdmClient/EjecutarScript
504## {"job_id": "EjecutarScript-333feb3f"}
505## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/ogAdmClient/KillJob
506##
507## funciona bien, excepto que el PID no muere xD, ni siquiera haciendo subprocess.run('kill')
508
509## para mostrar el progreso de los jobs reutilizo la misma infra
510## una cola self.stdout_q
511## en interfaceAdmin escribo la stdout parcial que ya venia recogiendo
512## mon() lo recoge y le hace un POST a ogcore
Note: See TracBrowser for help on using the repository browser.