#/**
#@file    FileLib.py
#@brief   File library or class
#@class   File
#@brief   Functions for managing files and directories.
#@warning License: GNU GPLv3+
#*/

import subprocess
import os
import shutil
import hashlib

import ogGlobals
import SystemLib
import CacheLib
import FileSystemLib

#/**
#         ogCalculateChecksum [ str_repo | int_ndisk int_npart ] path_filepath
#@brief   Returns the checksum of a file.
#@param   path_filepath  path of the file (case-insensitive)
#@param   str_repo       file repository
#@param   int_ndisk      disk order number
#@param   int_npartition partition order number
#@return  hex_checksum   Checksum of the file
#*/ ##
#ogCalculateChecksum ([ str_repo | int_ndisk int_npartition ] path_filepath")
#ogCalculateChecksum (container='REPO', file='ubuntu.img') ==> ef899299caf8b517ce36f1157a93d8bf
#ogCalculateChecksum (disk=1, par=1, file='ubuntu.img')    ==> ef899299caf8b517ce36f1157a93d8bf
def ogCalculateChecksum(disk=None, par=None, container=None, file=None):
    """Return the MD5 hex digest of the last MiB of a file.

    The file is located with ogGetPath from either ``container`` or the
    ``disk``/``par`` pair (or from ``file`` alone when neither is given).
    Files of 1 MiB or less are delegated to ogCalculateFullChecksum.

    Returns None (after calling SystemLib.ogRaiseError) when ogGetPath
    yields no path.  Raises TypeError on an invalid argument combination.
    """
    if file is None:
        raise TypeError('missing required argument: "file"')

    if container is not None:
        if disk is None and par is None:
            ## we were given container=
            f = ogGetPath(src=container, file=file)
            dev_err = f'{container} {file}'
            print(f'ogGetPath (src=({container}), file=({file})) = f ({f})')
        else:
            raise TypeError('argument "container" can be specified along neither "disk" nor "par"')
    else:
        if disk is not None and par is not None:
            ## we were given disk= par=
            f = ogGetPath(src=f'{disk} {par}', file=file)
            dev_err = f'{disk} {par} {file}'
            print(f'ogGetPath (src=({disk} {par}), file=({file})) = f ({f})')
        elif disk is None and par is None:
            ## we were given nothing
            f = ogGetPath(file=file)
            dev_err = file
            print(f'ogGetPath (file=({file})) = f ({f})')
        else:
            raise TypeError('if one of "disk" and "par" are specified, then both must be')

    if not f:
        SystemLib.ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, dev_err)
        return

    last_n_bytes = 1024 * 1024
    if last_n_bytes >= os.stat(f).st_size:
        # Not enough data for a partial checksum: hash the whole file.
        return ogCalculateFullChecksum(disk, par, container, file)

    with open(f, 'rb') as fd:
        # Only the trailing MiB is hashed.
        fd.seek(-last_n_bytes, os.SEEK_END)
        data = fd.read()
    return hashlib.md5(data).hexdigest()


#/**
#         ogCompareChecksumFiles [ str_repo | int_ndisk int_npart ] path_source [ str_repo | int_ndisk int_npart ] path_target
#@brief   Metafunction that compares the stored checksums of 2 files.
#@return  bool_compare   Result of the comparison.
#@warning The ".sum" extension does not need to be specified.
#*/ ##
def ogCompareChecksumFiles(*args):
    """Compare the contents of the ``.sum`` files stored next to two files.

    Each of the two locations is given positionally as one of:
      * ``path``               - a full path (starts with "/")
      * ``ndisk, npart, path`` - disk number, partition number and path
      * ``repo, path``         - a container name and a path

    Returns True when both ``.sum`` files can be read and hold the same
    text (surrounding whitespace ignored), False otherwise, and None
    when "help" is among the arguments.
    Raises TypeError when the arguments do not form exactly two locations.
    """
    if "help" in args:
        SystemLib.ogHelp(
            "ogCompareChecksumFiles",
            "ogCompareChecksumFiles [ str_repo | int_ndisk int_npartition ] path_source"
            " [ str_repo | int_ndisk int_npartition ] path_target",
            "if ogCompareChecksumFiles REPO ubuntu.img CACHE ubuntu.img; then ...; fi",
        )
        return

    def resolve_first(pending):
        """Resolve the location at the head of *pending*; return (path, rest)."""
        if not pending:
            raise TypeError('missing file location argument')
        head = str(pending[0])
        if head.startswith("/"):
            used = 1        # full path
        elif head.isdigit():
            used = 3        # ndisk npartition path
        else:
            used = 2        # repo, cache, cdrom (relative paths are not allowed)
        if len(pending) < used:
            raise TypeError(f'incomplete file location: {pending}')
        if used == 1:
            src = None
        elif used == 3:
            src = f"{pending[0]} {pending[1]}"
        else:
            src = pending[0]
        # ogGetPath takes (src, file) keywords, not the bash-style tuple.
        return ogGetPath(src=src, file=pending[used - 1]), pending[used:]

    source, remaining = resolve_first(args)
    target, remaining = resolve_first(remaining)
    if remaining:
        raise TypeError(f'unexpected extra arguments: {remaining}')

    if not source or not target:
        # An unresolved path cannot have a checksum file.
        return False

    try:
        with open(f"{source}.sum", "r") as source_file:
            source_checksum = source_file.read().strip()
        with open(f"{target}.sum", "r") as target_file:
            target_checksum = target_file.read().strip()
    except FileNotFoundError:
        return False
    return source_checksum == target_checksum


