ogclone-engine/ogclient/lib/python3/SystemLib.py

320 lines
14 KiB
Python

import subprocess
import datetime
from zoneinfo import ZoneInfo
import sys
import os
import select
import json
import shutil
import inspect
import glob
import ogGlobals
import StringLib
def _logtype2logfile (t):
    """Translate a log type name into the path of its log file.

    The name is matched case-insensitively against 'log', 'jsonlog',
    'command' and 'session'; each one resolves to the corresponding path
    constant held in ogGlobals.

    Args:
        t: log type name.

    Returns:
        The ogGlobals path constant for that log type.

    Raises:
        Exception: if the name is not one of the recognised log types.
    """
    # Only attribute *names* are stored here, so ogGlobals is consulted
    # solely for the constant that was actually requested.
    attr_by_type = {
        'log': 'OGLOGFILE',
        'jsonlog': 'OGJSONLOGFILE',
        'command': 'OGLOGCOMMAND',
        'session': 'OGLOGSESSION',
    }
    attr_name = attr_by_type.get (t.lower())
    if attr_name is None:
        raise Exception (f'unknown log type ({t})')
    return getattr (ogGlobals, attr_name)
#/**
# ogEcho [str_logtype ...] [str_loglevel] "str_message" ...
#@brief Muestra mensajes en consola y lo registra en fichero de incidencias.
#@param str_logtype tipo de registro de incidencias ("log", "command", "session")
#@param str_loglevel nivel de registro de incidencias ("info", "warning", "error")
#@param str_message mensaje (puede recibir más de 1 parámetro).
#@return Mensaje mostrado.
#*/
def ogEcho (logtypes, loglevel, msg):
    """Write a message to the console and to the selected log files.

    Args:
        logtypes: a log type name, or a list/tuple of names ('log',
            'command', 'session'), matched case-insensitively. 'log'
            selects both the plain-text log and the JSON log. None or an
            empty list selects the console only.
        loglevel: None or 'help' for a bare message; 'info', 'warning' or
            'error' for a timestamped message tagged with that severity.
        msg: text of the message.

    Returns:
        None.

    Raises:
        Exception: on an unknown log type or an unknown log level. Nothing
            is written in either case.
    """
    if logtypes is None:
        requested = []
    elif isinstance (logtypes, (list, tuple)):
        requested = list (logtypes)
    else:   ## single string
        requested = [logtypes]

    logfiles = ['/dev/stdout']
    for name in requested:
        # _logtype2logfile() is case-insensitive, so the 'log' shortcut has
        # to be as well; otherwise 'LOG' would silently skip the JSON log.
        if 'log' == name.lower():
            logfiles.append (_logtype2logfile ('log'))
            logfiles.append (_logtype2logfile ('jsonlog'))
        else:
            logfiles.append (_logtype2logfile (name))

    plain = loglevel is None or 'help' == loglevel
    if not plain and loglevel not in ('info', 'warning', 'error'):
        raise Exception (f'unknown loglevel ({loglevel})')

    if plain:
        # With debugging enabled, bare messages also go to the main logs.
        if ogGlobals.DEBUG.lower() != "no":
            logfiles.append (ogGlobals.OGLOGFILE)
            logfiles.append (ogGlobals.OGJSONLOGFILE)
        text_line = msg + '\n'
        json_record = {'message': msg}
    else:
        # Take the clock once so the text and JSON records always agree.
        now = datetime.datetime.now (ZoneInfo (ogGlobals.TZ))
        stamp_text = now.strftime ('%F %T %Z')
        stamp_json = now.strftime ('%Y-%m-%d %H:%M:%S')
        text_line = f"OpenGnsys {loglevel} {stamp_text} {msg}\n"
        json_record = {'timestamp': stamp_json, 'severity': loglevel, 'message': msg}

    # dict.fromkeys() drops repeated paths while keeping their order, so a
    # file selected twice (e.g. 'log' plus the debug fallback) gets the
    # message only once.
    for path in dict.fromkeys (logfiles):
        with open (path, 'a') as fd:
            if ogGlobals.OGJSONLOGFILE == path:
                fd.write (json.dumps (json_record) + '\n')
            else:
                fd.write (text_line)
