import os import re import subprocess import shutil import sys import platform import ogGlobals import SystemLib import DiskLib import FileSystemLib import CacheLib #/** # ogCreateCache [int_ndisk] int_partsize #@brief Define la caché local, por defecto en partición 4 del disco 1. #@param int_ndisk numero de disco donde crear la cache, si no se indica es el 1 por defecto #@param int_npart número de partición (opcional, 4 por defecto) #@param int_partsize tamaño de la partición (en KB) #@return (nada, por determinar) #@exception OG_ERR_FORMAT formato incorrecto. #@note Requisitos: sfdisk, parted, awk, sed #@warning El tamaño de caché debe estar entre 50 MB y la mitad del disco. #@warning La caché no puede solaparse con las particiones de datos. #*/ ## def ogCreateCache(ndisk=1, npart=4, partsize=None): """ Define la caché local, por defecto en partición 4 del disco 1. :param ndisk: número de disco donde crear la caché, por defecto es 1. :param npart: número de partición (opcional, 4 por defecto). :param partsize: tamaño de la partición en KB. :raises ValueError: Si el formato de los parámetros es incorrecto. :raises RuntimeError: Si ocurre un error durante la creación de la caché. """ if partsize is None: raise ValueError("El tamaño de la partición debe especificarse.") # Verifica si las herramientas necesarias están instaladas required_tools = ["sfdisk", "parted", "awk", "sed"] for tool in required_tools: if not shutil.which(tool): raise RuntimeError(f"La herramienta {tool} no está instalada.") # Preparar los comandos para crear la caché disk = f"/dev/sd{chr(96 + ndisk)}" size_in_sectors = partsize * 2 # Asumiendo 512B por sector try: # Lógica simplificada para crear la caché en la partición subprocess.run(["parted", disk, "mkpart", "primary", str(npart), str(partsize)], check=True) except subprocess.CalledProcessError as e: raise RuntimeError(f"Error al crear la caché: {e}") #/** # ogDeleteCache #@brief Elimina la partición de caché local. #@return (nada, por determinar) #@exception OG_ERR_FORMAT formato incorrecto. #@note Requisitos: fdisk, sgdisk, partprobe #@version 0.91 - Definición de caché local. #@author Ramon Gomez, ETSII Universidad de Sevilla #@date 2010/03/11 #@version 1.0.4 - Soporte para discos GPT. #@author Universidad de Huelva #@date 2012/03/13 #@version 1.0.6b - llamada correcta a ogUpdatePartitionTable #@author Antonio Doblas Universidad de Málaga #@date 2016/11/16 #@version 1.1.0 - Sustituir "sfdisk" por "fdisk" para discos MSDOS. #@author Ramon Gomez, ETSII Universidad de Sevilla #@date 2016/05/25 #*/ ## def ogDeleteCache(): """ Borra la partición utilizada para caché. :raises RuntimeError: Si ocurre un error durante la eliminación de la partición de caché. """ cachepart = ogFindCache() if cachepart is None: raise RuntimeError("No se encontró la partición de caché.") disk = f"/dev/sd{chr(96 + int(cachepart))}" try: subprocess.run(["parted", disk, "rm", cachepart], check=True) except subprocess.CalledProcessError as e: raise RuntimeError(f"Error al borrar la partición de caché: {e}") #/** # ogFindCache #@brief Detecta la partición caché local. #@param No requiere parametros #@return int_ndisk int_npart - devuelve el par nº de disco-nº de partición . #@warning Si no hay cache no devuelve nada #*/ ## def ogFindCache(): # Obtener el dispositivo del sistema de archivos etiquetado como "CACHE". PART = subprocess.run (['blkid', '-L', 'CACHE'], capture_output=True, text=True).stdout.strip() # En discos nvme con particiones GPT la partición se detecta usando el tag PARTLABEL if not PART: out = subprocess.run (['blkid', '-t', 'PARTLABEL=CACHE'], capture_output=True, text=True).stdout.strip() PART = out.split (':')[0] # Si no se detecta, obtener particiones marcadas de tipo caché en discos MSDOS. if not PART: out = subprocess.run (['sfdisk', '-l'], capture_output=True, text=True).stdout.splitlines() for l in out: elems = l.split (maxsplit=5) if 6 > len (elems): continue if 'ca' in elems[5] or 'a7' in elems[5]: PART=elems[0] break # Por último revisar todos los discos GPT y obtener las particiones etiquetadas como caché. if not PART: PART = '' for d in DiskLib.ogDiskToDev(): out = subprocess.run (['sgdisk', '-p', d], capture_output=True, text=True).stdout.splitlines() for l in out: elems = l.split (maxsplit=6) if 7 > len (elems): continue if 'CACHE' in elems[6]: p = 'p' if 'nvme' in d else '' PART += f' {d}{p}{elems[0]}' if not PART: return return DiskLib.ogDevToDisk (PART.split()[0]) # usar la 1ª partición encontrada. #/** # ogFormatCache #@brief Formatea el sistema de ficheros para la caché local. #@return (por determinar) #@warning Prueba con formato Reiser. #@attention #@note El sistema de archivos de la caché se queda montado. #*/ ## def ogFormatCache(): cachepart = ogFindCache() if cachepart is None: SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_PARTITION, ogGlobals.lang.MSG_NOCACHE) return cachepart = cachepart.split() disk = DiskLib.ogDiskToDev (cachepart[0], cachepart[1]) if not disk: return ogUnmountCache() options = "extent,large_file" if re.match("^5", platform.release()): options += ",uninit_bg,^metadata_csum,^64bit" try: subprocess.run(["mkfs.ext4", "-q", "-F", disk, "-L", "CACHE", "-O", options], check=True) except subprocess.CalledProcessError as e: SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_PARTITION, "CACHE") return # Crear estructura básica. mntdir = ogMountCache() j = '/'.join ([mntdir, ogGlobals.OGIMG]) ## os.path.join doesn't work: "If a segment […] is an absolute path, all previous segments are ignored" print (f'cucu mntdir ({mntdir}) OGIMG ({ogGlobals.OGIMG}) j ({j})') os.makedirs (j, exist_ok=True) # Incluir kernel e Initrd del ogLive ## como lo llamo sin especificar el path entero? #subprocess.run (['scripts/updateBootCache.py']) ## TODO #/** # ogGetCacheSize #@brief Devuelve el tamaño definido para la partición de caché. #@return int_partsize tamaño de la partición (en KB) #@exception OG_ERR_PARTITION No existe partición de caché. #*/ ## def ogGetCacheSize(): cachepart = ogFindCache() if cachepart is None: SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_PARTITION, ogGlobals.lang.MSG_NOCACHE) return disk, par = cachepart.split() return DiskLib.ogGetPartitionSize (disk, par) #/** # ogGetCacheSpace #@brief Devuelve el espacio de disco disponible para la partición de caché. #@return int_size tamaño disponible (en KB) #@note El espacio disponible es el que hay entre el límite superior de la partición 3 del disco 1 y el final de dicho disco, y no puede ser superior a la mitad de dicho disco. #*/ ## def ogGetCacheSpace(): cachepart = ogFindCache() if not cachepart: SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_PARTITION, ogGlobals.lang.MSG_NOCACHE) return None cachepart = cachepart.split() disk = DiskLib.ogDiskToDev (cachepart[0]) if not disk: return None sectors = 0 disk_bn = os.path.basename (disk) with open ('/proc/partitions', 'r') as fd: proc_parts = fd.read() for l in proc_parts.splitlines(): items = l.split() if len(items) < 4: continue if items[3] == disk_bn: sectors = 2 * int (items[2]) if not sectors: return None sfdisk_out = subprocess.run (['sfdisk', '-g', disk], capture_output=True, text=True).stdout cyls = int (sfdisk_out.split()[1]) sectors = sectors/cyls * cyls - 1 ## the original code has a hard dependency on the existence of a third partition ## if the disk has sda1, sda2 and sda4, the code fails. ## this is an improved version endpart3 = 0 for trypart in [3, 2, 1]: sfdisk_out = subprocess.run (['sfdisk', '-uS', '-l', disk], capture_output=True, text=True).stdout for l in sfdisk_out.splitlines(): items = l.split() if len(items) < 6: continue if f'{disk}{trypart}' == items[0]: endpart3 = int (items[2]) break if endpart3: break if not endpart3: return None # Mostrar espacio libre en KB (1 KB = 2 sectores) if endpart3 > sectors // 2: return (sectors - endpart3) // 2 else: return sectors // 4 #/** # ogMountCache #@brief Monta la partición Cache y exporta la variable $OGCAC #@param sin parametros #@return path_mountpoint - Punto de montaje del sistema de archivos de cache. #@warning Salidas de errores no determinada #*/ ## def ogMountCache(): c = ogFindCache() if not c: SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_PARTITION, ogGlobals.lang.MSG_NOCACHE) return None c = c.split() m = FileSystemLib.ogMountFs (c[0], c[1]) if not m: SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_PARTITION, ogGlobals.lang.MSG_NOCACHE) #return return m #/** # ogUnmountCache #@brief Desmonta la particion Cache y elimina la variable $OGCAC #@param sin parametros #@return nada #@warning Salidas de errores no determinada #*/ ## def ogUnmountCache(): cachepart = ogFindCache().split() if cachepart is None: SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_PARTITION, ogGlobals.lang.MSG_NOCACHE) return if not FileSystemLib.ogIsMounted (cachepart[0], cachepart[1]): return True FileSystemLib.ogUnmountFs (cachepart[0], cachepart[1]) # Eliminar el enlace simbólico de /mnt/ParticiónCache. dev = DiskLib.ogDiskToDev (cachepart[0], cachepart[1]) dev = dev.replace ('dev', 'mnt') os.remove (dev)