#/**
#         ogCalculateFullChecksum [ str_repo | int_ndisk int_npart ] path_filepath
#@brief   Returns the FULL checksum of a file.
#@param   path_filepath  path of the file (case-insensitive)
#@param   str_repo       file repository
#@param   int_ndisk      disk order number
#@param   int_npartition partition order number
#@return  hex_checksum   Checksum of the file
#*/ ##
def ogCalculateFullChecksum(disk=None, par=None, container=None, file=None):
    """Return the MD5 hex digest of the complete contents of a file.

    The file is located with ogGetPath from either ``container`` or the
    ``disk``/``par`` pair (or from ``file`` alone when neither is given).

    Returns None (after calling SystemLib.ogRaiseError) when ogGetPath
    yields no path.  Raises TypeError on an invalid argument combination.
    """
    if file is None:
        raise TypeError('missing required argument: "file"')

    has_disk = disk is not None
    has_par = par is not None

    if container is not None:
        if has_disk or has_par:
            raise TypeError('argument "container" can be specified along neither "disk" nor "par"')
        ## we were given container=
        resolved = ogGetPath(src=container, file=file)
        label = f'{container} {file}'
        print(f'ogGetPath (src=({container}), file=({file})) = f ({resolved})')
    elif has_disk and has_par:
        ## we were given disk= par=
        resolved = ogGetPath(src=f'{disk} {par}', file=file)
        label = f'{disk} {par} {file}'
        print(f'ogGetPath (src=({disk} {par}), file=({file})) = f ({resolved})')
    elif not has_disk and not has_par:
        ## we were given nothing
        resolved = ogGetPath(file=file)
        label = file
        print(f'ogGetPath (file=({file})) = f ({resolved})')
    else:
        raise TypeError('if one of "disk" and "par" are specified, then both must be')

    if not resolved:
        SystemLib.ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, label)
        return

    digest = hashlib.md5()
    chunk_size = 64 * 1024
    with open(resolved, 'rb') as stream:
        # Feed the hash in fixed-size chunks so the file is never held whole in memory.
        while True:
            chunk = stream.read(chunk_size)
            if chunk == b'':
                break
            digest.update(chunk)
    return digest.hexdigest()


