ogclone-engine/client/lib/engine/bin/SystemLib.py

297 lines
13 KiB
Python

import subprocess
import datetime
from zoneinfo import ZoneInfo
import sys
import os
import shutil
import ogGlobals
#from engine.DiskLib import *
#from engine.CacheLib import *
#from engine.StringLib import *
from StringLib import *
#NODEBUGFUNCTIONS, OGIMG, OG_ERR_CACHESIZE, OG_ERR_NOTCACHE, OG_ERR_NOTWRITE, OG_ERR_FILESYS
#OG_ERR_REPO, OG_ERR_NOTOS, OG_ERR_NOGPT, OG_ERR_OUTOFLIMIT, OG_ERR_IMAGE, OG_ERR_CACHE
#OGLOGSESSION, OGLOGCOMMAND, OGLOGFILE, OG_ERR_LOCKED, OG_ERR_PARTITION, OG_ERR_FORMAT, OG_ERR_NOTEXEC, OG_ERR_NOTFOUND
def _logtype2logfile (t):
    """Translate a log type keyword into the path of its log file.

    The keywords 'log', 'command' and 'session' map to
    ogGlobals.OGLOGFILE, ogGlobals.OGLOGCOMMAND and ogGlobals.OGLOGSESSION
    respectively.  Any other value raises Exception.
    """
    ## (keyword, ogGlobals attribute) pairs, checked in this order
    known = (
        ('log', 'OGLOGFILE'),
        ('command', 'OGLOGCOMMAND'),
        ('session', 'OGLOGSESSION'),
    )
    for keyword, attr in known:
        if keyword == t:
            ## looked up lazily: only the requested setting is read
            return getattr (ogGlobals, attr)
    raise Exception (f'unknown log type ({t})')
#/**
# ogEcho [str_logtype ...] [str_loglevel] "str_message" ...
#@brief Shows a message on the console and records it in the incident log file.
#@param str_logtype type of incident log ("log", "command", "session")
#@param str_loglevel level of the log entry ("info", "warning", "error")
#@param str_message message (more than one parameter may be given).
#@return Message shown.
#@note NOTE(review): the Python implementation below only appends to log files and returns None - confirm whether console output is still intended.
#*/
## zero or more logtypes can be specified
## zero or one loglevel can be specified
## ogEcho ([], None, msg)
## ogEcho ('log', None, msg)
## ogEcho ('session', None, msg)
## ogEcho (['log', 'session'], None, msg)
## ogEcho ([], 'info', msg)
## ogEcho ('log', 'info', msg)
## ogEcho ('session', 'info', msg)
## ogEcho (['log', 'session'], 'info', msg)
def ogEcho (logtypes, loglevel, msg):
    """Append a message to the log files selected by logtypes.

    logtypes -- a log type keyword or a list of them (the list may be
                empty); each one is resolved through _logtype2logfile.
    loglevel -- None or 'help' for a plain message, or one of 'info',
                'warning', 'error' for a prefixed, timestamped message.
    msg      -- text to write.

    Plain messages are additionally appended to ogGlobals.OGLOGFILE unless
    ogGlobals.DEBUG equals "no" (case-insensitive).  Raises Exception for
    an unknown log type or log level.  Returns None.
    """
    ## a bare keyword is handled like a one-element list
    requested = logtypes if type (logtypes) is list else [logtypes]
    targets = [_logtype2logfile (kind) for kind in requested]

    ## plain (unlevelled) message
    if loglevel is None or 'help' == loglevel:
        if ogGlobals.DEBUG.lower() != "no":
            targets.append (ogGlobals.OGLOGFILE)
        for path in targets:
            with open (path, 'a') as out:
                out.write (msg + '\n')
        return

    if loglevel not in ('info', 'warning', 'error'):
        raise Exception (f'unknown loglevel ({loglevel})')

    ## levelled message: prefix with the level and a timestamp in ogGlobals.TZ
    stamp = datetime.datetime.now(ZoneInfo(ogGlobals.TZ)).strftime("%F %T %Z")
    for path in targets:
        with open (path, 'a') as out:
            out.write (f"OpenGnsys {loglevel} {stamp} {msg}\n")
