ogclone-engine/client/lib/engine/bin/FileLib.py

293 lines
9.3 KiB
Python

import subprocess
import os
import shutil
# Trace message printed every time this module is imported; it shows the
# module name so the load order of the engine libraries can be followed.
print (">>>>>>>>>>>>>>>>>>>> Load ", __name__, " <<<<<<<<<<<<<<<<<<<<<<")
def ogCalculateChecksum(*args):
    """Return the MD5 checksum of the last MiB of a file.

    Only the final 1 MiB is hashed (the whole file when it is smaller), which
    keeps the operation cheap for very large image files.

    Args:
        *args: ``[ str_repo | int_ndisk int_npartition ] path_filepath``.
            The values are forwarded unchanged to ``ogGetPath``. If any of
            them is the string ``"help"``, usage text is printed instead.

    Returns:
        The hexadecimal MD5 digest as a string, or ``None`` when help was
        requested or the file could not be located or read (the failure is
        reported through ``ogRaiseError``).
    """
    # Imported locally so this function does not depend on the module header.
    import hashlib

    tail_size = 1024 * 1024  # same amount as "tail -c1M"

    # Check if help is requested.
    if "help" in args:
        print("ogCalculateChecksum [ str_repo | int_ndisk int_npartition ] path_filepath")
        print("ogCalculateChecksum REPO ubuntu.img ==> ef899299caf8b517ce36f1157a93d8bf")
        return

    # Get the file path.
    file_path = ogGetPath(*args)
    if not file_path:
        ogRaiseError(OG_ERR_NOTFOUND, *args)
        return

    # Hash the tail of the file. The previous implementation returned the
    # first token of the raw "tail" output, i.e. file content, not a digest.
    try:
        with open(file_path, "rb") as stream:
            size = stream.seek(0, os.SEEK_END)
            stream.seek(max(size - tail_size, 0))
            digest = hashlib.md5(stream.read(tail_size))
    except OSError:
        ogRaiseError(OG_ERR_NOTFOUND, *args)
        return
    return digest.hexdigest()
def ogCompareChecksumFiles(*args):
    """Tell whether two files have identical stored checksums.

    The checksums are read from the ``<path>.sum`` companion file of each
    resolved path; nothing is recomputed here.

    Args:
        *args: A source specification followed by a target specification.
            The source takes one argument when it starts with ``/``, three
            when the first argument is all digits (disk, partition, path),
            and two otherwise (location keyword, path). Everything left over
            is the target. Both are resolved with ``ogGetPath``.

    Returns:
        ``True`` when both ``.sum`` files hold the same text (ignoring
        surrounding whitespace), ``False`` when they differ or either file is
        missing, ``None`` when ``"help"`` was passed.
    """
    if "help" in args:
        ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_filepath", "if $FUNCNAME REPO ubuntu.img CACHE ubuntu.img; then ...; fi")
        return

    # Work out how many leading arguments describe the source file.
    first = args[0]
    if first.startswith("/"):
        consumed = 1  # full path
    elif first.isdigit():
        consumed = 3  # disk number, partition number, path
    else:
        consumed = 2  # repo, cache, cdrom (relative paths are not allowed)

    source_path = ogGetPath(*args[:consumed])
    target_path = ogGetPath(*args[consumed:])

    stored = []
    try:
        for base in (source_path, target_path):
            with open(f"{base}.sum", "r") as sum_file:
                stored.append(sum_file.read().strip())
    except FileNotFoundError:
        return False
    return stored[0] == stored[1]
def ogCalculateFullChecksum(*args):
    """Return the MD5 checksum of a whole file, as printed by ``md5sum``.

    Args:
        *args: ``[ str_repo | int_ndisk int_npartition ] path_filepath``,
            forwarded unchanged to ``ogGetPath``.

    Returns:
        The first whitespace-separated field of the ``md5sum`` output, or
        ``None`` when ``"help"`` was passed or the file was not found (the
        latter is reported through ``ogRaiseError``).
    """
    if "help" in args:
        ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_filepath", "$FUNCNAME REPO ubuntu.img ==> ef899299caf8b517ce36f1157a93d8bf")
        return

    # Make sure the file exists before hashing it.
    target = ogGetPath(*args)
    if not target:
        ogRaiseError(OG_ERR_NOTFOUND, *args)
        return

    # md5sum prints "<digest> *<name>"; only the digest is wanted.
    completed = subprocess.run(["md5sum", target, "-b"], capture_output=True)
    fields = completed.stdout.decode().split()
    return fields[0]