#/**
#         ogCopyFile [ str_repo | int_ndisk int_npart ] path_source [ str_repo | int_ndisk int_npart ] path_target
#@brief   Metafunction to copy an OpenGnSys system file to a directory.
#@see     ogGetPath
#@return  Exit status of the copy command.
#@warning Both the source file and the target directory must exist.
#*/ ##
#ogCopyFile (src, dst)
#ogCopyFile ({container='REPO', file='newfile.txt'}, {disk=1, par=2, file='/tmp/newfile.txt'})
#ogCopyFile ({disk=1, par=2, file='/tmp/newfile.txt'}, {container='REPO', file='newfile.txt'})
def ogCopyFile(src, dst):
    """Copy a file with rsync between two locations described by dicts.

    ``src`` and ``dst`` are dicts with a mandatory ``file`` key and either
    a ``container`` key, both ``disk`` and ``par`` keys, or neither.

    Returns the rsync exit status, or None (after calling
    SystemLib.ogRaiseError) when either location cannot be resolved.
    Raises TypeError when a dict has an invalid key combination.
    """
    # Resolve each endpoint by position, not by comparing the dicts: two
    # equal dicts would otherwise both be taken for the source.
    SOURCE, src_dev_err = _ogResolveCopyEndpoint(src, 'source')
    TARGET, dst_dev_err = _ogResolveCopyEndpoint(dst, 'target')

    if not SOURCE:
        SystemLib.ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, f'device or file {src_dev_err} not found')
        return
    if not TARGET:
        SystemLib.ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, f'device or file {dst_dev_err} not found')
        return
    print(f'nati: ogCopyFile: SOURCE ({SOURCE}) TARGET ({TARGET})')

    # Copy the file (to avoid communication problems, copies are made with rsync instead of cp).
    print(f'nati: ogCopyFile: rsync --progress --inplace -avh ({SOURCE}) ({TARGET})')
    result = subprocess.run(
        ["rsync", "--progress", "--inplace", "-avh", SOURCE, TARGET],
        capture_output=True, text=True,
    )
    return result.returncode


def _ogResolveCopyEndpoint(elem, which):
    """Validate one ogCopyFile endpoint dict; return (path, description).

    ``which`` is 'source' or 'target' and is only used in messages.
    """
    if 'file' not in elem:
        raise TypeError(f'missing required argument in {which} dict: "file"')

    has_disk = 'disk' in elem
    has_par = 'par' in elem

    if 'container' in elem:
        if has_disk or has_par:
            raise TypeError('argument "container" can be specified along neither "disk" nor "par"')
        ## we were given container=
        path = ogGetPath(src=elem['container'], file=elem['file'])
        dev_err = f'{elem["container"]} {elem["file"]}'
        print(f'ogGetPath {which} (src=({elem["container"]}), file=({elem["file"]})) = path ({path})')
    elif has_disk and has_par:
        ## we were given disk= par=
        path = ogGetPath(src=f'{elem["disk"]} {elem["par"]}', file=elem['file'])
        dev_err = f'{elem["disk"]} {elem["par"]} {elem["file"]}'
        print(f'ogGetPath {which} (src=({elem["disk"]} {elem["par"]}), file=({elem["file"]})) = path ({path})')
    elif not has_disk and not has_par:
        ## we were given nothing
        path = ogGetPath(file=elem['file'])
        dev_err = elem['file']
        print(f'ogGetPath {which} (file=({elem["file"]})) = path ({path})')
    else:
        raise TypeError('if one of "disk" and "par" are specified, then both must be')
    return path, dev_err


#/**
#         ogDeleteFile [ str_repo | int_ndisk int_npartition ] path_filepath
#@brief   Metafunction that deletes a file from a device.
#@see     ogGetPath
#@version 0.9 - Tests with OpenGnSys.
#@author  Ramon Gomez, ETSII Universidad de Sevilla
#@date    2009-09-29
#*/ ##
#ogDeleteFile ([ str_repo | int_ndisk int_npartition ] path_file)
#ogDeleteFile (container='REPO', file='/tmp/newfile.txt')
#ogDeleteFile (disk=1, par=2, file='/tmp/newfile.txt')
def ogDeleteFile(disk=None, par=None, container=None, file=None):
    """Delete a single file located through ogGetPath.

    Either ``container`` or both ``disk`` and ``par`` must be given.
    Errors (file not found, os.remove failure) are reported through
    SystemLib.ogRaiseError and the function returns None.
    Raises TypeError on an invalid argument combination.
    """
    if file is None:
        raise TypeError('missing required argument: "file"')

    if container is not None:
        if disk is None and par is None:
            ## we were given container=
            src = container
        else:
            raise TypeError('argument "container" can be specified along neither "disk" nor "par"')
    else:
        if disk is not None and par is not None:
            ## we were given disk= par=
            src = f'{disk} {par}'
        else:
            ## we were given nothing
            raise TypeError('either "container" or both "disk" and "par" must be specified')

    f = ogGetPath(src=src, file=file)
    if not f:
        SystemLib.ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, f'{src} {file}')
        return

    try:
        os.remove(f)
    except OSError as e:
        SystemLib.ogRaiseError([], ogGlobals.OG_ERR_GENERIC, str(e))
        return