def ogExecAndLog(*args):
    """Run a shell command, echoing its output and appending it to log files.

    Leading arguments equal (case-insensitively) to "command", "log" or
    "session" select the log files (ogGlobals.OGLOGCOMMAND, OGLOGFILE and
    OGLOGSESSION).  Everything from the first other argument onwards is
    joined with spaces and executed through the shell.

    When "command" is selected, the command log is truncated first and the
    command's stderr is merged into the logged output.

    Returns the exit status of the command, the value of ogRaiseError()
    when no command is given, or None after showing help.
    """
    FUNCNAME = ogExecAndLog.__name__

    # Show help on request.
    if len(args) > 0 and args[0] == "help":
        ogHelp(FUNCNAME,
               f"{FUNCNAME} str_logfile ... str_command ...",
               [f"{FUNCNAME} COMMAND ls -al /"])
        return

    # Consume the leading log selectors.  args is a tuple, so work on a list
    # copy; the first word that is not a selector starts the command and is
    # kept with its original case.
    words = [str(a) for a in args]
    selected = set()
    while words and words[0].lower() in ("command", "log", "session"):
        selected.add(words.pop(0).lower())
    command = " ".join(words)

    # A command to execute is mandatory.
    if not command.strip():
        return ogRaiseError([], ogGlobals.OG_ERR_FORMAT, "")

    # Build the list of log files.
    logfiles = []
    merge_stderr = False
    if "command" in selected:
        # The command log is emptied before each run.
        with open(ogGlobals.OGLOGCOMMAND, "w"):
            pass
        logfiles.append(ogGlobals.OGLOGCOMMAND)
        merge_stderr = True
    if "log" in selected:
        logfiles.append(ogGlobals.OGLOGFILE)
    if "session" in selected:
        logfiles.append(ogGlobals.OGLOGSESSION)

    # Run the command and copy every output line to stdout and to each log
    # file (what "| tee -a" did), while keeping the command's own exit
    # status instead of the status of the pipeline.
    # NOTE: the command goes through the shell on purpose (callers pass
    # shell syntax); never feed it untrusted text.
    from contextlib import ExitStack
    with ExitStack() as stack:
        sinks = [stack.enter_context(open(path, "a")) for path in logfiles]
        proc = stack.enter_context(subprocess.Popen(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT if merge_stderr else None,
            text=True,
            errors="replace",
        ))
        for line in proc.stdout:
            sys.stdout.write(line)
            for sink in sinks:
                sink.write(line)
        sys.stdout.flush()
        status = proc.wait()
    return status
#/**
# ogGetCaller
#@brief Returns the name of the (parent) program or script that runs this process.
#@return str_name - Name of the calling program.
#*/
def ogGetCaller():
    """Return the base name of the program or script that is the parent process.

    The parent's command line is read with ps.  If the line contains "bash"
    and has at least two words, the second word (the script) is used;
    otherwise the first word, without leading dashes.

    Returns '' when ps is not available or prints nothing usable.
    os.environ is never modified.
    """
    # Give ps a wide line through the child's environment only, so the
    # command line is not cut short and there is nothing to restore afterwards.
    # NOTE(review): 200 columns is an assumed width - confirm.
    ps_env = dict(os.environ, COLUMNS="200")
    try:
        result = subprocess.run(
            ["ps", "hp", str(os.getppid()), "-o", "args"],
            capture_output=True, text=True, env=ps_env,
        )
    except OSError:
        # ps could not be executed: handled like "no output".
        return ''

    lines = result.stdout.splitlines()
    if not lines:
        return ''
    line = lines[0]
    words = line.split()
    if not words:
        # Blank line: there is no word to take the name from.
        return ''

    if "bash" in line and len(words) > 1:
        caller = words[1]
    else:
        caller = words[0].lstrip("-")
    return os.path.basename(caller)