def ogCopyFile(*args):
    """Copy a file to another location with ``rsync``.

    Args:
        *args: A source specification followed by a target specification.
            The source takes one argument when it starts with ``/``, three
            when the first argument is all digits (disk, partition, path),
            and two otherwise (location keyword, path). Everything left over
            is the target. Both are resolved with ``ogGetPath``.

    Returns:
        The ``rsync`` exit status, or ``None`` when ``"help"`` was passed,
        no arguments were given, or either path could not be resolved (the
        failure is reported through ``ogRaiseError``).
    """
    if "help" in args:
        ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_source [ str_repo | int_ndisk int_npartition ] path_target", "$FUNCNAME REPO newfile.txt 1 2 /tmp/newfile.txt")
        return
    if not args:
        # Nothing to inspect: report a format error instead of an IndexError.
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Work out how many leading arguments describe the source file.
    if args[0].startswith("/"):
        consumed = 1  # full path
    elif args[0].isdigit():
        consumed = 3  # disk number, partition number, path
    else:
        consumed = 2  # repo, cache, cdrom (relative paths are not allowed)
    source_args, target_args = args[:consumed], args[consumed:]

    SOURCE = ogGetPath(*source_args)
    TARGET = ogGetPath(*target_args)

    # Check the source file and the destination, naming the right one in
    # each error (the source error used to list the target arguments).
    if not SOURCE:
        ogRaiseError(OG_ERR_NOTFOUND, *source_args)
        return
    if not TARGET:
        ogRaiseError(OG_ERR_NOTFOUND, *target_args)
        return

    # Copy with rsync rather than cp to avoid communication problems.
    result = subprocess.run(["rsync", "--progress", "--inplace", "-avh", SOURCE, TARGET])
    return result.returncode
def ogDeleteFile(*args):
    """Delete a single file.

    Args:
        *args: ``[ str_repo | int_ndisk int_npartition ] path_file``,
            forwarded unchanged to ``ogGetPath``.

    Returns:
        Always ``None``. A path that cannot be resolved, or a failed removal,
        is reported through ``ogRaiseError`` with ``OG_ERR_NOTFOUND``.
    """
    if "help" in args:
        ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_file", "$FUNCNAME 1 2 /tmp/newfile.txt")
        return

    # Resolve the file first; there is nothing to delete if it is unknown.
    victim = ogGetPath(*args)
    if not victim:
        ogRaiseError(OG_ERR_NOTFOUND, *args)
        return

    try:
        os.remove(victim)
    except OSError:
        # Any removal failure is reported the same way as a missing file.
        ogRaiseError(OG_ERR_NOTFOUND, *args)
    return
def ogDeleteTree(*args):
    """Delete a directory together with everything inside it.

    Args:
        *args: ``[ str_repo | int_ndisk int_npartition ] path_dir``,
            forwarded unchanged to ``ogGetPath``.

    Returns:
        Always ``None``. A path that cannot be resolved, or a failed removal,
        is reported through ``ogRaiseError`` with ``OG_ERR_NOTFOUND``.
    """
    if "help" in args:
        ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_dir", "$FUNCNAME 1 2 /tmp/newdir")
        return

    # Resolve the directory first; there is nothing to delete if it is unknown.
    doomed = ogGetPath(*args)
    if not doomed:
        ogRaiseError(OG_ERR_NOTFOUND, *args)
        return

    try:
        shutil.rmtree(doomed)
    except OSError:
        # Any removal failure is reported the same way as a missing directory.
        ogRaiseError(OG_ERR_NOTFOUND, *args)
    return
