source: ogAgent-Git/src/opengnsys/workers/oglive_worker.py @ 5ce0901

browser-nuevodecorare-oglive-methodsexec-ogbrowserfix-urllog-sess-lenmainno-ptt-paramno-tlsogagentuser-sigtermoggitoggit-notlsoglogoglog2override-moduleping1ping2ping3ping4report-progresssched-tasktlstls-again
Last change on this file since 5ce0901 was d5275c6, checked in by Natalia Serrano <natalia.serrano@…>, 4 months ago

refs #1483 report progress of operations to ogcore

  • Property mode set to 100644
File size: 22.1 KB
Line 
1# -*- coding: utf-8 -*-
2#
3# Copyright (c) 2024 Qindel Formación y Servicios S.L.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without modification,
7# are permitted provided that the following conditions are met:
8#
9#    * Redistributions of source code must retain the above copyright notice,
10#      this list of conditions and the following disclaimer.
11#    * Redistributions in binary form must reproduce the above copyright notice,
12#      this list of conditions and the following disclaimer in the documentation
13#      and/or other materials provided with the distribution.
14#    * Neither the name of Virtual Cable S.L. nor the names of its contributors
15#      may be used to endorse or promote products derived from this software
16#      without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28"""
29@author: Natalia Serrano, nserrano at qindel dot com
30"""
31# pylint: disable=unused-wildcard-import,wildcard-import
32
33import os
34import re
35import time
36import random
37import subprocess
38import threading
39import signal
40
41from configparser import NoOptionError
42from opengnsys import REST
43from opengnsys.log import logger
44from .server_worker import ServerWorker
45
46## https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread
47class ThreadWithResult (threading.Thread):
48    def run (self):
49        try:
50            self.result = None
51            if self._target is not None:
52                ## the first arg in self._args is the queue
53                self.pid_q    = self._args[0]
54                self.stdout_q = self._args[1]
55                self._args    = self._args[2:]
56                try:
57                    self.result = self._target (*self._args, **self._kwargs)
58                except Exception as e:
59                    self.result = { 'res': 2, 'der': f'got exception: ({e})' }    ## res=2 as defined in ogAdmClient.c:2048
60        finally:
61            # Avoid a refcycle if the thread is running a function with an argument that has a member that points to the thread.
62            del self._target, self._args, self._kwargs, self.pid_q, self.stdout_q
63
64class ogLiveWorker(ServerWorker):
65    thread_list = {}
66    thread_lock = threading.Lock()
67
68    tbErroresScripts = [
69        "Se han generado errores desconocidos. No se puede continuar la ejecución de este módulo",        ## 0
70                "001-Formato de ejecución incorrecto.",
71                "002-Fichero o dispositivo no encontrado",
72                "003-Error en partición de disco",
73                "004-Partición o fichero bloqueado",
74                "005-Error al crear o restaurar una imagen",
75                "006-Sin sistema operativo",
76                "007-Programa o función BOOLEAN no ejecutable",
77                "008-Error en la creación del archivo de eco para consola remota",
78                "009-Error en la lectura del archivo temporal de intercambio",
79                "010-Error al ejecutar la llamada a la interface de administración",
80                "011-La información retornada por la interface de administración excede de la longitud permitida",
81                "012-Error en el envío de fichero por la red",
82                "013-Error en la creación del proceso hijo",
83                "014-Error de escritura en destino",
84                "015-Sin Cache en el Cliente",
85                "016-No hay espacio en la cache para almacenar fichero-imagen",
86                "017-Error al Reducir el Sistema Archivos",
87                "018-Error al Expandir el Sistema Archivos",
88                "019-Valor fuera de rango o no válido.",
89                "020-Sistema de archivos desconocido o no se puede montar",
90                "021-Error en partición de caché local",
91                "022-El disco indicado no contiene una particion GPT",
92                "023-Error no definido",
93                "024-Error no definido",
94                "025-Error no definido",
95                "026-Error no definido",
96                "027-Error no definido",
97                "028-Error no definido",
98                "029-Error no definido",
99                "030-Error al restaurar imagen - Imagen mas grande que particion",
100                "031-Error al realizar el comando updateCache",
101                "032-Error al formatear",
102                "033-Archivo de imagen corrupto o de otra versión de partclone",
103                "034-Error no definido",
104                "035-Error no definido",
105                "036-Error no definido",
106                "037-Error no definido",
107                "038-Error no definido",
108                "039-Error no definido",
109                "040-Error imprevisto no definido",
110                "041-Error no definido",
111                "042-Error no definido",
112                "043-Error no definido",
113                "044-Error no definido",
114                "045-Error no definido",
115                "046-Error no definido",
116                "047-Error no definido",
117                "048-Error no definido",
118                "049-Error no definido",
119                "050-Error en la generación de sintaxis de transferenica unicast",
120                "051-Error en envio UNICAST de una particion",
121                "052-Error en envio UNICAST de un fichero",
122                "053-Error en la recepcion UNICAST de una particion",
123                "054-Error en la recepcion UNICAST de un fichero",
124                "055-Error en la generacion de sintaxis de transferenica Multicast",
125                "056-Error en envio MULTICAST de un fichero",
126                "057-Error en la recepcion MULTICAST de un fichero",
127                "058-Error en envio MULTICAST de una particion",
128                "059-Error en la recepcion MULTICAST de una particion",
129                "060-Error en la conexion de una sesion UNICAST|MULTICAST con el MASTER",
130                "061-Error no definido",
131                "062-Error no definido",
132                "063-Error no definido",
133                "064-Error no definido",
134                "065-Error no definido",
135                "066-Error no definido",
136                "067-Error no definido",
137                "068-Error no definido",
138                "069-Error no definido",
139                "070-Error al montar una imagen sincronizada.",
140                "071-Imagen no sincronizable (es monolitica).",
141                "072-Error al desmontar la imagen.",
142                "073-No se detectan diferencias entre la imagen basica y la particion.",
143                "074-Error al sincronizar, puede afectar la creacion/restauracion de la imagen.",
144                "Error desconocido",
145        ]
146
147    def notifier (self, job_id, result):
148        result['job_id'] = job_id
149        self.REST.sendMessage ('clients/status/webhook', result)
150
151    def killer (self, job_id):
152        logger.debug (f'killer() called, job_id ({job_id})')
153        if job_id not in self.thread_list: return { 'res': 2, 'der': 'Unknown job' }
154
155        with self.thread_lock:
156            if 'thread' not in self.thread_list[job_id]: return { 'res': 2, 'der': 'Job is not running' }
157            t   = self.thread_list[job_id]['thread']
158            pid = self.thread_list[job_id]['child_pid']
159            logger.debug (f'pid ({pid})')
160            try_times = 8
161            sig = signal.SIGTERM
162            msg = f'could not kill pid ({pid}) after ({try_times}) tries'
163            success = 2    ## mimic cmd['res'] in respuestaEjecucionComando(): "1" means success, "2" means failed
164            while True:
165                t.join (0.05)
166                if not t.is_alive():
167                    msg = 'job terminated'
168                    success = 1
169                    logger.debug (msg)
170                    self.thread_list[job_id]['child_pid'] = None
171                    break
172                ## race condition: if the subprocess finishes just here, then we already checked that t.is_alive() is true, but os.path.exists(/proc/pid) will be false below. msg will be 'nothing to kill'.
173                ## this is fine in the first iteration of the loop, before we send any signals. In the rest of iterations, after some signals were sent, msg should be 'job terminated' instead.
174                if pid:
175                    if os.path.exists (f'/proc/{pid}'):
176                        logger.debug (f'sending signal ({sig}) to pid ({pid})')
177                        ## if the process finishes just here, nothing happens: the signal is sent to the void
178                        os.kill (pid, sig)
179                        #subprocess.run (['kill', '--signal', str(sig), str(pid)])
180                    else:
181                        msg = f'pid ({pid}) is gone, nothing to kill'
182                        success = 1
183                        logger.debug (msg)
184                        self.thread_list[job_id]['child_pid'] = None
185                        break
186                else:
187                    msg = 'no PID to kill'
188                    logger.debug (msg)
189
190                if not try_times: break
191                if 4 == try_times: sig = signal.SIGKILL   ## change signal after a few tries
192                try_times -= 1
193                time.sleep (0.4)
194
195        return { 'res':success, 'der':msg }
196
197    def _extract_progress (self, job_id, ary=[]):
198        progress = None
199        for i in ary:
200            if m := re.search (r'^\[([0-9]+)\]', i):     ## look for strings like '[10]', '[60]'
201                logger.debug (f"matched regex, m.groups ({m.groups()})")
202                progress = float (m.groups()[0]) / 100
203        return progress
204
205    def mon (self):
206        while True:
207            with self.thread_lock:
208                for k in self.thread_list:
209                    elem = self.thread_list[k]
210                    if 'thread' not in elem: continue
211                    logger.debug (f'considering thread ({k})')
212
213                    if self.pid_q:
214                        if not self.pid_q.empty():
215                            elem['child_pid'] = self.pid_q.get()
216                            logger.debug (f'queue not empty, got pid ({elem["child_pid"]})')
217
218                    if self.stdout_q:
219                        partial = ''
220                        while not self.stdout_q.empty():
221                            partial += self.stdout_q.get()
222                        lines = partial.splitlines()
223                        if len (lines):
224                            p = self._extract_progress (k, lines)
225                            if p:
226                                m = { "job_id": k, "progress": p }
227                                self.REST.sendMessage ('clients/status/webhook', { "job_id": k, "progress": p })
228
229                    elem['thread'].join (0.05)
230                    if not elem['thread'].is_alive():
231                        logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
232                        elem['running'] = False
233                        elem['result'] = elem['thread'].result
234                        del elem['thread']
235                        self.notifier (k, elem['result'])
236
237            time.sleep (1)
238
239    def interfaceAdmin (self, method, parametros=[]):
240        exe = '{}/{}'.format (self.pathinterface, method)
241        ## for development only. Will be removed when the referenced bash code (/opt/opengnsys/lib/engine/bin/*.lib) is translated into python
242        devel_bash_prefix = f'''
243            PATH=/opt/opengnsys/scripts/:$PATH;
244            source /opt/opengnsys/etc/lang.{os.environ.get ('LANG', 'en_GB.UTF-8')}.conf;
245            for I in /opt/opengnsys/lib/engine/bin/*.lib; do source $I; done;
246            for i in $(declare -F |cut -f3 -d" "); do export -f $i; done;
247        '''
248
249        if parametros:
250            proc = ['bash', '-c', '{} {} {}'.format (devel_bash_prefix, exe, ' '.join (parametros))]
251        else:
252            proc = ['bash', '-c', '{} {}'.format (devel_bash_prefix, exe)]
253        logger.debug ('subprocess.run ("{}", capture_output=True)'.format (proc))
254
255        p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
256        if self.pid_q:
257            self.pid_q.put (p.pid)
258        else:
259            ## esto sucede por ejemplo cuando arranca el agente, que estamos en interfaceAdmin() en el mismo hilo, sin _long_running_job ni hilo separado
260            logger.debug ('no queue--not writing any PID to it')
261
262        sout = serr = ''
263        while p.poll() is None:
264            for l in iter (p.stdout.readline, b''):
265                partial = l.decode ('utf-8', 'ignore')
266                if self.stdout_q: self.stdout_q.put (partial)
267                sout += partial
268            for l in iter (p.stderr.readline, b''):
269                partial = l.decode ('utf-8', 'ignore')
270                serr += partial
271            time.sleep (1)
272        sout = sout.strip()
273        serr = serr.strip()
274
275        ## DEBUG
276        logger.info (f'stdout follows:')
277        for l in sout.splitlines():
278            logger.info (f'  {l}')
279        logger.info (f'stderr follows:')
280        for l in serr.splitlines():
281            logger.info (f'  {l}')
282        ## /DEBUG
283        if 0 != p.returncode:
284            cmd_txt = ' '.join (proc)
285            logger.error (f'command ({cmd_txt}) failed, stderr follows:')
286            for l in serr.splitlines():
287                logger.error (f'  {l}')
288            raise Exception (f'command ({cmd_txt}) failed, see log for details')
289        return sout
290
291    def tomaIPlocal (self):
292        try:
293            self.IPlocal = self.interfaceAdmin ('getIpAddress')
294        except Exception as e:
295            logger.error (e)
296            logger.error ('No se ha podido recuperar la dirección IP del cliente')
297            return False
298        logger.info ('local IP is "{}"'.format (self.IPlocal))
299        return True
300
301    def tomaMAClocal (self):
302        ## tomaIPlocal() calls interfaceAdm('getIpAddress')
303        ## getIpAddress runs 'ip addr show' and returns the IP address of every network interface except "lo"
304        ## (ie. breaks badly if there's more than one network interface)
305        ## let's make the same assumptions here
306        mac = subprocess.run (["ip -json link show |jq -r '.[] |select (.ifname != \"lo\") |.address'"], shell=True, capture_output=True, text=True)
307        self.mac = mac.stdout.strip()
308        return True
309
310    def enviaMensajeServidor (self, path, obj={}):
311        obj['iph'] = self.IPlocal          ## Ip del ordenador
312        obj['ido'] = self.idordenador      ## Identificador del ordenador
313        obj['npc'] = self.nombreordenador  ## Nombre del ordenador
314        obj['idc'] = self.idcentro         ## Identificador del centro
315        obj['ida'] = self.idaula           ## Identificador del aula
316
317        res = self.REST.sendMessage ('/'.join ([self.name, path]), obj)
318
319        if (type (res) is not dict):
320            #logger.error ('No se ha podido establecer conexión con el Servidor de Administración')   ## Error de conexión con el servidor
321            logger.debug (f'res ({res})')
322            logger.error ('Error al enviar trama ***send() fallo')
323            return False
324
325        return res
326
327    ## en C, esto envia una trama de respuesta al servidor. Devuelve un boolean
328    ## en python, simplemente termina de construir la respuesta y la devuelve; no envía nada por la red. El caller la usa en return() para enviar implícitamente la respuesta
329    def respuestaEjecucionComando (self, cmd, herror, ids=None):
330        if ids:                 ## Existe seguimiento
331            cmd['ids'] = ids    ## Añade identificador de la sesión
332
333        if 0 == herror:  ## el comando terminó con resultado satisfactorio
334            cmd['res'] = 1
335            cmd['der'] = ''
336        else:            ## el comando tuvo algún error
337            cmd['res'] = 2
338            cmd['der'] = self.tbErroresScripts[herror]
339
340        return cmd
341
342    def cargaPaginaWeb (self, url=None):
343        if (not url): url = self.urlMenu
344        os.system ('pkill -9 browser')
345
346        p = subprocess.Popen (['/usr/bin/browser', '-qws', url])
347        try:
348            p.wait (2)       ## if the process dies before 2 seconds...
349            logger.error ('Error al ejecutar la llamada a la interface de administración')
350            logger.error ('Error en la creación del proceso hijo')
351            logger.error ('return code "{}"'.format (p.returncode))
352            return False
353        except subprocess.TimeoutExpired:
354            pass
355
356        return True
357
358    def muestraMenu (self):
359        self.cargaPaginaWeb()
360
361    def muestraMensaje (self, idx):
362        self.cargaPaginaWeb (f'{self.urlMsg}?idx={idx}')
363
364    def LeeConfiguracion (self):
365        try:
366            parametroscfg = self.interfaceAdmin ('getConfiguration')   ## Configuración de los Sistemas Operativos del cliente
367        except Exception as e:
368            logger.error (e)
369            logger.error ('No se ha podido recuperar la dirección IP del cliente')
370            return None
371        logger.debug ('parametroscfg ({})'.format (parametroscfg))
372        return parametroscfg
373
374    def cfg2obj (self, cfg):
375        obj = []
376        ptrPar = cfg.split ('\n')
377        for line in ptrPar:
378            elem = {}
379            ptrCfg = line.split ('\t')
380
381            for item in ptrCfg:
382                k, v = item.split ('=')
383                elem[k] = v
384
385            obj.append (elem)
386
387        return obj
388
389    def onActivation (self):
390        if not os.path.exists ('/scripts/oginit'):
391            ## no estamos en oglive, este modulo no debe cargarse
392            ## esta lógica la saco de src/opengnsys/linux/operations.py, donde hay un if similar
393            raise Exception ('Refusing to load within an operating system')
394
395        self.pathinterface   = None
396        self.IPlocal         = None     ## Ip del ordenador
397        self.mac             = None     ## MAC del ordenador
398        self.idordenador     = None     ## Identificador del ordenador
399        self.nombreordenador = None     ## Nombre del ordenador
400        self.cache           = None
401        self.idproautoexec   = None
402        self.idcentro        = None     ## Identificador del centro
403        self.idaula          = None     ## Identificador del aula
404        self.pid_q           = None     ## for passing PIDs around
405        self.stdout_q        = None     ## for passing stdout
406        self.progress_jobs   = {}
407
408        ogcore_scheme   = os.environ.get ('OGAGENTCFG_OGCORE_SCHEME',  'https')
409        ogcore_ip       = os.environ.get ('OGAGENTCFG_OGCORE_IP',      '192.168.2.1')
410        ogcore_port     = os.environ.get ('OGAGENTCFG_OGCORE_PORT',    '8443')
411        urlmenu_scheme  = os.environ.get ('OGAGENTCFG_URLMENU_SCHEME', 'https')
412        urlmenu_ip      = os.environ.get ('OGAGENTCFG_URLMENU_IP',     '192.168.2.1')
413        urlmenu_port    = os.environ.get ('OGAGENTCFG_URLMENU_PORT',   '8443')
414        ogcore_ip_port  = ':'.join (map (str, filter (None, [ogcore_ip,  ogcore_port ])))
415        urlmenu_ip_port = ':'.join (map (str, filter (None, [urlmenu_ip, urlmenu_port])))
416        try:
417            url                = self.service.config.get (self.name, 'remote')
418            loglevel           = self.service.config.get (self.name, 'log')
419            self.pathinterface = self.service.config.get (self.name, 'pathinterface')
420            self.urlMenu       = self.service.config.get (self.name, 'urlMenu')
421            self.urlMsg        = self.service.config.get (self.name, 'urlMsg')
422
423            url          = url.format          (ogcore_scheme,  ogcore_ip_port)
424            self.urlMenu = self.urlMenu.format (urlmenu_scheme, urlmenu_ip_port)
425        except NoOptionError as e:
426            logger.error ("Configuration error: {}".format (e))
427            raise e
428        logger.setLevel (loglevel)
429        self.REST = REST (url)
430
431        if not self.tomaIPlocal():
432            raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
433
434        if not self.tomaMAClocal():
435            raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
436
437        threading.Thread (name='monitoring_thread', target=self.mon, daemon=True).start()
438
439    def _long_running_job (self, name, f, args):
440        any_job_running = False
441        for k in self.thread_list:
442            if self.thread_list[k]['running']:
443                any_job_running = True
444                break
445        if any_job_running:
446            logger.info ('some job is already running, refusing to launch another one')
447            return { 'job_id': None, 'message': 'some job is already running, refusing to launch another one' }
448
449        job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8)))
450        import queue
451        self.pid_q    = queue.Queue()   ## a single queue works for us because we never have more than one long_running_job at the same time
452        self.stdout_q = queue.Queue()
453        self.thread_list[job_id] = {
454            'thread': ThreadWithResult (target=f, args=(self.pid_q, self.stdout_q) + args),
455            'starttime': time.time(),
456            'child_pid': None,
457            'running': True,
458            'result': None
459        }
460        self.thread_list[job_id]['thread'].start()
461        return { 'job_id': job_id }
462
463## para matar threads tengo lo siguiente:
464## - aqui en _long_running_job meto una cola en self.pid_q
465## - (self.pid_q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'pid_q'")
466## - en el thread_list también tengo un child_pid para almacenar el pid de los procesos hijos
467## - al crear el ThreadWithResult le paso la cola, y luego en run() la recojo y la meto en el self.pid_q del thread
468## - en interfaceAdmin() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue
469## - en mon() recojo pids de la queue y los meto en thread_list 'child_pid'
470##   - algunas funciones llaman a interfaceAdmin más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando
471##     - por ejemplo EjecutarScript llama a interfaceAdmin() y luego llama a LeeConfiguracion() el cual llama a interfaceAdmin() otra vez
472## - y cuando nos llamen a KillJob, terminamos en killer() el cual coge el 'child_pid' y zas
473##   - pero a lo mejor el child ya terminó
474##   - o a lo mejor el KillJob nos llegó demasiado pronto y todavía no hubo ningún child
475##
476## $ curl --insecure -X POST --data '{"nfn":"EjecutarScript","scp":"cd /usr; sleep 30; pwd; ls","ids":"0"}' https://192.168.2.199:8000/ogAdmClient/EjecutarScript
477## {"job_id": "EjecutarScript-333feb3f"}
478## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/ogAdmClient/KillJob
479##
480## funciona bien, excepto que el PID no muere xD, ni siquiera haciendo subprocess.run('kill')
481
482## para mostrar el progreso de los jobs reutilizo la misma infra
483## una cola self.stdout_q
484## en interfaceAdmin escribo la stdout parcial que ya venia recogiendo
485## mon() lo recoge y le hace un POST a ogcore
Note: See TracBrowser for help on using the repository browser.