def ogDeleteTree(*args):
    """Delete a directory together with its contents.

    Positional forms: ``(path)``, ``(repo, path)`` or
    ``(ndisk, npartition, path)``.  Errors are reported through
    SystemLib.ogRaiseError; the function returns None in every case.
    Raises TypeError when the number of arguments fits none of the forms.
    """
    if "help" in args:
        SystemLib.ogHelp(
            "ogDeleteTree",
            "ogDeleteTree [ str_repo | int_ndisk int_npartition ] path_dir",
            "ogDeleteTree 1 2 /tmp/newdir",
        )
        return

    # Translate the bash-style positional form into ogGetPath's (src, file) keywords.
    if len(args) == 1:
        src = None
    elif len(args) == 2:
        src = args[0]
    elif len(args) == 3:
        src = f'{args[0]} {args[1]}'
    else:
        raise TypeError(f'expected 1 to 3 arguments, got {len(args)}')

    # Check that the directory exists and delete it with its contents.
    DIR = ogGetPath(src=src, file=args[-1])
    if not DIR:
        SystemLib.ogRaiseError(
            "session",
            ogGlobals.OG_ERR_NOTFOUND,
            f"Not found: {args}"
        )
        return
    try:
        shutil.rmtree(DIR)
    except OSError as e:
        SystemLib.ogRaiseError(
            "session",
            ogGlobals.OG_ERR_NOTFOUND,
            f"{e}"
        )
        return


#/**
#         ogGetPath [ str_repo | int_ndisk int_npartition ] path_filepath
#@brief   Returns the real full path of a file (names are matched case-insensitively).
#@param   path_filepath  path of the file (case-insensitive)
#@param   str_repo       file repository
#@param   int_ndisk      disk order number
#@param   int_npartition partition order number
#@return  path_file - real full path of the file.
#@note    repo = { REPO, CACHE, CDROM }
#@note    Requirements: \c grep \c sed
#@exception OG_ERR_FORMAT    Incorrect format.
#@exception OG_ERR_NOTFOUND  File or device not found.
#@exception OG_ERR_PARTITION Unknown partition type or it cannot be mounted.
#@warning In case of error, it only returns the code and gives no messages.
#@todo    Finish defining the parameters for repository access.
#*/ ##
#ogGetPath (file='/mnt/sda1/windows/system32')             ==> '/mnt/sda1/WINDOWS/System32'
#ogGetPath (src='REPO', file='/etc/fstab')                 ==> '/opt/opengnsys/images/etc/fstab'
#ogGetPath (src='1 1', file='/windows/system32')           ==> '/mnt/sda1/WINDOWS/System32'
def ogGetPath(src=None, file=None):
    """Return the real path of ``file``, matching names case-insensitively.

    ``src`` selects the base directory:
      * None      - ``file`` is used as given
      * 'REPO'    - under ogGlobals.OGIMG
      * 'CACHE'   - under the directory returned by CacheLib.ogMountCache(),
                    followed by ogGlobals.OGIMG
      * 'CDROM'   - under the directory returned by FileSystemLib.ogMountCdrom()
      * 'D P'     - under the directory returned by FileSystemLib.ogMount(D, P)

    Returns the existing path, an empty string when some component cannot
    be found, or None when the mount yields nothing or ``src`` is malformed
    (the latter after calling SystemLib.ogRaiseError).
    Raises TypeError when ``file`` is missing.
    """
    if file is None:
        raise TypeError('missing required argument: "file"')

    f = file
    if src is not None:
        if 'REPO' == src:
            f = os.path.join(ogGlobals.OGIMG, file.strip('/'))
        elif 'CACHE' == src:
            mntdir = CacheLib.ogMountCache()
            if not mntdir:
                return None
            f = os.path.join(mntdir, ogGlobals.OGIMG.strip('/'), file.strip('/'))
        elif 'CDROM' == src:
            mntdir = FileSystemLib.ogMountCdrom()
            if not mntdir:
                return None
            f = os.path.join(mntdir, file.strip('/'))
        else:
            try:
                disk, part = src.split()
            except ValueError:
                SystemLib.ogRaiseError([], ogGlobals.OG_ERR_FORMAT, '')
                return
            mntdir = FileSystemLib.ogMount(disk, part)
            if not mntdir:
                return None
            f = os.path.join(mntdir, file.strip('/'))

    f = os.path.normpath(f)
    if os.path.exists(f):
        return f

    # Look for the correct name in each subdirectory of the path, from the root.
    filepath = '/'
    for component in f.split('/'):
        if not component:
            continue
        exact = os.path.join(filepath, component)
        if os.path.exists(exact):
            filepath = exact
            continue
        match = _ogFindEntryIgnoreCase(filepath, component)
        if match is None:
            # Stop at the first miss instead of continuing from a bogus base.
            return ''
        filepath = os.path.join(filepath, match)
    return filepath