def _ogFindPathIgnoringCase(path):
    """Return the on-disk spelling of *path*, matching names case-insensitively.

    Each component is looked up in turn: an exact name wins, otherwise the
    containing directory is scanned for a name that differs only in case.
    Absolute paths are searched from the root, relative ones from the current
    directory. Returns ``None`` as soon as a component has no match.
    """
    current = os.sep if os.path.isabs(path) else os.curdir
    for component in path.split(os.sep):
        if not component:
            continue  # empty piece produced by the leading separator
        candidate = os.path.join(current, component)
        if not os.path.lexists(candidate):
            wanted = component.casefold()
            try:
                entries = os.listdir(current)
            except OSError:
                return None  # not a directory, unreadable, or vanished
            # Sorted so the choice is deterministic if several names match.
            matches = sorted(name for name in entries if name.casefold() == wanted)
            if not matches:
                return None
            candidate = os.path.join(current, matches[0])
        current = candidate
    return os.path.normpath(current)


def ogGetPath(*args):
    """Resolve a file specification to an existing path, ignoring letter case.

    Args:
        *args: One of
            ``path`` - a path used as given;
            ``location, path`` - where *location* is ``REPO`` (under
            ``OGIMG``), ``CACHE`` (under ``OGIMG`` inside the directory
            returned by ``ogMountCache``) or ``CDROM`` (inside the directory
            returned by ``ogMountCdrom``), compared case-insensitively;
            ``disk, partition, path`` - inside the directory returned by
            ``ogMount(disk, partition)``.

    Returns:
        The normalized path when it exists; otherwise the path with each
        component corrected to its on-disk capitalisation; ``None`` when
        nothing matches, a mount helper returns nothing, ``"help"`` was
        passed, or the arguments are malformed (``OG_ERR_FORMAT`` is then
        reported through ``ogRaiseError``).
    """
    # Show help if requested.
    if "help" in args:
        ogHelp(
            "$FUNCNAME",
            "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_filepath",
            "$FUNCNAME \"/mnt/sda1/windows/system32\" ==> /mnt/sda1/WINDOWS/System32",
            "$FUNCNAME REPO /etc/fstab ==> /opt/opengnsys/images/etc/fstab",
            "$FUNCNAME 1 1 \"/windows/system32\" ==> /mnt/sda1/WINDOWS/System32",
        )
        return

    # Build the candidate path according to the number of parameters.
    # Pieces are glued with "/" on purpose: sub-paths are normally written
    # with a leading "/" (see the help examples) and os.path.join would throw
    # the base directory away in that case.
    if len(args) == 1:
        FILE = args[0]
    elif len(args) == 2:
        location = args[0].upper()
        if location == "REPO":
            FILE = f"{OGIMG}/{args[1]}"
        elif location == "CACHE":
            MNTDIR = ogMountCache()
            if not MNTDIR:
                return
            FILE = f"{MNTDIR}/{OGIMG}/{args[1]}"
        elif location == "CDROM":
            MNTDIR = ogMountCdrom()
            if not MNTDIR:
                return
            FILE = f"{MNTDIR}/{args[1]}"
        else:
            ogRaiseError(OG_ERR_FORMAT)
            return
    elif len(args) == 3:
        MNTDIR = ogMount(args[0], args[1])
        if not MNTDIR:
            return
        FILE = f"{MNTDIR}/{args[2]}"
    else:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Remove duplicated and trailing "/" characters.
    FILE = os.path.normpath(FILE)

    # Fast path: the name is already spelled exactly as on disk.
    if os.path.exists(FILE):
        return FILE

    # Otherwise look for the right spelling in each directory of the path.
    return _ogFindPathIgnoringCase(FILE)
def ogGetParentPath(*args):
    """Resolve the directory that contains a file or directory.

    Args:
        *args: ``[ str_repo | int_ndisk int_npartition ] path_filepath``.
            The last argument is replaced by its parent directory and the
            result is resolved with ``ogGetPath``.

    Returns:
        Whatever ``ogGetPath`` returns for the parent directory, or ``None``
        when ``"help"`` was passed or the number of arguments is not 1, 2 or
        3 (``OG_ERR_FORMAT`` is then reported through ``ogRaiseError``).
    """
    if "help" in args:
        ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_filepath", "$FUNCNAME \"/mnt/sda1/windows/system32\" ==> /mnt/sda1/WINDOWS", "$FUNCNAME REPO /etc/fstab ==> /opt/opengnsys/images/etc", "$FUNCNAME 1 1 \"/windows/system32\" ==> /mnt/sda1/WINDOWS")
        return

    # Compute the parent of the path component (always the last argument).
    count = len(args)
    if count == 1:
        parent_dir = os.path.dirname(args[0])
    elif count in (2, 3):
        # A "/" is prepended so a bare file name yields "/" rather than "".
        parent_dir = os.path.dirname(f"/{args[-1]}")
    else:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Pass the location arguments separately. Joining them into one string
    # made ogGetPath treat "REPO /etc" as a single literal path.
    return ogGetPath(*args[:-1], parent_dir)