#/**
# ogExecAndLog str_logfile ... str_command ...
#@brief Ejecuta un comando y guarda su salida en fichero de registro.
#@param str_logfile fichero de registro (pueden ser varios).
#@param str_command comando y comandos a ejecutar.
#@return Salida de ejecución del comando.
#@note str_logfile = { LOG, SESSION, COMMAND }
#*/
#ogExecAndLog (str_logfile ... str_command ...",
#ogExecAndLog ([], ['/path/to/script', *args])
#ogExecAndLog ('command', ['/path/to/script', *args])
#ogExecAndLog (['command'], ['/path/to/script', *args])
#ogExecAndLog (['log', 'command'], ['/path/to/script', *args])
def ogExecAndLog (logtypes, script_and_args):
    """Run a command and copy its output to the console and to log files.

    Args:
        logtypes: a log type name, or a list/tuple of names ('log',
            'command', 'session'), matched case-insensitively. 'log'
            selects both the plain-text log and the JSON log. When
            'command' is selected, the command log is emptied before the
            run and the command's stderr is logged together with its
            stdout; otherwise stderr is discarded.
        script_and_args: argument vector of the command to execute.

    Returns:
        True if the command exited with status 0, False otherwise. None if
        script_and_args is empty (an OG_ERR_FORMAT error is reported).

    Raises:
        Exception: on an unknown log type.
    """
    # Normalise once, up front, so that both the 'log' shortcut and the
    # 'command' check below see lower-case names and exact (not substring)
    # matches.
    if logtypes is None:
        requested = []
    elif isinstance (logtypes, (list, tuple)):
        requested = [name.lower() for name in logtypes]
    else:   ## single string
        requested = [logtypes.lower()]

    logfiles = ['/dev/stdout']
    for name in requested:
        if 'log' == name:
            logfiles.append (_logtype2logfile ('log'))
            logfiles.append (_logtype2logfile ('jsonlog'))
        else:
            logfiles.append (_logtype2logfile (name))
    logfiles = list (dict.fromkeys (logfiles))   ## write each file only once

    if not script_and_args:
        ogRaiseError ([], ogGlobals.OG_ERR_FORMAT, 'no function provided')
        return

    ## the original bash code does something like this:
    #if [ $ISCOMMAND ]; then
    #    > $OGLOGCOMMAND
    #    REDIREC="2>&1"
    #fi
    #eval $COMMAND $REDIREC | tee -a $FILES
    capture_stderr = 'command' in requested
    if capture_stderr:
        # Truncate in place, like "> $OGLOGCOMMAND"; this also works when
        # the file does not exist yet, where unlinking it would fail.
        with open (ogGlobals.OGLOGCOMMAND, 'w'):
            pass

    # A single pipe carries everything that must be logged: stderr is either
    # merged into stdout or sent to /dev/null. Reading one pipe until EOF
    # cannot deadlock and cannot lose output still buffered when the child
    # exits.
    stderr_target = subprocess.STDOUT if capture_stderr else subprocess.DEVNULL
    with subprocess.Popen (script_and_args, text=True,
                           stdout=subprocess.PIPE, stderr=stderr_target) as p:
        for line in p.stdout:
            for path in logfiles:
                with open (path, 'a') as fd:
                    if ogGlobals.OGJSONLOGFILE == path:
                        # One JSON object per line, as ogEcho writes them.
                        fd.write (json.dumps ({'message': line}) + '\n')
                    else:
                        fd.write (line)
    # Leaving the "with" block has closed the pipe and waited for the child.
    rc = p.returncode
    return not rc    ## negate shell return code
#/**
# ogGetCaller
#@brief Devuelve nombre del programa o script ejecutor (padre).
#@return str_name - Nombre del programa ejecutor.
#*/
def ogGetCaller():
    """Return the base name of the program or script that started this process.

    The command line of the parent process is obtained from ``ps``. If that
    line contains "bash" and has more than one word, the second word is
    taken as the caller; otherwise the first word is used, with any leading
    '-' characters removed.

    Returns:
        The base name of the caller, or '' if ps printed nothing usable.
    """
    # Request a wide output line through the child's environment only, so
    # os.environ is never modified and there is nothing to restore.
    # NOTE(review): 200 columns is assumed to be the intended width - confirm.
    ps_env = dict (os.environ, COLUMNS='200')
    ps_cmd = ["ps", "hp", str (os.getppid()), "-o", "args"]
    lines = subprocess.run (ps_cmd, capture_output=True, text=True, env=ps_env).stdout.splitlines()
    if not lines:
        return ''

    line = lines[0]
    words = line.split()
    if not words:   ## blank line: no command to report
        return ''

    if "bash" in line and len (words) > 1:
        caller = words[1]
    else:
        caller = words[0].lstrip ("-")
    return os.path.basename (caller)
