import subprocess import datetime from zoneinfo import ZoneInfo import sys import os import select import json import shutil import inspect import glob import ogGlobals import StringLib def _logtype2logfile (t): if 'log' == t.lower(): return ogGlobals.OGLOGFILE if 'jsonlog' == t.lower(): return ogGlobals.OGJSONLOGFILE elif 'command' == t.lower(): return ogGlobals.OGLOGCOMMAND elif 'session' == t.lower(): return ogGlobals.OGLOGSESSION else: raise Exception (f'unknown log type ({t})') #/** # ogEcho [str_logtype ...] [str_loglevel] "str_message" ... #@brief Muestra mensajes en consola y lo registra en fichero de incidencias. #@param str_logtype tipo de registro de incidencias ("log", "command", "session") #@param str_loglevel nivel de registro de incidencias ("info", "warning", "error") #@param str_message mensaje (puede recibir más de 1 parámetro. #@return Mensaje mostrado. #*/ def ogEcho (logtypes, loglevel, msg): logfiles = ['/dev/stdout'] if type (logtypes) is list: for l in logtypes: if 'log' == l: logfiles.append (_logtype2logfile ('log')) logfiles.append (_logtype2logfile ('jsonlog')) else: logfiles.append (_logtype2logfile (l)) else: ## string if 'log' == logtypes: logfiles.append (_logtype2logfile ('log')) logfiles.append (_logtype2logfile ('jsonlog')) else: logfiles.append (_logtype2logfile (logtypes)) if loglevel is None or 'help' == loglevel: if ogGlobals.DEBUG.lower() != "no": logfiles.append (ogGlobals.OGLOGFILE) logfiles.append (ogGlobals.OGJSONLOGFILE) for f in logfiles: with open (f, 'a') as fd: if ogGlobals.OGJSONLOGFILE == f: fd.write (json.dumps ({'message':msg}) + '\n') else: fd.write (msg + '\n') return if 'info' == loglevel or 'warning' == loglevel or 'error' == loglevel: DATETIME = datetime.datetime.now (ZoneInfo (ogGlobals.TZ)).strftime ('%F %T %Z') DATETIME_json = datetime.datetime.now (ZoneInfo (ogGlobals.TZ)).strftime ('%Y-%m-%d %H:%M:%S') for f in logfiles: with open (f, 'a') as fd: if ogGlobals.OGJSONLOGFILE == f: fd.write (json.dumps ({'timestamp':DATETIME_json, 'severity':loglevel, 'message':msg}) + '\n') else: fd.write (f"OpenGnsys {loglevel} {DATETIME} {msg}\n") else: raise Exception (f'unknown loglevel ({loglevel})') #/** # ogExecAndLog str_logfile ... str_command ... #@brief Ejecuta un comando y guarda su salida en fichero de registro. #@param str_logfile fichero de registro (pueden ser varios). #@param str_command comando y comandos a ejecutar. #@return Salida de ejecución del comando. #@note str_logfile = { LOG, SESSION, COMMAND } #*/ #ogExecAndLog (str_logfile ... str_command ...", #ogExecAndLog ([], ['/path/to/script', *args]) #ogExecAndLog ('command', ['/path/to/script', *args]) #ogExecAndLog (['command'], ['/path/to/script', *args]) #ogExecAndLog (['log', 'command'], ['/path/to/script', *args]) def ogExecAndLog (logtypes, script_and_args): logfiles = ['/dev/stdout'] if type (logtypes) is list: for l in logtypes: logtypes = list (map (lambda x: x.lower(), logtypes)) if 'log' == l: logfiles.append (_logtype2logfile ('log')) logfiles.append (_logtype2logfile ('jsonlog')) else: logfiles.append (_logtype2logfile (l)) else: ## string logtypes = logtypes.lower() if 'log' == logtypes: logfiles.append (_logtype2logfile ('log')) logfiles.append (_logtype2logfile ('jsonlog')) else: logfiles.append (_logtype2logfile (logtypes)) if not script_and_args: ogRaiseError ([], ogGlobals.OG_ERR_FORMAT, 'no function provided') return ## the original bash code does something like this: #if [ $ISCOMMAND ]; then # > $OGLOGCOMMAND # REDIREC="2>&1" #fi #eval $COMMAND $REDIREC | tee -a $FILES ## a hybrid bash/python pseudocode would end up being like the following: #if 'command' in logtypes: # rm $OGLOGCOMMAND # touch $OGLOGCOMMAND # #if 'command' in logtypes: # ## redirect both stdout and stderr # eval $COMMAND 2>&1 | tee -a $FILES #else: # ## redirect stdout only # eval $COMMAND | tee -a $FILES capture_stderr = False if 'command' in logtypes: os.unlink (ogGlobals.OGLOGCOMMAND) open (ogGlobals.OGLOGCOMMAND, 'w').close() capture_stderr = True p = subprocess.Popen (script_and_args, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) while True: ready_to_read, _, _ = select.select ([p.stdout, p.stderr], [], [], 1) partial_out = '' if p.stdout in ready_to_read: l = p.stdout.readline() partial_out += l if p.stderr in ready_to_read: l = p.stderr.readline() ## always read from stderr even if we're discarding it, to prevent buffers from filling up if capture_stderr: partial_out += l if partial_out: for f in logfiles: with open (f, 'a') as fd: if ogGlobals.OGJSONLOGFILE == f: fd.write (json.dumps ({'message':partial_out})) else: fd.write (partial_out) if p.poll() is not None: break rc = p.returncode return not rc ## negate shell return code #/** # ogGetCaller #@brief Devuelve nombre del programa o script ejecutor (padre). #@return str_name - Nombre del programa ejecutor. #*/ def ogGetCaller(): if 'COLUMNS' in os.environ: cols = os.environ['COLUMNS'] else: cols = None lines = subprocess.run (["ps", "hp", str(os.getppid()), "-o", "args"], capture_output=True, text=True).stdout.splitlines() if 0 == len (lines): return '' line = lines[0] words = line.split() if "bash" in line and len(words)>1: caller = words[1] else: caller = words[0].lstrip("-") if cols is None: del (os.environ['COLUMNS']) else: os.environ['COLUMNS'] = cols return os.path.basename(caller) #/** # ogHelp ["str_function" ["str_format" ["str_example" ... ]]] #@brief Muestra mensaje de ayuda para una función determinda. #@param str_function Nombre de la función. #@param str_format Formato de ejecución de la función. #@param str_example Ejemplo de ejecución de la función. #@return str_help - Salida de ayuda. #@note Si no se indican parámetros, la función se toma de la variable \c $FUNCNAME #@note La descripción de la función se toma de la variable compuesta por \c MSG_FUNC_$función incluida en el fichero de idiomas. #@note Pueden especificarse varios mensajes con ejemplos. #*/ def ogHelp (fname, fmt=None, examples=[]): FUNC = fname or inspect.stack()[1][3] MSG = f'ogGlobals.lang.MSG_HELP_{FUNC}' try: MSG = eval (MSG) except: MSG = '' ogEcho ([], "help", f"{ogGlobals.lang.MSG_FUNCTION} {FUNC}: {MSG}") if fmt: ogEcho([], "help", f" {ogGlobals.lang.MSG_FORMAT}: {fmt}") if type (examples) is list: for example in examples: ogEcho([], "help", f" {ogGlobals.lang.MSG_EXAMPLE}: {example}") else: ## string ogEcho([], "help", f" {ogGlobals.lang.MSG_EXAMPLE}: {examples}") #/** # ogRaiseError [str_logtype ...] int_errcode ["str_errmessage" ...] #@brief Devuelve el mensaje y el código de error correspondiente. #@param str_logtype tipo de registro de incidencias. #@param int_errcode código de error. #@param str_errmessage mensajes complementarios de error. #@return str_code - código de error #*/ def ogRaiseError (logtypes, code, msg): if code == ogGlobals.OG_ERR_FORMAT: MSG = f'{ogGlobals.lang.MSG_ERR_FORMAT} "{msg}"' elif code == ogGlobals.OG_ERR_NOTFOUND: MSG = f'{ogGlobals.lang.MSG_ERR_NOTFOUND} "{msg}"' elif code == ogGlobals.OG_ERR_OUTOFLIMIT: MSG = f'{ogGlobals.lang.MSG_ERR_OUTOFLIMIT} "{msg}"' elif code == ogGlobals.OG_ERR_PARTITION: MSG = f'{ogGlobals.lang.MSG_ERR_PARTITION} "{msg}"' elif code == ogGlobals.OG_ERR_LOCKED: MSG = f'{ogGlobals.lang.MSG_ERR_LOCKED} "{msg}"' elif code == ogGlobals.OG_ERR_CACHE: MSG = f'{ogGlobals.lang.MSG_ERR_CACHE} "{msg}"' elif code == ogGlobals.OG_ERR_NOGPT: MSG = f'{ogGlobals.lang.MSG_ERR_NOGPT} "{msg}"' elif code == ogGlobals.OG_ERR_REPO: MSG = f'{ogGlobals.lang.MSG_ERR_REPO} "{msg}"' elif code == ogGlobals.OG_ERR_FILESYS: MSG = f'{ogGlobals.lang.MSG_ERR_FILESYS} "{msg}"' elif code == ogGlobals.OG_ERR_IMAGE: MSG = f'{ogGlobals.lang.MSG_ERR_IMAGE} "{msg}"' elif code == ogGlobals.OG_ERR_NOTOS: MSG = f'{ogGlobals.lang.MSG_ERR_NOTOS} "{msg}"' elif code == ogGlobals.OG_ERR_NOTEXEC: MSG = f'{ogGlobals.lang.MSG_ERR_NOTEXEC} "{msg}"' elif code == ogGlobals.OG_ERR_NOTWRITE: MSG = f'{ogGlobals.lang.MSG_ERR_NOTWRITE} "{msg}"' elif code == ogGlobals.OG_ERR_NOTCACHE: MSG = f'{ogGlobals.lang.MSG_ERR_NOTCACHE} "{msg}"' elif code == ogGlobals.OG_ERR_CACHESIZE: MSG = f'{ogGlobals.lang.MSG_ERR_CACHESIZE} "{msg}"' elif code == ogGlobals.OG_ERR_REDUCEFS: MSG = f'{ogGlobals.lang.MSG_ERR_REDUCEFS} "{msg}"' elif code == ogGlobals.OG_ERR_EXTENDFS: MSG = f'{ogGlobals.lang.MSG_ERR_EXTENDFS} "{msg}"' elif code == ogGlobals.OG_ERR_IMGSIZEPARTITION: MSG = f'{ogGlobals.lang.MSG_ERR_IMGSIZEPARTITION} "{msg}"' elif code == ogGlobals.OG_ERR_UPDATECACHE: MSG = f'{ogGlobals.lang.MSG_ERR_UPDATECACHE} "{msg}"' elif code == ogGlobals.OG_ERR_DONTFORMAT: MSG = f'{ogGlobals.lang.MSG_ERR_DONTFORMAT} "{msg}"' elif code == ogGlobals.OG_ERR_IMAGEFILE: MSG = f'{ogGlobals.lang.MSG_ERR_IMAGEFILE} "{msg}"' elif code == ogGlobals.OG_ERR_UCASTSYNTAXT: MSG = f'{ogGlobals.lang.MSG_ERR_UCASTSYNTAXT} "{msg}"' elif code == ogGlobals.OG_ERR_UCASTSENDPARTITION: MSG = f'{ogGlobals.lang.MSG_ERR_UCASTSENDPARTITION} "{msg}"' elif code == ogGlobals.OG_ERR_UCASTSENDFILE: MSG = f'{ogGlobals.lang.MSG_ERR_UCASTSENDFILE} "{msg}"' elif code == ogGlobals.OG_ERR_UCASTRECEIVERPARTITION: MSG = f'{ogGlobals.lang.MSG_ERR_UCASTRECEIVERPARTITION} "{msg}"' elif code == ogGlobals.OG_ERR_UCASTRECEIVERFILE: MSG = f'{ogGlobals.lang.MSG_ERR_UCASTRECEIVERFILE} "{msg}"' elif code == ogGlobals.OG_ERR_MCASTSYNTAXT: MSG = f'{ogGlobals.lang.MSG_ERR_MCASTSYNTAXT} "{msg}"' elif code == ogGlobals.OG_ERR_MCASTSENDFILE: MSG = f'{ogGlobals.lang.MSG_ERR_MCASTSENDFILE} "{msg}"' elif code == ogGlobals.OG_ERR_MCASTRECEIVERFILE: MSG = f'{ogGlobals.lang.MSG_ERR_MCASTRECEIVERFILE} "{msg}"' elif code == ogGlobals.OG_ERR_MCASTSENDPARTITION: MSG = f'{ogGlobals.lang.MSG_ERR_MCASTSENDPARTITION} "{msg}"' elif code == ogGlobals.OG_ERR_MCASTRECEIVERPARTITION: MSG = f'{ogGlobals.lang.MSG_ERR_MCASTRECEIVERPARTITION} "{msg}"' elif code == ogGlobals.OG_ERR_PROTOCOLJOINMASTER: MSG = f'{ogGlobals.lang.MSG_ERR_PROTOCOLJOINMASTER} "{msg}"' elif code == ogGlobals.OG_ERR_DONTMOUNT_IMAGE: MSG = f'{ogGlobals.lang.MSG_ERR_DONTMOUNT_IMAGE} "{msg}"' elif code == ogGlobals.OG_ERR_DONTUNMOUNT_IMAGE: MSG = f'{ogGlobals.lang.MSG_ERR_DONTUNMOUNT_IMAGE} "{msg}"' elif code == ogGlobals.OG_ERR_DONTSYNC_IMAGE: MSG = f'{ogGlobals.lang.MSG_ERR_DONTSYNC_IMAGE} "{msg}"' elif code == ogGlobals.OG_ERR_NOTDIFFERENT: MSG = f'{ogGlobals.lang.MSG_ERR_NOTDIFFERENT} "{msg}"' elif code == ogGlobals.OG_ERR_SYNCHRONIZING: MSG = f'{ogGlobals.lang.MSG_ERR_SYNCHRONIZING} "{msg}"' elif code == ogGlobals.OG_ERR_NOTUEFI: MSG = f'{ogGlobals.lang.MSG_ERR_NOTUEFI} "{msg}"' elif code == ogGlobals.OG_ERR_NOMSDOS: MSG = f'{ogGlobals.lang.MSG_ERR_NOMSDOS} "{msg}"' elif code == ogGlobals.OG_ERR_NOTBIOS: MSG = f'{ogGlobals.lang.MSG_ERR_NOTBIOS} "{msg}"' else: MSG = ogGlobals.lang.MSG_ERR_GENERIC CODE = ogGlobals.OG_ERR_GENERIC # Obtener lista de funciones afectadas, incluyendo el script que las llama. call_stack = [i[3] for i in inspect.stack()] if len (call_stack) < 2: return ## shouldn't happen call_stack.pop() ## remove '' call_stack.pop(0) ## remove 'ogRaiseError' str_call_stack = ' '.join (call_stack) # Mostrar mensaje de error si es función depurable y salir con el código indicado. if code == ogGlobals.OG_ERR_FORMAT or \ (str_call_stack in ogGlobals.NODEBUGFUNCTIONS) or \ not (len(call_stack)>0 and (call_stack[0] in ogGlobals.NODEBUGFUNCTIONS)): ogEcho (logtypes, "error", f"{str_call_stack.replace(' ', '<-')}: {MSG}") return code #/** # ogIsRepoLocked #@brief Comprueba si el repositorio está siendo usado (tiene ficheros abiertos). #@param No. #@return Código de salida: 0 - bloqueado, 1 - sin bloquear o error. #*/ def ogIsRepoLocked(): # No hacer nada, si no está definido el punto de montaje del repositorio. if not ogGlobals.OGIMG: return False # Comprobar si alguno de los ficheros abiertos por los procesos activos está en el # punto de montaje del repositorio de imágenes. proc_entries = glob.glob ('/proc/[0-9]*/fd/*') for e in proc_entries: p = os.path.realpath (e) if ogGlobals.OGIMG in p: return True return False ## has no users #def ogCheckProgram(program): # FUNCNAME = ogCheckProgram.__name__ # # if not program or not isinstance(program, str): # ogRaiseError ("session", ogGlobals.OG_ERR_FORMAT, f"Error: {ogGlobals.lang.MSG_ERR_FORMAT} {FUNCNAME} \"program\"") # return # # if not shutil.which(program): # ogRaiseError ( "session", ogGlobals.OG_ERR_NOTEXEC, f"Error: The program '{program}' is not available on the system.") # return # # return 0 def ogIsVirtualMachine(): output = subprocess.run (["dmidecode", "-s", "system-product-name"], capture_output=True, text=True).stdout return "KVM" in output or "VirtualBox" in output