source: ogAgent-Git/src/opengnsys/workers/oglive_worker.py @ 5e4ef18

browser-nuevodecorare-oglive-methodsexec-ogbrowserfix-urllog-sess-lenmainno-ptt-paramno-tlsogagentuser-sigtermoggitoggit-notlsoglogoglog2override-moduleping1ping2ping3ping4sched-tasktlstls-again
Last change on this file since 5e4ef18 was 684cddd, checked in by Natalia Serrano <natalia.serrano@…>, 4 months ago

refs #1483 fix LANG bug

  • Property mode set to 100644
File size: 22.2 KB
Line 
1# -*- coding: utf-8 -*-
2#
3# Copyright (c) 2024 Qindel Formación y Servicios S.L.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without modification,
7# are permitted provided that the following conditions are met:
8#
9#    * Redistributions of source code must retain the above copyright notice,
10#      this list of conditions and the following disclaimer.
11#    * Redistributions in binary form must reproduce the above copyright notice,
12#      this list of conditions and the following disclaimer in the documentation
13#      and/or other materials provided with the distribution.
14#    * Neither the name of Virtual Cable S.L. nor the names of its contributors
15#      may be used to endorse or promote products derived from this software
16#      without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28"""
29@author: Natalia Serrano, nserrano at qindel dot com
30"""
31# pylint: disable=unused-wildcard-import,wildcard-import
32
33import os
34import re
35import time
36import random
37import subprocess
38import threading
39import signal
40
41from configparser import NoOptionError
42from opengnsys import REST
43from opengnsys.log import logger
44from .server_worker import ServerWorker
45
46## https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread
47class ThreadWithResult (threading.Thread):
48    def run (self):
49        try:
50            self.result = None
51            if self._target is not None:
52                ## the first arg in self._args is the queue
53                self.pid_q    = self._args[0]
54                self.stdout_q = self._args[1]
55                self._args    = self._args[2:]
56                try:
57                    self.result = self._target (*self._args, **self._kwargs)
58                except Exception as e:
59                    self.result = { 'res': 2, 'der': f'got exception: ({e})' }    ## res=2 as defined in ogAdmClient.c:2048
60        finally:
61            # Avoid a refcycle if the thread is running a function with an argument that has a member that points to the thread.
62            del self._target, self._args, self._kwargs, self.pid_q, self.stdout_q
63
64class ogLiveWorker(ServerWorker):
65    thread_list = {}
66    thread_lock = threading.Lock()
67
68    tbErroresScripts = [
69        "Se han generado errores desconocidos. No se puede continuar la ejecución de este módulo",        ## 0
70                "001-Formato de ejecución incorrecto.",
71                "002-Fichero o dispositivo no encontrado",
72                "003-Error en partición de disco",
73                "004-Partición o fichero bloqueado",
74                "005-Error al crear o restaurar una imagen",
75                "006-Sin sistema operativo",
76                "007-Programa o función BOOLEAN no ejecutable",
77                "008-Error en la creación del archivo de eco para consola remota",
78                "009-Error en la lectura del archivo temporal de intercambio",
79                "010-Error al ejecutar la llamada a la interface de administración",
80                "011-La información retornada por la interface de administración excede de la longitud permitida",
81                "012-Error en el envío de fichero por la red",
82                "013-Error en la creación del proceso hijo",
83                "014-Error de escritura en destino",
84                "015-Sin Cache en el Cliente",
85                "016-No hay espacio en la cache para almacenar fichero-imagen",
86                "017-Error al Reducir el Sistema Archivos",
87                "018-Error al Expandir el Sistema Archivos",
88                "019-Valor fuera de rango o no válido.",
89                "020-Sistema de archivos desconocido o no se puede montar",
90                "021-Error en partición de caché local",
91                "022-El disco indicado no contiene una particion GPT",
92                "023-Error no definido",
93                "024-Error no definido",
94                "025-Error no definido",
95                "026-Error no definido",
96                "027-Error no definido",
97                "028-Error no definido",
98                "029-Error no definido",
99                "030-Error al restaurar imagen - Imagen mas grande que particion",
100                "031-Error al realizar el comando updateCache",
101                "032-Error al formatear",
102                "033-Archivo de imagen corrupto o de otra versión de partclone",
103                "034-Error no definido",
104                "035-Error no definido",
105                "036-Error no definido",
106                "037-Error no definido",
107                "038-Error no definido",
108                "039-Error no definido",
109                "040-Error imprevisto no definido",
110                "041-Error no definido",
111                "042-Error no definido",
112                "043-Error no definido",
113                "044-Error no definido",
114                "045-Error no definido",
115                "046-Error no definido",
116                "047-Error no definido",
117                "048-Error no definido",
118                "049-Error no definido",
119                "050-Error en la generación de sintaxis de transferenica unicast",
120                "051-Error en envio UNICAST de una particion",
121                "052-Error en envio UNICAST de un fichero",
122                "053-Error en la recepcion UNICAST de una particion",
123                "054-Error en la recepcion UNICAST de un fichero",
124                "055-Error en la generacion de sintaxis de transferenica Multicast",
125                "056-Error en envio MULTICAST de un fichero",
126                "057-Error en la recepcion MULTICAST de un fichero",
127                "058-Error en envio MULTICAST de una particion",
128                "059-Error en la recepcion MULTICAST de una particion",
129                "060-Error en la conexion de una sesion UNICAST|MULTICAST con el MASTER",
130                "061-Error no definido",
131                "062-Error no definido",
132                "063-Error no definido",
133                "064-Error no definido",
134                "065-Error no definido",
135                "066-Error no definido",
136                "067-Error no definido",
137                "068-Error no definido",
138                "069-Error no definido",
139                "070-Error al montar una imagen sincronizada.",
140                "071-Imagen no sincronizable (es monolitica).",
141                "072-Error al desmontar la imagen.",
142                "073-No se detectan diferencias entre la imagen basica y la particion.",
143                "074-Error al sincronizar, puede afectar la creacion/restauracion de la imagen.",
144                "Error desconocido",
145        ]
146
147    def notifier (self, job_id, result):
148        result['job_id'] = job_id
149        self.REST.sendMessage ('clients/status/webhook', result)
150
151    def killer (self, job_id):
152        logger.debug (f'killer() called, job_id ({job_id})')
153        if job_id not in self.thread_list: return { 'res': 2, 'der': 'Unknown job' }
154
155        with self.thread_lock:
156            if 'thread' not in self.thread_list[job_id]: return { 'res': 2, 'der': 'Job is not running' }
157            t   = self.thread_list[job_id]['thread']
158            pid = self.thread_list[job_id]['child_pid']
159            logger.debug (f'pid ({pid})')
160            try_times = 8
161            sig = signal.SIGTERM
162            msg = f'could not kill pid ({pid}) after ({try_times}) tries'
163            success = 2    ## mimic cmd['res'] in respuestaEjecucionComando(): "1" means success, "2" means failed
164            while True:
165                t.join (0.05)
166                if not t.is_alive():
167                    msg = 'job terminated'
168                    success = 1
169                    logger.debug (msg)
170                    self.thread_list[job_id]['child_pid'] = None
171                    break
172                ## race condition: if the subprocess finishes just here, then we already checked that t.is_alive() is true, but os.path.exists(/proc/pid) will be false below. msg will be 'nothing to kill'.
173                ## this is fine in the first iteration of the loop, before we send any signals. In the rest of iterations, after some signals were sent, msg should be 'job terminated' instead.
174                if pid:
175                    if os.path.exists (f'/proc/{pid}'):
176                        logger.debug (f'sending signal ({sig}) to pid ({pid})')
177                        ## if the process finishes just here, nothing happens: the signal is sent to the void
178                        os.kill (pid, sig)
179                        #subprocess.run (['kill', '--signal', str(sig), str(pid)])
180                    else:
181                        msg = f'pid ({pid}) is gone, nothing to kill'
182                        success = 1
183                        logger.debug (msg)
184                        self.thread_list[job_id]['child_pid'] = None
185                        break
186                else:
187                    msg = 'no PID to kill'
188                    logger.debug (msg)
189
190                if not try_times: break
191                if 4 == try_times: sig = signal.SIGKILL   ## change signal after a few tries
192                try_times -= 1
193                time.sleep (0.4)
194
195        return { 'res':success, 'der':msg }
196
197    def _extract_progress (self, job_id, ary=[]):
198        progress = None
199        for i in ary:
200            if m := re.search (r'^\[([0-9]+)\]', i):     ## look for strings like '[10]', '[60]'
201                logger.debug (f"matched regex, m.groups ({m.groups()})")
202                progress = float (m.groups()[0]) / 100
203        return progress
204
205    def mon (self):
206        while True:
207            with self.thread_lock:
208                for k in self.thread_list:
209                    elem = self.thread_list[k]
210                    if 'thread' not in elem: continue
211                    logger.debug (f'considering thread ({k})')
212
213                    if self.pid_q:
214                        if not self.pid_q.empty():
215                            elem['child_pid'] = self.pid_q.get()
216                            logger.debug (f'queue not empty, got pid ({elem["child_pid"]})')
217
218                    if self.stdout_q:
219                        partial = ''
220                        while not self.stdout_q.empty():
221                            partial += self.stdout_q.get()
222                        lines = partial.splitlines()
223                        if len (lines):
224                            p = self._extract_progress (k, lines)
225                            if p:
226                                m = { "job_id": k, "progress": p }
227                                self.REST.sendMessage ('clients/status/webhook', { "job_id": k, "progress": p })
228
229                    elem['thread'].join (0.05)
230                    if not elem['thread'].is_alive():
231                        logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
232                        elem['running'] = False
233                        elem['result'] = elem['thread'].result
234                        del elem['thread']
235                        self.notifier (k, elem['result'])
236
237            time.sleep (1)
238
239    def interfaceAdmin (self, method, parametros=[]):
240        exe = '{}/{}'.format (self.pathinterface, method)
241        ## for development only. Will be removed when the referenced bash code (/opt/opengnsys/lib/engine/bin/*.lib) is translated into python
242        LANG = os.environ.get ('LANG', 'en_GB.UTF-8').replace ('UTF_8', 'UTF-8')
243        devel_bash_prefix = f'''
244            PATH=/opt/opengnsys/scripts/:$PATH;
245            source /opt/opengnsys/etc/lang.{LANG}.conf;
246            for I in /opt/opengnsys/lib/engine/bin/*.lib; do source $I; done;
247            for i in $(declare -F |cut -f3 -d" "); do export -f $i; done;
248        '''
249
250        if parametros:
251            proc = ['bash', '-c', '{} {} {}'.format (devel_bash_prefix, exe, ' '.join (parametros))]
252        else:
253            proc = ['bash', '-c', '{} {}'.format (devel_bash_prefix, exe)]
254        logger.debug ('subprocess.run ("{}", capture_output=True)'.format (proc))
255
256        p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
257        if self.pid_q:
258            self.pid_q.put (p.pid)
259        else:
260            ## esto sucede por ejemplo cuando arranca el agente, que estamos en interfaceAdmin() en el mismo hilo, sin _long_running_job ni hilo separado
261            logger.debug ('no queue--not writing any PID to it')
262
263        sout = serr = ''
264        while p.poll() is None:
265            for l in iter (p.stdout.readline, b''):
266                partial = l.decode ('utf-8', 'ignore')
267                if self.stdout_q: self.stdout_q.put (partial)
268                sout += partial
269            for l in iter (p.stderr.readline, b''):
270                partial = l.decode ('utf-8', 'ignore')
271                serr += partial
272            time.sleep (1)
273        sout = sout.strip()
274        serr = serr.strip()
275
276        ## DEBUG
277        logger.info (f'stdout follows:')
278        for l in sout.splitlines():
279            logger.info (f'  {l}')
280        logger.info (f'stderr follows:')
281        for l in serr.splitlines():
282            logger.info (f'  {l}')
283        ## /DEBUG
284        if 0 != p.returncode:
285            cmd_txt = ' '.join (proc)
286            logger.error (f'command ({cmd_txt}) failed, stderr follows:')
287            for l in serr.splitlines():
288                logger.error (f'  {l}')
289            raise Exception (f'command ({cmd_txt}) failed, see log for details')
290        return sout
291
292    def tomaIPlocal (self):
293        try:
294            self.IPlocal = self.interfaceAdmin ('getIpAddress')
295        except Exception as e:
296            logger.error (e)
297            logger.error ('No se ha podido recuperar la dirección IP del cliente')
298            return False
299        logger.info ('local IP is "{}"'.format (self.IPlocal))
300        return True
301
302    def tomaMAClocal (self):
303        ## tomaIPlocal() calls interfaceAdm('getIpAddress')
304        ## getIpAddress runs 'ip addr show' and returns the IP address of every network interface except "lo"
305        ## (ie. breaks badly if there's more than one network interface)
306        ## let's make the same assumptions here
307        mac = subprocess.run (["ip -json link show |jq -r '.[] |select (.ifname != \"lo\") |.address'"], shell=True, capture_output=True, text=True)
308        self.mac = mac.stdout.strip()
309        return True
310
311    def enviaMensajeServidor (self, path, obj={}):
312        obj['iph'] = self.IPlocal          ## Ip del ordenador
313        obj['ido'] = self.idordenador      ## Identificador del ordenador
314        obj['npc'] = self.nombreordenador  ## Nombre del ordenador
315        obj['idc'] = self.idcentro         ## Identificador del centro
316        obj['ida'] = self.idaula           ## Identificador del aula
317
318        res = self.REST.sendMessage ('/'.join ([self.name, path]), obj)
319
320        if (type (res) is not dict):
321            #logger.error ('No se ha podido establecer conexión con el Servidor de Administración')   ## Error de conexión con el servidor
322            logger.debug (f'res ({res})')
323            logger.error ('Error al enviar trama ***send() fallo')
324            return False
325
326        return res
327
328    ## en C, esto envia una trama de respuesta al servidor. Devuelve un boolean
329    ## en python, simplemente termina de construir la respuesta y la devuelve; no envía nada por la red. El caller la usa en return() para enviar implícitamente la respuesta
330    def respuestaEjecucionComando (self, cmd, herror, ids=None):
331        if ids:                 ## Existe seguimiento
332            cmd['ids'] = ids    ## Añade identificador de la sesión
333
334        if 0 == herror:  ## el comando terminó con resultado satisfactorio
335            cmd['res'] = 1
336            cmd['der'] = ''
337        else:            ## el comando tuvo algún error
338            cmd['res'] = 2
339            cmd['der'] = self.tbErroresScripts[herror]
340
341        return cmd
342
343    def cargaPaginaWeb (self, url=None):
344        if (not url): url = self.urlMenu
345        os.system ('pkill -9 browser')
346
347        p = subprocess.Popen (['/usr/bin/browser', '-qws', url])
348        try:
349            p.wait (2)       ## if the process dies before 2 seconds...
350            logger.error ('Error al ejecutar la llamada a la interface de administración')
351            logger.error ('Error en la creación del proceso hijo')
352            logger.error ('return code "{}"'.format (p.returncode))
353            return False
354        except subprocess.TimeoutExpired:
355            pass
356
357        return True
358
359    def muestraMenu (self):
360        self.cargaPaginaWeb()
361
362    def muestraMensaje (self, idx):
363        self.cargaPaginaWeb (f'{self.urlMsg}?idx={idx}')
364
365    def LeeConfiguracion (self):
366        try:
367            parametroscfg = self.interfaceAdmin ('getConfiguration')   ## Configuración de los Sistemas Operativos del cliente
368        except Exception as e:
369            logger.error (e)
370            logger.error ('No se ha podido recuperar la dirección IP del cliente')
371            return None
372        logger.debug ('parametroscfg ({})'.format (parametroscfg))
373        return parametroscfg
374
375    def cfg2obj (self, cfg):
376        obj = []
377        ptrPar = cfg.split ('\n')
378        for line in ptrPar:
379            elem = {}
380            ptrCfg = line.split ('\t')
381
382            for item in ptrCfg:
383                k, v = item.split ('=')
384                elem[k] = v
385
386            obj.append (elem)
387
388        return obj
389
390    def onActivation (self):
391        if not os.path.exists ('/scripts/oginit'):
392            ## no estamos en oglive, este modulo no debe cargarse
393            ## esta lógica la saco de src/opengnsys/linux/operations.py, donde hay un if similar
394            raise Exception ('Refusing to load within an operating system')
395
396        self.pathinterface   = None
397        self.IPlocal         = None     ## Ip del ordenador
398        self.mac             = None     ## MAC del ordenador
399        self.idordenador     = None     ## Identificador del ordenador
400        self.nombreordenador = None     ## Nombre del ordenador
401        self.cache           = None
402        self.idproautoexec   = None
403        self.idcentro        = None     ## Identificador del centro
404        self.idaula          = None     ## Identificador del aula
405        self.pid_q           = None     ## for passing PIDs around
406        self.stdout_q        = None     ## for passing stdout
407        self.progress_jobs   = {}
408
409        ogcore_scheme   = os.environ.get ('OGAGENTCFG_OGCORE_SCHEME',  'https')
410        ogcore_ip       = os.environ.get ('OGAGENTCFG_OGCORE_IP',      '192.168.2.1')
411        ogcore_port     = os.environ.get ('OGAGENTCFG_OGCORE_PORT',    '8443')
412        urlmenu_scheme  = os.environ.get ('OGAGENTCFG_URLMENU_SCHEME', 'https')
413        urlmenu_ip      = os.environ.get ('OGAGENTCFG_URLMENU_IP',     '192.168.2.1')
414        urlmenu_port    = os.environ.get ('OGAGENTCFG_URLMENU_PORT',   '8443')
415        ogcore_ip_port  = ':'.join (map (str, filter (None, [ogcore_ip,  ogcore_port ])))
416        urlmenu_ip_port = ':'.join (map (str, filter (None, [urlmenu_ip, urlmenu_port])))
417        try:
418            url                = self.service.config.get (self.name, 'remote')
419            loglevel           = self.service.config.get (self.name, 'log')
420            self.pathinterface = self.service.config.get (self.name, 'pathinterface')
421            self.urlMenu       = self.service.config.get (self.name, 'urlMenu')
422            self.urlMsg        = self.service.config.get (self.name, 'urlMsg')
423
424            url          = url.format          (ogcore_scheme,  ogcore_ip_port)
425            self.urlMenu = self.urlMenu.format (urlmenu_scheme, urlmenu_ip_port)
426        except NoOptionError as e:
427            logger.error ("Configuration error: {}".format (e))
428            raise e
429        logger.setLevel (loglevel)
430        self.REST = REST (url)
431
432        if not self.tomaIPlocal():
433            raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
434
435        if not self.tomaMAClocal():
436            raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
437
438        threading.Thread (name='monitoring_thread', target=self.mon, daemon=True).start()
439
440    def _long_running_job (self, name, f, args):
441        any_job_running = False
442        for k in self.thread_list:
443            if self.thread_list[k]['running']:
444                any_job_running = True
445                break
446        if any_job_running:
447            logger.info ('some job is already running, refusing to launch another one')
448            return { 'job_id': None, 'message': 'some job is already running, refusing to launch another one' }
449
450        job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8)))
451        import queue
452        self.pid_q    = queue.Queue()   ## a single queue works for us because we never have more than one long_running_job at the same time
453        self.stdout_q = queue.Queue()
454        self.thread_list[job_id] = {
455            'thread': ThreadWithResult (target=f, args=(self.pid_q, self.stdout_q) + args),
456            'starttime': time.time(),
457            'child_pid': None,
458            'running': True,
459            'result': None
460        }
461        self.thread_list[job_id]['thread'].start()
462        return { 'job_id': job_id }
463
464## para matar threads tengo lo siguiente:
465## - aqui en _long_running_job meto una cola en self.pid_q
466## - (self.pid_q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'pid_q'")
467## - en el thread_list también tengo un child_pid para almacenar el pid de los procesos hijos
468## - al crear el ThreadWithResult le paso la cola, y luego en run() la recojo y la meto en el self.pid_q del thread
469## - en interfaceAdmin() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue
470## - en mon() recojo pids de la queue y los meto en thread_list 'child_pid'
471##   - algunas funciones llaman a interfaceAdmin más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando
472##     - por ejemplo EjecutarScript llama a interfaceAdmin() y luego llama a LeeConfiguracion() el cual llama a interfaceAdmin() otra vez
473## - y cuando nos llamen a KillJob, terminamos en killer() el cual coge el 'child_pid' y zas
474##   - pero a lo mejor el child ya terminó
475##   - o a lo mejor el KillJob nos llegó demasiado pronto y todavía no hubo ningún child
476##
477## $ curl --insecure -X POST --data '{"nfn":"EjecutarScript","scp":"cd /usr; sleep 30; pwd; ls","ids":"0"}' https://192.168.2.199:8000/ogAdmClient/EjecutarScript
478## {"job_id": "EjecutarScript-333feb3f"}
479## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/ogAdmClient/KillJob
480##
481## funciona bien, excepto que el PID no muere xD, ni siquiera haciendo subprocess.run('kill')
482
483## para mostrar el progreso de los jobs reutilizo la misma infra
484## una cola self.stdout_q
485## en interfaceAdmin escribo la stdout parcial que ya venia recogiendo
486## mon() lo recoge y le hace un POST a ogcore
Note: See TracBrowser for help on using the repository browser.