import inspect
#/**
# ogHelp ["str_function" ["str_format" ["str_example" ... ]]]
#@brief Shows the help message of a given function.
#@param str_function Name of the function.
#@param str_format Call format of the function.
#@param str_example Example call of the function.
#@return str_help - Help output.
#@note If no function name is given, the name of the calling function is used.
#@note The description of the function is taken from the attribute \c MSG_HELP_<function> of the language module (ogGlobals.lang).
#@note Several example messages may be given.
#*/
def ogHelp (fname, fmt=None, examples=None):
    """Write the help text of a function through ogEcho (level "help").

    fname    -- name of the function; when empty, the name of the calling
                function is used.
    fmt      -- optional call format line.
    examples -- optional example: a single string, or a list/tuple of
                strings (one help line each).  Only used when fmt is given.

    The description is read from ogGlobals.lang.MSG_HELP_<name>; a missing
    attribute raises AttributeError.
    """
    FUNC = fname or inspect.stack (0)[1].function
    ## plain attribute lookup: the name is never evaluated as code
    description = getattr (ogGlobals.lang, f'MSG_HELP_{FUNC}')
    ogEcho ([], "help", f"{ogGlobals.lang.MSG_FUNCTION} {FUNC}: {description}")
    if not fmt:
        return

    ogEcho([], "help", f" {ogGlobals.lang.MSG_FORMAT}: {fmt}")
    if examples is None:
        examples = []
    elif not isinstance (examples, (list, tuple)):
        examples = [examples]        ## single example given as a string
    for example in examples:
        ogEcho([], "help", f" {ogGlobals.lang.MSG_EXAMPLE}: {example}")
#/**
# ogRaiseError [str_logtype ...] int_errcode ["str_errmessage" ...]
#@brief Logs the message of the given error code and returns the code.
#@param str_logtype type of incident log.
#@param int_errcode error code.
#@param str_errmessage additional error messages.
#@return str_code - error code
#*/
def ogRaiseError (logtypes, code, msg):
    """Log the message that belongs to an error code and return the code.

    logtypes -- log type keyword or list of keywords, passed on to ogEcho;
                the string 'help' shows the help text instead.
    code     -- one of the ogGlobals.OG_ERR_* values.
    msg      -- complementary text, appended in double quotes to the
                localized message.

    A code that matches none of the known OG_ERR_* values is reported with
    ogGlobals.lang.MSG_ERR_GENERIC and replaced by ogGlobals.OG_ERR_GENERIC.

    Returns the (possibly replaced) error code, or None after showing help.
    """
    if type (logtypes) is str and 'help' == logtypes:
        ogHelp (
            'ogRaiseError',
            'ogRaiseError ([str_logfile, ...], int_errorcode, str_errormessage)',
            [
                'ogRaiseError ("log", 42, "my error message")',
                'ogRaiseError (["log", "session"], 43, "my other error message")',
            ]
        )
        return

    ## Suffixes shared by ogGlobals.OG_ERR_<x> and ogGlobals.lang.MSG_ERR_<x>.
    ## The order is the lookup order: the first matching code wins.
    known_errors = (
        'FORMAT', 'NOTFOUND', 'OUTOFLIMIT', 'PARTITION', 'LOCKED', 'CACHE',
        'NOGPT', 'REPO', 'FILESYS', 'IMAGE', 'NOTOS', 'NOTEXEC', 'NOTWRITE',
        'NOTCACHE', 'CACHESIZE', 'REDUCEFS', 'EXTENDFS', 'IMGSIZEPARTITION',
        'UPDATECACHE', 'DONTFORMAT', 'IMAGEFILE', 'UCASTSYNTAXT',
        'UCASTSENDPARTITION', 'UCASTSENDFILE', 'UCASTRECEIVERPARTITION',
        'UCASTRECEIVERFILE', 'MCASTSYNTAXT', 'MCASTSENDFILE',
        'MCASTRECEIVERFILE', 'MCASTSENDPARTITION', 'MCASTRECEIVERPARTITION',
        'PROTOCOLJOINMASTER', 'DONTMOUNT_IMAGE', 'DONTUNMOUNT_IMAGE',
        'DONTSYNC_IMAGE', 'NOTDIFFERENT', 'SYNCHRONIZING', 'NOTUEFI',
        'NOMSDOS', 'NOTBIOS',
    )
    for suffix in known_errors:
        if code == getattr (ogGlobals, f'OG_ERR_{suffix}'):
            text = getattr (ogGlobals.lang, f'MSG_ERR_{suffix}')
            MSG = f'{text} "{msg}"'
            break
    else:
        ## unknown code: report it, and return it, as a generic error
        MSG = ogGlobals.lang.MSG_ERR_GENERIC
        code = ogGlobals.OG_ERR_GENERIC

    ## Names of the functions on the call stack, innermost first.
    ## context=0 skips reading source lines; only the names are needed.
    call_stack = [frame.function for frame in inspect.stack (0)]
    if len (call_stack) < 3:
        ## No calling function to report (e.g. called from module level):
        ## nothing is logged, but the caller still gets the error code.
        return code
    call_stack.pop()     ## remove '<module>'
    call_stack.pop(0)    ## remove 'ogRaiseError'
    str_call_stack = ' '.join (call_stack)

    ## Log format errors always; log other errors unless only the direct
    ## caller (and not the stack string as a whole) is listed in
    ## ogGlobals.NODEBUGFUNCTIONS.
    if code == ogGlobals.OG_ERR_FORMAT or \
       ogCheckStringInGroup (str_call_stack, ogGlobals.NODEBUGFUNCTIONS) or \
       not ogCheckStringInGroup (call_stack[0], ogGlobals.NODEBUGFUNCTIONS):
        ogEcho (logtypes, "error", f"{'<-'.join (call_stack)}: {MSG}")

    return code