def _ogFindEntryIgnoreCase(directory, name):
    """Return the entry of *directory* equal to *name* ignoring case, or None.

    Only the directory's own entries are considered (never the directory
    itself) and *name* is compared literally, not as a glob pattern.
    When several entries match, the first in sorted order is returned.
    """
    try:
        entries = os.listdir(directory)
    except OSError:
        return None
    wanted = name.lower()
    matches = sorted(entry for entry in entries if entry.lower() == wanted)
    return matches[0] if matches else None


#/**
#         ogGetParentPath [ str_repo | int_ndisk int_npartition ] path_filepath
#@brief   Metafunction that returns the path of the parent directory.
#@see     ogGetPath
#*/ ##
#ogGetParentPath ([ str_repo | int_ndisk int_npartition ] path_filepath
#ogGetParentPath (            file='/mnt/sda1/windows/system32') ==> '/mnt/sda1/WINDOWS'
#ogGetParentPath (src='REPO', file='/etc/fstab')                 ==> '/opt/opengnsys/images/etc'
#ogGetParentPath (src='1 1',  file='/windows/system32')          ==> '/mnt/sda1/WINDOWS'
def ogGetParentPath(src=None, file=None):
    """Return ogGetPath() of the directory that contains ``file``.

    Returns None (after calling SystemLib.ogRaiseError) when ``file`` is
    missing; otherwise whatever ogGetPath returns for the parent.
    """
    if file is None:
        SystemLib.ogRaiseError([], ogGlobals.OG_ERR_FORMAT, '')
        return

    if src is None:
        return ogGetPath(file=os.path.dirname(file))
    # A leading slash keeps dirname() of a bare name from being empty.
    return ogGetPath(src=src, file=os.path.dirname('/' + file))


def ogIsNewerFile(*args):
    """Tell whether the first file was modified after the second one.

    Each of the two locations is given positionally as one of:
      * ``path``               - a full path (starts with "/")
      * ``ndisk, npart, path`` - disk number, partition number and path
      * ``repo, path``         - a container name and a path

    Returns a bool, or None when "help" is requested or when a file is
    not found (reported through SystemLib.ogRaiseError).
    Raises TypeError when the arguments do not form exactly two locations.
    """
    if "help" in args:
        SystemLib.ogHelp(
            "ogIsNewerFile",
            "ogIsNewerFile [ str_repo | int_ndisk int_npartition ] path_source"
            " [ str_repo | int_ndisk int_npartition ] path_target",
            "if ogIsNewerFile REPO ubuntu.img CACHE ubuntu.img; then ... fi",
        )
        return

    def take_location(pending):
        """Split *pending* into (first location tuple, remaining arguments)."""
        if not pending:
            raise TypeError('missing file location argument')
        head = str(pending[0])
        if head.startswith("/"):
            used = 1        # full path
        elif head.isdigit():
            used = 3        # ndisk npartition path
        else:
            used = 2        # repo, cache, cdrom (relative paths are not allowed)
        if len(pending) < used:
            raise TypeError(f'incomplete file location: {pending}')
        return pending[:used], pending[used:]

    def resolve(location):
        """Call ogGetPath with the (src, file) keywords for *location*."""
        if len(location) == 1:
            src = None
        elif len(location) == 3:
            src = f"{location[0]} {location[1]}"
        else:
            src = location[0]
        return ogGetPath(src=src, file=location[-1])

    source_args, remaining = take_location(args)
    target_args, remaining = take_location(remaining)
    if remaining:
        raise TypeError(f'unexpected extra arguments: {remaining}')

    SOURCE = resolve(source_args)
    TARGET = resolve(target_args)

    # Check that both the source and the target files exist.
    if not SOURCE:
        SystemLib.ogRaiseError(
            "session",
            ogGlobals.OG_ERR_NOTFOUND,
            f"Not found: {' '.join(map(str, source_args))}"
        )
        return
    if not TARGET:
        SystemLib.ogRaiseError(
            "session",
            ogGlobals.OG_ERR_NOTFOUND,
            f"Not found: {' '.join(map(str, target_args))}"
        )
        return

    # Return whether the first file was modified after the second one.
    return os.path.getmtime(SOURCE) > os.path.getmtime(TARGET)


