import os import re import subprocess import shutil import sys import platform import ogGlobals import SystemLib import DiskLib import FileSystemLib import CacheLib def ogCreateCache(ndisk=1, npart=4, partsize=None): """ Define la caché local, por defecto en partición 4 del disco 1. :param ndisk: número de disco donde crear la caché, por defecto es 1. :param npart: número de partición (opcional, 4 por defecto). :param partsize: tamaño de la partición en KB. :raises ValueError: Si el formato de los parámetros es incorrecto. :raises RuntimeError: Si ocurre un error durante la creación de la caché. """ if partsize is None: raise ValueError("El tamaño de la partición debe especificarse.") # Verifica si las herramientas necesarias están instaladas required_tools = ["sfdisk", "parted", "awk", "sed"] for tool in required_tools: if not shutil.which(tool): raise RuntimeError(f"La herramienta {tool} no está instalada.") # Preparar los comandos para crear la caché disk = f"/dev/sd{chr(96 + ndisk)}" size_in_sectors = partsize * 2 # Asumiendo 512B por sector try: # Lógica simplificada para crear la caché en la partición subprocess.run(["parted", disk, "mkpart", "primary", str(npart), str(partsize)], check=True) except subprocess.CalledProcessError as e: raise RuntimeError(f"Error al crear la caché: {e}") def ogDeleteCache(): """ Borra la partición utilizada para caché. :raises RuntimeError: Si ocurre un error durante la eliminación de la partición de caché. """ cachepart = ogFindCache() if cachepart is None: raise RuntimeError("No se encontró la partición de caché.") disk = f"/dev/sd{chr(96 + int(cachepart))}" try: subprocess.run(["parted", disk, "rm", cachepart], check=True) except subprocess.CalledProcessError as e: raise RuntimeError(f"Error al borrar la partición de caché: {e}") #/** # ogFindCache #@brief Detecta la partición caché local. #@param No requiere parametros #@return int_ndisk int_npart - devuelve el par nº de disco-nº de partición . #@warning Si no hay cache no devuelve nada #*/ ## def ogFindCache(): # Obtener el dispositivo del sistema de archivos etiquetado como "CACHE". PART = subprocess.run (['blkid', '-L', 'CACHE'], capture_output=True, text=True).stdout.strip() # En discos nvme con particiones GPT la partición se detecta usando el tag PARTLABEL if not PART: out = subprocess.run (['blkid', '-t', 'PARTLABEL=CACHE'], capture_output=True, text=True).stdout.strip() PART = out.split (':')[0] # Si no se detecta, obtener particiones marcadas de tipo caché en discos MSDOS. if not PART: out = subprocess.run (['sfdisk', '-l'], capture_output=True, text=True).stdout.splitlines() for l in out: elems = l.split (maxsplit=5) if 6 > len (elems): continue if 'ca' in elems[5] or 'a7' in elems[5]: PART=elems[0] break # Por último revisar todos los discos GPT y obtener las particiones etiquetadas como caché. if not PART: PART = '' for d in DiskLib.ogDiskToDev(): out = subprocess.run (['sgdisk', '-p', d], capture_output=True, text=True).stdout.splitlines() for l in out: elems = l.split (maxsplit=6) if 7 > len (elems): continue if 'CACHE' in elems[6]: p = 'p' if 'nvme' in d else '' PART += f' {d}{p}{elems[0]}' if not PART: return return DiskLib.ogDevToDisk (PART.split()[0]) # usar la 1ª partición encontrada. def ogFormatCache(): """ Formatea la partición de caché. :raises RuntimeError: Si ocurre un error durante el formateo de la partición. """ # Si se solicita, mostrar ayuda. if len(sys.argv) > 1 and sys.argv[1] == "help": ogHelp("ogFormatCache", "ogFormatCache") return # Error si no hay definida partición de caché. cachepart = ogFindCache() if cachepart is None: ogRaiseError(OG_ERR_PARTITION, MSG_NOCACHE) return disk = ogDiskToDev(cachepart) # Formatear sistema de ficheros. ogUnmountCache() options = "extent,large_file" if re.match("^5", platform.release()): options += ",uninit_bg,^metadata_csum,^64bit" try: subprocess.run(["mkfs.ext4", "-q", "-F", disk, "-L", "CACHE", "-O", options], check=True) except subprocess.CalledProcessError as e: raise RuntimeError(f"Error al formatear la partición de caché: {e}") # Crear estructura básica. mntdir = ogMountCache() os.makedirs(os.path.join(mntdir, OGIMG), exist_ok=True) # Incluir kernel e Initrd del ogLive updateBootCache() def ogGetCacheSize(): """ Obtiene el tamaño de la partición de caché. :return: Tamaño de la partición de caché en kilobytes. :raises RuntimeError: Si ocurre un error al obtener el tamaño de la partición de caché. """ # Si se solicita, mostrar ayuda. if len(sys.argv) > 1 and sys.argv[1] == "help": ogHelp("ogGetCacheSize", "help", "ogGetCacheSize") # Error si no se encuentra partición de caché. cachepart = ogFindCache() if cachepart is None: raise RuntimeError(MSG_NOCACHE) # Devuelve tamaño de la partición de caché. return ogGetPartitionSize(cachepart) def ogGetCacheSpace(): """ Obtiene el espacio libre en la partición de caché en kilobytes. :return: Espacio libre en kilobytes. :raises RuntimeError: Si ocurre un error al obtener el espacio libre. """ # Si se solicita, mostrar ayuda. if len(sys.argv) > 1 and sys.argv[1] == "help": ogHelp("ogGetCacheSpace", "help", "ogGetCacheSpace") # Error si no se encuentra partición de caché. cachepart = ogFindCache() if cachepart is None: raise RuntimeError(MSG_NOCACHE) # Obtener el tamaño del disco y el número de sectores. disk = ogDiskToDev(cachepart) try: result = subprocess.run(["sfdisk", "-g", disk], capture_output=True, text=True, check=True) output = result.stdout sectors_per_cylinder = int(output.splitlines()[1].split()[1]) total_sectors = int(output.splitlines()[1].split()[0]) * sectors_per_cylinder - 1 except subprocess.CalledProcessError as e: raise RuntimeError(f"Error al obtener el espacio libre: {e}") # Obtener el último sector de la partición 3. try: result = subprocess.run(["sfdisk", "-uS", "-l", disk], capture_output=True, text=True, check=True) output = result.stdout end_sector_part3 = int([line.split()[2] for line in output.splitlines() if cachepart + "3" in line][0]) except subprocess.CalledProcessError as e: raise RuntimeError(f"Error al obtener el espacio libre: {e}") # Calcular el espacio libre en kilobytes. if end_sector_part3 > total_sectors // 2: free_space_kb = (total_sectors - end_sector_part3) // 2 else: free_space_kb = total_sectors // 4 return free_space_kb #/** # ogMountCache #@brief Monta la partición Cache y exporta la variable $OGCAC #@param sin parametros #@return path_mountpoint - Punto de montaje del sistema de archivos de cache. #@warning Salidas de errores no determinada #*/ ## def ogMountCache(): c = CacheLib.ogFindCache().split() m = FileSystemLib.ogMountFs (c[0], c[1]) if not m: SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_PARTITION, ogGlobals.lang.MSG_NOCACHE) #return return m def ogUnmountCache(): """ Desmonta la partición de caché. :raises RuntimeError: Si ocurre un error durante el desmontaje de la partición de caché. """ # Si se solicita, mostrar ayuda. if len(sys.argv) > 1 and sys.argv[1] == "help": ogHelp("ogUnmountCache", "help", "ogUnmountCache") # Obtener la partición de caché. cachepart = ogFindCache() if cachepart is None: raise RuntimeError("No se encontró la partición de caché.") # Desmontar la partición de caché. try: subprocess.run(["umount", cachepart], check=True) except subprocess.CalledProcessError as e: raise RuntimeError(f"Error al desmontar la partición de caché: {e}") # Eliminar el enlace simbólico de /mnt/ParticiónCache. os.remove(f"/mnt/{cachepart}")