def ogIsRepoLocked():
    """Tell whether a running process has a file open in the image repository.

    Returns False when ogGlobals.OGIMG (the repository mount point) is not
    set; otherwise True if the find command below prints at least one
    path, False if it prints nothing.
    """
    FUNCNAME = ogIsRepoLocked.__name__

    # Show help if the running script was started with "help" as its first
    # argument; the check itself is still performed afterwards.
    if len(sys.argv) > 1 and sys.argv[1] == "help":
        ogHelp(f"{FUNCNAME}", f"{FUNCNAME}", f"if {FUNCNAME}(): ...")

    # Nothing can be locked if the repository mount point is not defined.
    repo_dir = getattr(ogGlobals, "OGIMG", None)
    if not repo_dir:
        return False

    # Look in /proc for links that point into the repository mount point.
    # find usually exits non-zero on /proc (processes that vanish, entries
    # that cannot be read), so its exit status and stderr are ignored and
    # only what it prints is used.
    # NOTE(review): "-type f" combined with "-lname" looks like it can never
    # match (a symbolic link is not a regular file) - confirm whether
    # "-type l" was intended.
    result = subprocess.run(
        ["find", "/proc", "-maxdepth", "2", "-type", "f", "-lname", f"{repo_dir}/*"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )
    return bool(result.stdout.strip())
def ogCheckProgram(*args):
    """Check that every program named in a space-separated string is on PATH.

    Takes exactly one argument, e.g. "partimage partclone mbuffer".

    Returns 0 when all programs are found.  Otherwise returns the value of
    ogRaiseError(): OG_ERR_NOTEXEC with the missing program names, or
    OG_ERR_FORMAT when the number of arguments is not one.  Returns None
    after showing help.
    """
    FUNCNAME = ogCheckProgram.__name__

    # Show help on request.
    if len(args) > 0 and args[0] == "help":
        ogHelp(FUNCNAME,
               f"{FUNCNAME} \"str_program ...\"",
               [f"{FUNCNAME} \"partimage partclone mbuffer\""])
        return

    # Exactly one parameter is expected.
    if len(args) != 1:
        return ogRaiseError([], ogGlobals.OG_ERR_FORMAT, "")

    # Collect the programs that cannot be found on PATH.
    missing = [prog for prog in args[0].split() if not shutil.which(prog)]
    if missing:
        return ogRaiseError([], ogGlobals.OG_ERR_NOTEXEC, " ".join(missing))
    return 0
def ogIsVirtualMachine():
    """Tell whether dmidecode reports a KVM or VirtualBox product name.

    Runs "dmidecode -s system-product-name" and returns True if its
    standard output contains "KVM" or "VirtualBox", False otherwise.
    """
    probe = subprocess.run (
        ["dmidecode", "-s", "system-product-name"],
        capture_output=True,
        text=True,
    )
    product = probe.stdout
    return any (marker in product for marker in ("KVM", "VirtualBox"))