def ogMakeChecksumFile(*args):
    """Write the checksum of a file into ``<file>.sum`` next to it.

    Positional forms: ``(path)``, ``(repo, path)`` or
    ``(ndisk, npartition, path)``.  Returns None in every case; a missing
    file is reported through SystemLib.ogRaiseError.
    Raises TypeError when the number of arguments fits none of the forms.
    """
    if "help" in args:
        SystemLib.ogHelp(
            "ogMakeChecksumFile",
            "ogMakeChecksumFile [ str_repo | int_ndisk int_npartition ] path_filepath",
            "ogMakeChecksumFile REPO ubuntu.img",
        )
        return

    # Translate the bash-style positional form into ogGetPath's (src, file) keywords.
    if len(args) == 1:
        src = None
    elif len(args) == 2:
        src = args[0]
    elif len(args) == 3:
        src = f'{args[0]} {args[1]}'
    else:
        raise TypeError(f'expected 1 to 3 arguments, got {len(args)}')

    # Check that the file exists and store its checksum.
    FILE = ogGetPath(src=src, file=args[-1])
    if not FILE:
        SystemLib.ogRaiseError(
            "session",
            ogGlobals.OG_ERR_NOTFOUND,
            f"Not found: {args}"
        )
        return

    # FILE is already a resolved path, so it must go in as file=, not as disk.
    checksum = ogCalculateChecksum(file=FILE)
    if not checksum:
        return
    with open(f"{FILE}.sum", "w") as f:
        f.write(checksum)


#/**
#         ogMakeDir [ str_repo | int_ndisk int_npartition ] path_dirpath
#@brief   Metafunction that creates an empty subdirectory on a device.
#@see     ogGetParentPath
#*/ ##
#ogMakeDir (container='REPO', file='/tmp/newdir')
#ogMakeDir (disk='1', par='2', file='/tmp/newdir')
def ogMakeDir(container=None, disk=None, par=None, file=None):
    """Create the directory ``file`` inside an existing parent directory.

    The parent is located with ogGetParentPath from either ``container``
    or the ``disk``/``par`` pair (or from ``file`` alone when neither is
    given); the last path component is then created under it.  An already
    existing directory is not an error.

    Returns None; a missing parent is reported through
    SystemLib.ogRaiseError.  Raises TypeError on an invalid argument
    combination.
    """
    if file is None:
        raise TypeError('missing required argument: "file"')

    # A trailing slash would make basename() empty and dirname() the new
    # directory itself; drop it (but keep a bare "/" untouched).
    file = file.rstrip('/') or file

    has_disk = disk is not None
    has_par = par is not None

    if container is not None:
        if has_disk or has_par:
            raise TypeError('argument "container" can be specified along neither "disk" nor "par"')
        ## we were given container=
        parent = ogGetParentPath(src=container, file=file)
        dev_err = f'{container} {file}'
        print(f'ogGetParentPath (src=({container}), file=({file})) = parent ({parent})')
    elif has_disk and has_par:
        ## we were given disk= par=
        parent = ogGetParentPath(src=f'{disk} {par}', file=file)
        dev_err = f'{disk} {par} {file}'
        print(f'ogGetParentPath (src=({disk} {par}), file=({file})) = parent ({parent})')
    elif not has_disk and not has_par:
        ## we were given nothing
        parent = ogGetParentPath(file=file)
        dev_err = file
        print(f'ogGetParentPath (file=({file})) = parent ({parent})')
    else:
        raise TypeError('if one of "disk" and "par" are specified, then both must be')

    print(f'nati: ogMakeDir: parent ({parent})')
    if not parent:
        SystemLib.ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, f'device or file {dev_err} not found')
        return

    dst = os.path.basename(file)
    print(f'nati: ogMakeDir: dst ({dst})')
    os.makedirs(os.path.join(parent, dst), exist_ok=True)