def ogIsNewerFile(*args):
    """Tell whether the first file was modified after the second one.

    Args:
        *args: A source specification followed by a target specification.
            The source takes one argument when it starts with ``/``, three
            when the first argument is all digits (disk, partition, path),
            and two otherwise (location keyword, path). Everything left over
            is the target. Both are resolved with ``ogGetPath``.

    Returns:
        ``True`` when the source modification time is strictly later than
        the target's, ``False`` otherwise; ``None`` when ``"help"`` was
        passed, no arguments were given, or either path could not be
        resolved (the failure is reported through ``ogRaiseError``).
    """
    if "help" in args:
        ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_source [ str_repo | int_ndisk int_npartition ] path_target", "if $FUNCNAME REPO ubuntu.img CACHE ubuntu.img; then ... fi")
        return
    if not args:
        # Nothing to inspect: report a format error instead of an IndexError.
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Work out how many leading arguments describe the source file.
    if args[0].startswith("/"):
        consumed = 1  # full path
    elif args[0].isdigit():
        consumed = 3  # disk number, partition number, path
    else:
        consumed = 2  # repo, cache, cdrom (relative paths are not allowed)
    source_args, target_args = args[:consumed], args[consumed:]

    SOURCE = ogGetPath(*source_args)
    TARGET = ogGetPath(*target_args)

    # Check that both files exist, naming the right one in each error
    # (the source error used to list the target arguments).
    if not SOURCE:
        ogRaiseError(OG_ERR_NOTFOUND, *source_args)
        return
    if not TARGET:
        ogRaiseError(OG_ERR_NOTFOUND, *target_args)
        return

    # Report whether the first file was modified after the second.
    return os.path.getmtime(SOURCE) > os.path.getmtime(TARGET)
def ogMakeChecksumFile(*args):
    """Store a file's checksum in a ``<path>.sum`` file next to it.

    Args:
        *args: ``[ str_repo | int_ndisk int_npartition ] path_filepath``,
            forwarded unchanged to ``ogGetPath``.

    Returns:
        Always ``None``. A path that cannot be resolved is reported through
        ``ogRaiseError`` with ``OG_ERR_NOTFOUND``.
    """
    if "help" in args:
        ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_filepath", "$FUNCNAME REPO ubuntu.img")
        return

    # Make sure the file exists before computing anything.
    target = ogGetPath(*args)
    if not target:
        ogRaiseError(OG_ERR_NOTFOUND, *args)
        return

    # The checksum is computed by ogCalculateChecksum on the resolved path
    # and written as-is, without a trailing newline.
    digest = ogCalculateChecksum(target)
    with open(f"{target}.sum", "w") as sum_file:
        sum_file.write(digest)
def ogMakeDir(*args):
    """Create a directory inside an existing parent directory.

    Args:
        *args: ``[ str_repo | int_ndisk int_npartition ] path_dir``. The
            parent directory is resolved with ``ogGetParentPath`` and the new
            directory takes the base name of the last argument.

    Returns:
        Always ``None``. An unresolved parent directory or a failed creation
        is reported through ``ogRaiseError`` with ``OG_ERR_NOTFOUND``. An
        already existing directory is not an error.
    """
    if "help" in args:
        ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_dir", "$FUNCNAME 1 2 /tmp/newdir")
        return

    # The parent must already exist (with its real capitalisation).
    PARENT = ogGetParentPath(*args)
    if not PARENT:
        ogRaiseError(OG_ERR_NOTFOUND, *args)
        return

    new_dir = os.path.join(PARENT, os.path.basename(args[-1]))
    try:
        os.makedirs(new_dir, exist_ok=True)
    except OSError:
        # Report creation failures like the delete functions do, instead of
        # letting a raw OSError escape to the caller.
        ogRaiseError(OG_ERR_NOTFOUND, *args)
    return