#/**
# ogHelp ["str_function" ["str_format" ["str_example" ... ]]]
#@brief Muestra mensaje de ayuda para una función determinada.
#@param str_function Nombre de la función.
#@param str_format Formato de ejecución de la función.
#@param str_example Ejemplo de ejecución de la función.
#@return str_help - Salida de ayuda.
#@note Si no se indican parámetros, la función se toma de la variable \c $FUNCNAME
#@note La descripción de la función se toma de la variable compuesta por \c MSG_FUNC_$función incluida en el fichero de idiomas.
#@note Pueden especificarse varios mensajes con ejemplos.
#*/
def ogHelp (fname, fmt=None, examples=None):
    """Print the help text of a function: description, format and examples.

    Args:
        fname: name of the function to describe. If empty, the name of the
            calling function is used instead.
        fmt: optional usage/format string, printed when given.
        examples: optional example string, or list/tuple of example strings;
            one line is printed per example.

    Returns:
        None. Every line is emitted through ogEcho() with level "help".
    """
    # inspect.stack()[1] is the frame of whoever called ogHelp().
    FUNC = fname or inspect.stack()[1][3]
    # Look the description up by attribute name instead of eval()-ing a
    # string built from the function name; a missing entry yields ''.
    MSG = getattr (ogGlobals.lang, f'MSG_HELP_{FUNC}', '')
    ogEcho ([], "help", f"{ogGlobals.lang.MSG_FUNCTION} {FUNC}: {MSG}")
    if fmt:
        ogEcho ([], "help", f"    {ogGlobals.lang.MSG_FORMAT}: {fmt}")

    if examples is None:
        examples = []
    elif not isinstance (examples, (list, tuple)):
        examples = [examples]   ## single string
    for example in examples:
        ogEcho ([], "help", f"    {ogGlobals.lang.MSG_EXAMPLE}: {example}")
