source: ogAgent-Git/src/opengnsys/workers/oglive_worker.py @ b5b09ec

browser-nuevodecorare-oglive-methodserror-no-netexec-ogbrowserfix-urllog-sess-lenmainno-ptt-paramno-tlsogagentuser-sigtermoggitoggit-notlsoglogoglog2override-moduleping1ping2ping3ping4report-progresssched-tasktlstls-again
Last change on this file since b5b09ec was f8563ec, checked in by Natalia Serrano <natalia.serrano@…>, 4 months ago

refs #1460 merge server modules

  • Property mode set to 100644
File size: 20.7 KB
Line 
1# -*- coding: utf-8 -*-
2#
3# Copyright (c) 2024 Qindel Formación y Servicios S.L.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without modification,
7# are permitted provided that the following conditions are met:
8#
9#    * Redistributions of source code must retain the above copyright notice,
10#      this list of conditions and the following disclaimer.
11#    * Redistributions in binary form must reproduce the above copyright notice,
12#      this list of conditions and the following disclaimer in the documentation
13#      and/or other materials provided with the distribution.
14#    * Neither the name of Virtual Cable S.L. nor the names of its contributors
15#      may be used to endorse or promote products derived from this software
16#      without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28"""
29@author: Natalia Serrano, nserrano at qindel dot com
30"""
31# pylint: disable=unused-wildcard-import,wildcard-import
32
33import os
34import time
35import random
36import subprocess
37import threading
38import signal
39
40from configparser import NoOptionError
41from opengnsys import REST
42from opengnsys.log import logger
43from .server_worker import ServerWorker
44
45## https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread
46class ThreadWithResult (threading.Thread):
47    def run (self):
48        try:
49            self.result = None
50            if self._target is not None:
51                ## the first arg in self._args is the queue
52                self.q     = self._args[0]
53                self._args = self._args[1:]
54                try:
55                    self.result = self._target (*self._args, **self._kwargs)
56                except Exception as e:
57                    self.result = { 'res': 2, 'der': f'got exception: ({e})' }    ## res=2 as defined in ogAdmClient.c:2048
58        finally:
59            # Avoid a refcycle if the thread is running a function with an argument that has a member that points to the thread.
60            del self._target, self._args, self._kwargs, self.q
61
62class ogLiveWorker(ServerWorker):
63    thread_list = {}
64    thread_lock = threading.Lock()
65
66    tbErroresScripts = [
67        "Se han generado errores desconocidos. No se puede continuar la ejecución de este módulo",        ## 0
68                "001-Formato de ejecución incorrecto.",
69                "002-Fichero o dispositivo no encontrado",
70                "003-Error en partición de disco",
71                "004-Partición o fichero bloqueado",
72                "005-Error al crear o restaurar una imagen",
73                "006-Sin sistema operativo",
74                "007-Programa o función BOOLEAN no ejecutable",
75                "008-Error en la creación del archivo de eco para consola remota",
76                "009-Error en la lectura del archivo temporal de intercambio",
77                "010-Error al ejecutar la llamada a la interface de administración",
78                "011-La información retornada por la interface de administración excede de la longitud permitida",
79                "012-Error en el envío de fichero por la red",
80                "013-Error en la creación del proceso hijo",
81                "014-Error de escritura en destino",
82                "015-Sin Cache en el Cliente",
83                "016-No hay espacio en la cache para almacenar fichero-imagen",
84                "017-Error al Reducir el Sistema Archivos",
85                "018-Error al Expandir el Sistema Archivos",
86                "019-Valor fuera de rango o no válido.",
87                "020-Sistema de archivos desconocido o no se puede montar",
88                "021-Error en partición de caché local",
89                "022-El disco indicado no contiene una particion GPT",
90                "023-Error no definido",
91                "024-Error no definido",
92                "025-Error no definido",
93                "026-Error no definido",
94                "027-Error no definido",
95                "028-Error no definido",
96                "029-Error no definido",
97                "030-Error al restaurar imagen - Imagen mas grande que particion",
98                "031-Error al realizar el comando updateCache",
99                "032-Error al formatear",
100                "033-Archivo de imagen corrupto o de otra versión de partclone",
101                "034-Error no definido",
102                "035-Error no definido",
103                "036-Error no definido",
104                "037-Error no definido",
105                "038-Error no definido",
106                "039-Error no definido",
107                "040-Error imprevisto no definido",
108                "041-Error no definido",
109                "042-Error no definido",
110                "043-Error no definido",
111                "044-Error no definido",
112                "045-Error no definido",
113                "046-Error no definido",
114                "047-Error no definido",
115                "048-Error no definido",
116                "049-Error no definido",
117                "050-Error en la generación de sintaxis de transferenica unicast",
118                "051-Error en envio UNICAST de una particion",
119                "052-Error en envio UNICAST de un fichero",
120                "053-Error en la recepcion UNICAST de una particion",
121                "054-Error en la recepcion UNICAST de un fichero",
122                "055-Error en la generacion de sintaxis de transferenica Multicast",
123                "056-Error en envio MULTICAST de un fichero",
124                "057-Error en la recepcion MULTICAST de un fichero",
125                "058-Error en envio MULTICAST de una particion",
126                "059-Error en la recepcion MULTICAST de una particion",
127                "060-Error en la conexion de una sesion UNICAST|MULTICAST con el MASTER",
128                "061-Error no definido",
129                "062-Error no definido",
130                "063-Error no definido",
131                "064-Error no definido",
132                "065-Error no definido",
133                "066-Error no definido",
134                "067-Error no definido",
135                "068-Error no definido",
136                "069-Error no definido",
137                "070-Error al montar una imagen sincronizada.",
138                "071-Imagen no sincronizable (es monolitica).",
139                "072-Error al desmontar la imagen.",
140                "073-No se detectan diferencias entre la imagen basica y la particion.",
141                "074-Error al sincronizar, puede afectar la creacion/restauracion de la imagen.",
142                "Error desconocido",
143        ]
144
145    def notifier (self, job_id, result):
146        logger.debug (f'notifier() called, job_id ({job_id}) result ({result})')
147        result['job_id'] = job_id
148        self.REST.sendMessage ('clients/status/webhook', result)
149
150    def killer (self, job_id):
151        logger.debug (f'killer() called, job_id ({job_id})')
152        if job_id not in self.thread_list: return { 'res': 2, 'der': 'Unknown job' }
153
154        with self.thread_lock:
155            if 'thread' not in self.thread_list[job_id]: return { 'res': 2, 'der': 'Job is not running' }
156            t   = self.thread_list[job_id]['thread']
157            pid = self.thread_list[job_id]['child_pid']
158            logger.debug (f'pid ({pid})')
159            try_times = 8
160            sig = signal.SIGTERM
161            msg = f'could not kill pid ({pid}) after ({try_times}) tries'
162            success = 2    ## mimic cmd['res'] in respuestaEjecucionComando(): "1" means success, "2" means failed
163            while True:
164                t.join (0.05)
165                if not t.is_alive():
166                    msg = 'job terminated'
167                    success = 1
168                    logger.debug (msg)
169                    self.thread_list[job_id]['child_pid'] = None
170                    break
171                ## race condition: if the subprocess finishes just here, then we already checked that t.is_alive() is true, but os.path.exists(/proc/pid) will be false below. msg will be 'nothing to kill'.
172                ## this is fine in the first iteration of the loop, before we send any signals. In the rest of iterations, after some signals were sent, msg should be 'job terminated' instead.
173                if pid:
174                    if os.path.exists (f'/proc/{pid}'):
175                        logger.debug (f'sending signal ({sig}) to pid ({pid})')
176                        ## if the process finishes just here, nothing happens: the signal is sent to the void
177                        os.kill (pid, sig)
178                        #subprocess.run (['kill', '--signal', str(sig), str(pid)])
179                    else:
180                        msg = f'pid ({pid}) is gone, nothing to kill'
181                        success = 1
182                        logger.debug (msg)
183                        self.thread_list[job_id]['child_pid'] = None
184                        break
185                else:
186                    msg = 'no PID to kill'
187                    logger.debug (msg)
188
189                if not try_times: break
190                if 4 == try_times: sig = signal.SIGKILL   ## change signal after a few tries
191                try_times -= 1
192                time.sleep (0.4)
193
194        return { 'res':success, 'der':msg }
195
196    def mon (self):
197        while True:
198            with self.thread_lock:
199                for k in self.thread_list:
200                    elem = self.thread_list[k]
201                    if 'thread' not in elem: continue
202                    logger.debug (f'considering thread ({k})')
203
204                    if self.q:
205                        if not self.q.empty():
206                            elem['child_pid'] = self.q.get()
207                            logger.debug (f'queue not empty, got pid ({elem["child_pid"]})')
208                        else:
209                            logger.debug (f'queue empty')
210
211                    elem['thread'].join (0.05)
212                    if not elem['thread'].is_alive():
213                        logger.debug (f'is no longer alive, k ({k}) thread ({elem["thread"]})')
214                        elem['running'] = False
215                        elem['result'] = elem['thread'].result
216                        del elem['thread']
217                        self.notifier (k, elem['result'])
218
219            time.sleep (1)
220
221    def interfaceAdmin (self, method, parametros=[]):
222        exe = '{}/{}'.format (self.pathinterface, method)
223        ## for development only. Will be removed when the referenced bash code (/opt/opengnsys/lib/engine/bin/*.lib) is translated into python
224        devel_bash_prefix = '''
225            PATH=/opt/opengnsys/scripts/:$PATH;
226            for I in /opt/opengnsys/lib/engine/bin/*.lib; do source $I; done;
227            for i in $(declare -F |cut -f3 -d" "); do export -f $i; done;
228        '''
229
230        if parametros:
231            proc = ['bash', '-c', '{} {} {}'.format (devel_bash_prefix, exe, ' '.join (parametros))]
232        else:
233            proc = ['bash', '-c', '{} {}'.format (devel_bash_prefix, exe)]
234        logger.debug ('subprocess.run ("{}", capture_output=True)'.format (proc))
235
236        p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
237        if self.q:
238            self.q.put (p.pid)
239        else:
240            ## esto sucede por ejemplo cuando arranca el agente, que estamos en interfaceAdmin() en el mismo hilo, sin _long_running_job ni hilo separado
241            logger.debug ('no queue--not writing any PID to it')
242        sout = serr = ''
243        while p.poll() is None:
244            for l in iter (p.stdout.readline, b''): sout += l.decode ('utf-8', 'ignore')
245            for l in iter (p.stderr.readline, b''): serr += l.decode ('utf-8', 'ignore')
246            time.sleep (1)
247        sout = sout.strip()
248        serr = serr.strip()
249
250        ## DEBUG
251        logger.info (f'stdout follows:')
252        for l in sout.splitlines():
253            logger.info (f'  {l}')
254        logger.info (f'stderr follows:')
255        for l in serr.splitlines():
256            logger.info (f'  {l}')
257        ## /DEBUG
258        if 0 != p.returncode:
259            cmd_txt = ' '.join (proc)
260            logger.error (f'command ({cmd_txt}) failed, stderr follows:')
261            for l in serr.splitlines():
262                logger.error (f'  {l}')
263            raise Exception (f'command ({cmd_txt}) failed, see log for details')
264        return sout
265
266    def tomaIPlocal (self):
267        try:
268            self.IPlocal = self.interfaceAdmin ('getIpAddress')
269        except Exception as e:
270            logger.error (e)
271            logger.error ('No se ha podido recuperar la dirección IP del cliente')
272            return False
273        logger.info ('local IP is "{}"'.format (self.IPlocal))
274        return True
275
276    def tomaMAClocal (self):
277        ## tomaIPlocal() calls interfaceAdm('getIpAddress')
278        ## getIpAddress runs 'ip addr show' and returns the IP address of every network interface except "lo"
279        ## (ie. breaks badly if there's more than one network interface)
280        ## let's make the same assumptions here
281        mac = subprocess.run (["ip -json link show |jq -r '.[] |select (.ifname != \"lo\") |.address'"], shell=True, capture_output=True, text=True)
282        self.mac = mac.stdout.strip()
283        return True
284
285    def enviaMensajeServidor (self, path, obj={}):
286        obj['iph'] = self.IPlocal          ## Ip del ordenador
287        obj['ido'] = self.idordenador      ## Identificador del ordenador
288        obj['npc'] = self.nombreordenador  ## Nombre del ordenador
289        obj['idc'] = self.idcentro         ## Identificador del centro
290        obj['ida'] = self.idaula           ## Identificador del aula
291
292        res = self.REST.sendMessage ('/'.join ([self.name, path]), obj)
293
294        if (type (res) is not dict):
295            #logger.error ('No se ha podido establecer conexión con el Servidor de Administración')   ## Error de conexión con el servidor
296            logger.debug (f'res ({res})')
297            logger.error ('Error al enviar trama ***send() fallo')
298            return False
299
300        return res
301
302    ## en C, esto envia una trama de respuesta al servidor. Devuelve un boolean
303    ## en python, simplemente termina de construir la respuesta y la devuelve; no envía nada por la red. El caller la usa en return() para enviar implícitamente la respuesta
304    def respuestaEjecucionComando (self, cmd, herror, ids=None):
305        if ids:                 ## Existe seguimiento
306            cmd['ids'] = ids    ## Añade identificador de la sesión
307
308        if 0 == herror:  ## el comando terminó con resultado satisfactorio
309            cmd['res'] = 1
310            cmd['der'] = ''
311        else:            ## el comando tuvo algún error
312            cmd['res'] = 2
313            cmd['der'] = self.tbErroresScripts[herror]
314
315        return cmd
316
317    def cargaPaginaWeb (self, url=None):
318        if (not url): url = self.urlMenu
319        os.system ('pkill -9 browser')
320
321        p = subprocess.Popen (['/usr/bin/browser', '-qws', url])
322        try:
323            p.wait (2)       ## if the process dies before 2 seconds...
324            logger.error ('Error al ejecutar la llamada a la interface de administración')
325            logger.error ('Error en la creación del proceso hijo')
326            logger.error ('return code "{}"'.format (p.returncode))
327            return False
328        except subprocess.TimeoutExpired:
329            pass
330
331        return True
332
333    def muestraMenu (self):
334        self.cargaPaginaWeb()
335
336    def muestraMensaje (self, idx):
337        self.cargaPaginaWeb (f'{self.urlMsg}?idx={idx}')
338
339    def LeeConfiguracion (self):
340        try:
341            parametroscfg = self.interfaceAdmin ('getConfiguration')   ## Configuración de los Sistemas Operativos del cliente
342        except Exception as e:
343            logger.error (e)
344            logger.error ('No se ha podido recuperar la dirección IP del cliente')
345            return None
346        logger.debug ('parametroscfg ({})'.format (parametroscfg))
347        return parametroscfg
348
349    def cfg2obj (self, cfg):
350        obj = []
351        ptrPar = cfg.split ('\n')
352        for line in ptrPar:
353            elem = {}
354            ptrCfg = line.split ('\t')
355
356            for item in ptrCfg:
357                k, v = item.split ('=')
358                elem[k] = v
359
360            obj.append (elem)
361
362        return obj
363
364    def onActivation (self):
365        if not os.path.exists ('/scripts/oginit'):
366            ## no estamos en oglive, este modulo no debe cargarse
367            ## esta lógica la saco de src/opengnsys/linux/operations.py, donde hay un if similar
368            raise Exception ('Refusing to load within an operating system')
369
370        self.pathinterface   = None
371        self.IPlocal         = None     ## Ip del ordenador
372        self.mac             = None     ## MAC del ordenador
373        self.idordenador     = None     ## Identificador del ordenador
374        self.nombreordenador = None     ## Nombre del ordenador
375        self.cache           = None
376        self.idproautoexec   = None
377        self.idcentro        = None     ## Identificador del centro
378        self.idaula          = None     ## Identificador del aula
379        self.q               = None     ## for passing PIDs around
380
381        ogcore_scheme   = os.environ.get ('OGAGENTCFG_OGCORE_SCHEME',  'https')
382        ogcore_ip       = os.environ.get ('OGAGENTCFG_OGCORE_IP',      '192.168.2.1')
383        ogcore_port     = os.environ.get ('OGAGENTCFG_OGCORE_PORT',    '8443')
384        urlmenu_scheme  = os.environ.get ('OGAGENTCFG_URLMENU_SCHEME', 'https')
385        urlmenu_ip      = os.environ.get ('OGAGENTCFG_URLMENU_IP',     '192.168.2.1')
386        urlmenu_port    = os.environ.get ('OGAGENTCFG_URLMENU_PORT',   '8443')
387        ogcore_ip_port  = ':'.join (map (str, filter (None, [ogcore_ip,  ogcore_port ])))
388        urlmenu_ip_port = ':'.join (map (str, filter (None, [urlmenu_ip, urlmenu_port])))
389        try:
390            url                = self.service.config.get (self.name, 'remote')
391            loglevel           = self.service.config.get (self.name, 'log')
392            self.pathinterface = self.service.config.get (self.name, 'pathinterface')
393            self.urlMenu       = self.service.config.get (self.name, 'urlMenu')
394            self.urlMsg        = self.service.config.get (self.name, 'urlMsg')
395
396            url          = url.format          (ogcore_scheme,  ogcore_ip_port)
397            self.urlMenu = self.urlMenu.format (urlmenu_scheme, urlmenu_ip_port)
398        except NoOptionError as e:
399            logger.error ("Configuration error: {}".format (e))
400            raise e
401        logger.setLevel (loglevel)
402        self.REST = REST (url)
403
404        if not self.tomaIPlocal():
405            raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
406
407        if not self.tomaMAClocal():
408            raise Exception ('Se han generado errores. No se puede continuar la ejecución de este módulo')
409
410        threading.Thread (name='monitoring_thread', target=self.mon, daemon=True).start()
411
412    def _long_running_job (self, name, f, args):
413        any_job_running = False
414        for k in self.thread_list:
415            if self.thread_list[k]['running']:
416                any_job_running = True
417                break
418        if any_job_running:
419            logger.info ('some job is already running, refusing to launch another one')
420            return { 'job_id': None, 'message': 'some job is already running, refusing to launch another one' }
421
422        job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8)))
423        import queue
424        self.q = queue.Queue()   ## a single queue works for us because we never have more than one long_running_job at the same time
425        self.thread_list[job_id] = {
426            'thread': ThreadWithResult (target=f, args=(self.q,) + args),
427            'starttime': time.time(),
428            'child_pid': None,
429            'running': True,
430            'result': None
431        }
432        self.thread_list[job_id]['thread'].start()
433        return { 'job_id': job_id }
434
435## para matar threads tengo lo siguiente:
436## - aqui en _long_running_job meto una cola en self.q
437## - (self.q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'q'")
438## - en el thread_list también tengo un child_pid para almacenar el pid de los procesos hijos
439## - al crear el ThreadWithResult le paso la cola, y luego en run() la recojo y la meto en el self.q del thread
440## - en interfaceAdmin() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue
441## - en mon() recojo pids de la queue y los meto en thread_list 'child_pid'
442##   - algunas funciones llaman a interfaceAdmin más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando
443##     - por ejemplo EjecutarScript llama a interfaceAdmin() y luego llama a LeeConfiguracion() el cual llama a interfaceAdmin() otra vez
444## - y cuando nos llamen a KillJob, terminamos en killer() el cual coge el 'child_pid' y zas
445##   - pero a lo mejor el child ya terminó
446##   - o a lo mejor el KillJob nos llegó demasiado pronto y todavía no hubo ningún child
447##
448## $ curl --insecure -X POST --data '{"nfn":"EjecutarScript","scp":"cd /usr; sleep 30; pwd; ls","ids":"0"}' https://192.168.2.199:8000/ogAdmClient/EjecutarScript
449## {"job_id": "EjecutarScript-333feb3f"}
450## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/CloningEngine/KillJob
451##
452## funciona bien, excepto que el PID no muere xD, ni siquiera haciendo subprocess.run('kill')
Note: See TracBrowser for help on using the repository browser.