test-ogclone/client/lib/engine/bin/FileLib.py

314 lines
11 KiB
Python

import subprocess
import os
import shutil
import ogGlobals
import SystemLib
import CacheLib
import FileSystemLib
def ogCalculateChecksum(*args):
# Check if help is requested
if "help" in args:
print("ogCalculateChecksum [ str_repo | int_ndisk int_npartition ] path_filepath")
print("ogCalculateChecksum REPO ubuntu.img ==> ef899299caf8b517ce36f1157a93d8bf")
return
# Get the file path
file_path = ogGetPath(*args)
if not file_path:
SystemLib.ogRaiseError(OG_ERR_NOTFOUND, *args)
return
# Calculate the checksum
result = subprocess.run(["tail", "-c1M", file_path], capture_output=True)
checksum = result.stdout.decode().split()[0]
return checksum
def ogCompareChecksumFiles(*args):
# Variables locales.
ARGS = args
if "help" in args:
SystemLib.ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_filepath", "if $FUNCNAME REPO ubuntu.img CACHE ubuntu.img; then ...; fi")
return
ARGS = args
if args[0].startswith("/"):
# Camino completo. */ (Comentario Doxygen)
SOURCE = ogGetPath(*args[:1])
args = args[1:]
elif args[0].isdigit():
# ndisco npartición.
SOURCE = ogGetPath(*args[:3])
args = args[3:]
else:
# Otros: repo, cache, cdrom (no se permiten caminos relativos).
SOURCE = ogGetPath(*args[:2])
args = args[2:]
TARGET = ogGetPath(*args)
try:
with open(f"{SOURCE}.sum", "r") as source_file:
source_checksum = source_file.read().strip()
with open(f"{TARGET}.sum", "r") as target_file:
target_checksum = target_file.read().strip()
return source_checksum == target_checksum
except FileNotFoundError:
return False
def ogCalculateFullChecksum(*args):
# Variables locales.
FILE = None
if "help" in args:
SystemLib.ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_filepath", "$FUNCNAME REPO ubuntu.img ==> ef899299caf8b517ce36f1157a93d8bf")
return
# Comprobar que existe el fichero y devolver sus datos.
FILE = ogGetPath(*args)
if not FILE:
SystemLib.ogRaiseError(OG_ERR_NOTFOUND, *args)
return
# Calculate the checksum
result = subprocess.run(["md5sum", FILE, "-b"], capture_output=True)
checksum = result.stdout.decode().split()[0]
return checksum
def ogCopyFile(*args):
# Variables locales.
ARGS = args
if "help" in args:
SystemLib.ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_source [ str_repo | int_ndisk int_npartition ] path_target", "$FUNCNAME REPO newfile.txt 1 2 /tmp/newfile.txt")
return
ARGS = args
if args[0].startswith("/"):
# Camino completo. */ (Comentrio Doxygen)
SOURCE = ogGetPath(*args[:1])
args = args[1:]
elif args[0].isdigit():
# ndisco npartición.
SOURCE = ogGetPath(*args[:3])
args = args[3:]
else:
# Otros: repo, cache, cdrom (no se permiten caminos relativos).
SOURCE = ogGetPath(*args[:2])
args = args[2:]
TARGET = ogGetPath(*args)
# Comprobar fichero origen y directorio destino.
if not SOURCE:
SystemLib.ogRaiseError(OG_ERR_NOTFOUND, *args[:-1])
return
if not TARGET:
SystemLib.ogRaiseError(OG_ERR_NOTFOUND, *args)
return
# Copiar fichero (para evitar problemas de comunicaciones las copias se hacen con rsync en vez de cp).
result = subprocess.run(["rsync", "--progress", "--inplace", "-avh", SOURCE, TARGET])
return result.returncode
def ogDeleteFile(*args):
# Variables locales.
FILE = None
if "help" in args:
SystemLib.ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_file", "$FUNCNAME 1 2 /tmp/newfile.txt")
return
# Comprobar que existe el fichero y borrarlo.
FILE = ogGetPath(*args)
if not FILE:
SystemLib.ogRaiseError(OG_ERR_NOTFOUND, *args)
return
try:
os.remove(FILE)
except OSError as e:
SystemLib.ogRaiseError(OG_ERR_NOTFOUND, *args)
return
def ogDeleteTree(*args):
# Variables locales.
DIR = None
if "help" in args:
SystemLib.ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_dir", "$FUNCNAME 1 2 /tmp/newdir")
return
# Comprobar que existe el directorio y borrarlo con su contenido.
DIR = ogGetPath(*args)
if not DIR:
SystemLib.ogRaiseError(OG_ERR_NOTFOUND, *args)
return
try:
shutil.rmtree(DIR)
except OSError as e:
SystemLib.ogRaiseError(OG_ERR_NOTFOUND, *args)
return
#/**
# ogGetPath [ str_repo | int_ndisk int_npartition ] path_filepath
#@brief Inicia el proceso de arranque de un sistema de archivos.
#@param path_filepath camino del fichero (independiente de mayúsculas)
#@param str_repo repositorio de ficheros
#@param int_ndisk nº de orden del disco
#@param int_npartition nº de orden de la partición
#@return path_file - camino completo real del fichero.
#@note repo = { REPO, CACHE, CDROM }
#@note Requisitos: \c grep \c sed
#@exception OG_ERR_FORMAT Formato incorrecto.
#@exception OG_ERR_NOTFOUND Fichero o dispositivo no encontrado.
#@exception OG_ERR_PARTITION Tipo de partición desconocido o no se puede montar.
#@warning En caso de error, sólo devuelve el código y no da mensajes.
#@todo Terminar de definir parámetros para acceso a repositorios.
#*/ ##
#ogGetPath (file='/mnt/sda1/windows/system32') ==> '/mnt/sda1/WINDOWS/System32'
#ogGetPath (src='REPO', file='/etc/fstab') ==> '/opt/opengnsys/images/etc/fstab'
#ogGetPath (src='1 1', file='/windows/system32') ==> '/mnt/sda1/WINDOWS/System32'
def ogGetPath (src=None, file=None):
if file is None:
raise TypeError ('missing required argument: "file"')
f = file
if src is not None:
if 'REPO' == src:
f = os.path.join (ogGlobals.OGIMG, file.strip('/'))
elif 'CACHE' == src:
mntdir = CacheLib.ogMountCache()
if not mntdir: return None
f = os.path.join (mntdir, ogGlobals.OGIMG.strip('/'), file.strip('/'))
elif 'CDROM' == src:
mntdir = FileSystemLib.ogMountCdrom()
if not mntdir: return None
f = os.path.join (mntdir, file.strip('/'))
else:
try:
disk, part = src.split()
except ValueError:
SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_FORMAT, '')
return
mntdir = FileSystemLib.ogMount (disk, part)
if not mntdir: return None
f = os.path.join (mntdir, file.strip('/'))
f = os.path.normpath (f)
if os.path.exists (f):
filepath = f
#print (f'f ({f}) existe, filepath=f ({filepath})')
else:
# Buscar el nombre correcto en cada subdirectorio del camino.
prevfile = ''
filepath = '/'
#print (f'f ({f}) prevfile ({prevfile})')
while f != prevfile:
#print ('\nno son iguales, nueva iteracion...')
f_first_component = f.split ('/')[0] ## take 1st component
ls_path = os.path.join (filepath, f_first_component) ## "ls" makes reference to the original bash version
#print (f'f_first_component ({f_first_component}) ls_path ({ls_path})')
## build filepath to return
if os.path.exists (ls_path):
filepath = ls_path
#print (f'ls_path existe, filepath ({filepath})')
else:
filepath = subprocess.run (['find', filepath, '-maxdepth', '1', '-iname', f_first_component, '-print'], capture_output=True, text=True).stdout.strip()
#print (f'ls_path no existe, filepath ({filepath})')
prevfile = f
f = '/'.join (f.split('/')[1:]) ## remove 1st component
#print (f'f ({f}) prevfile ({prevfile})')
return filepath
#/**
# ogGetParentPath [ str_repo | int_ndisk int_npartition ] path_filepath
#@brief Metafunción que devuelve el camino del directorio padre.
#@see ogGetPath
#*/ ##
#ogGetParentPath ([ str_repo | int_ndisk int_npartition ] path_filepath
#ogGetParentPath ('/mnt/sda1/windows/system32') ==> '/mnt/sda1/WINDOWS'
#ogGetParentPath ('REPO', '/etc/fstab') ==> '/opt/opengnsys/images/etc'
#ogGetParentPath ('1 1', '/windows/system32') ==> '/mnt/sda1/WINDOWS'
def ogGetParentPath (src=None, file=None):
if file is None:
SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_FORMAT, '')
return
if src is None:
return ogGetPath (file=os.path.dirname (file))
else:
return ogGetPath (src=src, file=os.path.dirname('/'+file))
def ogIsNewerFile(*args):
# Variables locales.
ARGS = args
if "help" in args:
SystemLib.ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_source [ str_repo | int_ndisk int_npartition ] path_target", "if $FUNCNAME REPO ubuntu.img CACHE ubuntu.img; then ... fi")
return
ARGS = args
if args[0].startswith("/"):
# Camino completo. */ (Comentrio Doxygen)
SOURCE = ogGetPath(*args[:1])
args = args[1:]
elif args[0].isdigit():
# ndisco npartición.
SOURCE = ogGetPath(*args[:3])
args = args[3:]
else:
# Otros: repo, cache, cdrom (no se permiten caminos relativos).
SOURCE = ogGetPath(*args[:2])
args = args[2:]
TARGET = ogGetPath(*args)
# Comprobar que existen los ficheros origen y destino.
if not SOURCE:
SystemLib.ogRaiseError(OG_ERR_NOTFOUND, *args[:-1])
return
if not TARGET:
SystemLib.ogRaiseError(OG_ERR_NOTFOUND, *args)
return
# Devolver si el primer fichero se ha modificado después que el segundo.
return os.path.getmtime(SOURCE) > os.path.getmtime(TARGET)
def ogMakeChecksumFile(*args):
# Variables locales.
FILE = None
if "help" in args:
SystemLib.ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_filepath", "$FUNCNAME REPO ubuntu.img")
return
# Comprobar que existe el fichero y guardar su checksum.
FILE = ogGetPath(*args)
if not FILE:
SystemLib.ogRaiseError(OG_ERR_NOTFOUND, *args)
return
checksum = ogCalculateChecksum(FILE)
with open(f"{FILE}.sum", "w") as f:
f.write(checksum)
def ogMakeDir(*args):
# Variables locales.
PARENT = None
DIR = None
if "help" in args:
SystemLib.ogHelp("$FUNCNAME", "$FUNCNAME [ str_repo | int_ndisk int_npartition ] path_dir", "$FUNCNAME 1 2 /tmp/newdir")
return
PARENT = ogGetParentPath(*args)
if not PARENT:
SystemLib.ogRaiseError(OG_ERR_NOTFOUND, *args)
return
DIR = os.path.basename(args[-1])
os.makedirs(os.path.join(PARENT, DIR), exist_ok=True)