#/**
# ogRaiseError [str_logtype ...] int_errcode ["str_errmessage" ...]
#@brief Devuelve el mensaje y el código de error correspondiente.
#@param str_logtype tipo de registro de incidencias.
#@param int_errcode código de error.
#@param str_errmessage mensajes complementarios de error.
#@return str_code - código de error
#*/
def ogRaiseError (logtypes, code, msg):
    """Log the message that corresponds to an error code and return the code.

    Args:
        logtypes: log type name(s) handed to ogEcho() for the error line.
        code: one of the ogGlobals.OG_ERR_* codes.
        msg: complementary text, appended in double quotes to the localized
            message of a known code.

    Returns:
        `code` when it is one of the known codes, ogGlobals.OG_ERR_GENERIC
        otherwise. None if the call stack has fewer than two entries.
    """
    # Each name X pairs ogGlobals.OG_ERR_X with ogGlobals.lang.MSG_ERR_X.
    # The order is kept, and constants are read one by one until a match.
    known_errors = (
        'FORMAT', 'NOTFOUND', 'OUTOFLIMIT', 'PARTITION', 'LOCKED', 'CACHE',
        'NOGPT', 'REPO', 'FILESYS', 'IMAGE', 'NOTOS', 'NOTEXEC', 'NOTWRITE',
        'NOTCACHE', 'CACHESIZE', 'REDUCEFS', 'EXTENDFS', 'IMGSIZEPARTITION',
        'UPDATECACHE', 'DONTFORMAT', 'IMAGEFILE',
        'UCASTSYNTAXT', 'UCASTSENDPARTITION', 'UCASTSENDFILE',
        'UCASTRECEIVERPARTITION', 'UCASTRECEIVERFILE',
        'MCASTSYNTAXT', 'MCASTSENDFILE', 'MCASTRECEIVERFILE',
        'MCASTSENDPARTITION', 'MCASTRECEIVERPARTITION',
        'PROTOCOLJOINMASTER',
        'DONTMOUNT_IMAGE', 'DONTUNMOUNT_IMAGE', 'DONTSYNC_IMAGE',
        'NOTDIFFERENT', 'SYNCHRONIZING', 'NOTUEFI', 'NOMSDOS', 'NOTBIOS',
    )
    retcode = code
    for name in known_errors:
        if code == getattr (ogGlobals, f'OG_ERR_{name}'):
            localized = getattr (ogGlobals.lang, f'MSG_ERR_{name}')
            MSG = f'{localized} "{msg}"'
            break
    else:
        # Unknown code: report the generic message and return the generic
        # code, instead of handing the unrecognised value back.
        MSG = ogGlobals.lang.MSG_ERR_GENERIC
        retcode = ogGlobals.OG_ERR_GENERIC

    # Collect the names of the functions involved, including the calling script.
    # context=0: only the function names are needed, not source lines.
    call_stack = [frame[3] for frame in inspect.stack (0)]
    if len (call_stack) < 2: return    ## shouldn't happen
    call_stack.pop()      ## remove '<module>'
    call_stack.pop (0)    ## remove 'ogRaiseError'
    str_call_stack = ' '.join (call_stack)

    # Always report format errors. Otherwise report when the whole call chain
    # is listed in NODEBUGFUNCTIONS, or when the innermost caller is not.
    if code == ogGlobals.OG_ERR_FORMAT or \
       (str_call_stack in ogGlobals.NODEBUGFUNCTIONS) or \
       not (len (call_stack) > 0 and (call_stack[0] in ogGlobals.NODEBUGFUNCTIONS)):
        ogEcho (logtypes, "error", f"{str_call_stack.replace(' ', '<-')}: {MSG}")

    return retcode
#/**
# ogIsRepoLocked
#@brief Comprueba si el repositorio está siendo usado (tiene ficheros abiertos).
#@param No.
#@return  True - bloqueado, False - sin bloquear o repositorio no definido.
#*/
def ogIsRepoLocked():
    """Tell whether any process has a file open inside the image repository.

    Every /proc/<pid>/fd/* entry is resolved and compared with the repository
    directory named by ogGlobals.OGIMG.

    Returns:
        True if some open file resolves to the repository directory or to a
        path below it; False otherwise, including when ogGlobals.OGIMG is
        not set.
    """
    # Nothing to check if the repository mount point is not defined.
    if not ogGlobals.OGIMG:
        return False

    # Compare resolved paths on both sides and require a whole-directory
    # prefix, so an unrelated path that merely contains the repository path
    # as a substring does not count as a lock.
    repo_root = os.path.realpath (ogGlobals.OGIMG)
    repo_prefix = repo_root.rstrip (os.sep) + os.sep
    for fd_link in glob.iglob ('/proc/[0-9]*/fd/*'):
        target = os.path.realpath (fd_link)
        if target == repo_root or target.startswith (repo_prefix):
            return True
    return False
## has no users
#def ogCheckProgram(program):
# FUNCNAME = ogCheckProgram.__name__
#
# if not program or not isinstance(program, str):
# ogRaiseError ("session", ogGlobals.OG_ERR_FORMAT, f"Error: {ogGlobals.lang.MSG_ERR_FORMAT} {FUNCNAME} \"program\"")
# return
#
# if not shutil.which(program):
# ogRaiseError ( "session", ogGlobals.OG_ERR_NOTEXEC, f"Error: The program '{program}' is not available on the system.")
# return
#
# return 0
def ogIsVirtualMachine():
    """Report whether the DMI product name identifies a KVM or VirtualBox guest.

    Runs ``dmidecode -s system-product-name`` and inspects its standard
    output.

    Returns:
        True if the output contains "KVM" or "VirtualBox", False otherwise.
    """
    dmidecode_cmd = ["dmidecode", "-s", "system-product-name"]
    completed = subprocess.run (dmidecode_cmd, capture_output=True, text=True)
    product_name = completed.stdout
    return any (marker in product_name for marker in ("KVM", "VirtualBox"))