#/** #@file Disk.lib #@brief Librería o clase Disk #@class Disk #@brief Funciones para gestión de discos y particiones. #@version 1.1.1 #@warning License: GNU GPLv3+ #*/ import filecmp import subprocess import shutil import os import stat from pathlib import Path import ogGlobals import SystemLib import CacheLib import FileSystemLib # Función ficticia para lanzar parted con timeout, evitando cuelgues del programa. def parted(*args): parted_path = shutil.which("parted") if parted_path: try: result = subprocess.run( [parted_path] + list(args), timeout=3, capture_output=True, text=True ) return result.stdout except subprocess.TimeoutExpired: return "Error: Command 'parted' timed out" else: return "Error: 'parted' command not found" #/** # ogCreatePartitions int_ndisk str_parttype:int_partsize ... #@brief Define el conjunto de particiones de un disco. #@param int_ndisk nº de orden del disco #@param str_parttype mnemónico del tipo de partición #@param int_partsize tamaño de la partición (en KB) #@return (nada, por determinar) #@exception OG_ERR_FORMAT formato incorrecto. #@exception OG_ERR_NOTFOUND disco o partición no detectado (no es un dispositivo). #@exception OG_ERR_PARTITION error en partición o en tabla de particiones. #@attention El nº de partición se indica por el orden de los párametros \c parttype:partsize #@attention Pueden definirse particiones vacías de tipo \c EMPTY #@attention No puede definirse partición de cache y no se modifica si existe. #@note Requisitos: sfdisk, parted, partprobe, awk #@todo Definir atributos (arranque, oculta) y tamaños en MB, GB, etc. #*/ ## def ogCreatePartitions(*args): # Variables locales ND = DISK = PTTYPE = PART = SECTORS = START = SIZE = TYPE = CACHEPART = None IODISCO = IOSIZE = CACHESIZE = EXTSTART = EXTSIZE = NVME_PREFIX = tmpsfdisk = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogCreatePartitions', 'ogCreatePartitions int_ndisk str_parttype:int_partsize ...', 'ogCreatePartitions 1 NTFS:10000000 EXT3:5000000 LINUX-SWAP:1000000') return # Error si no se reciben al menos 2 parámetros. if len(args) < 2: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Nº total de sectores, para evitar desbordamiento (evitar redondeo). ND = args[0] DISK = ogDiskToDev(ND) if DISK is None: return PTTYPE = ogGetPartitionTableType(ND) or "MSDOS" # Por defecto para discos vacíos. if PTTYPE == "GPT": ogCreateGptPartitions(*args) return elif PTTYPE != "MSDOS": SystemLib.ogRaiseError(OG_ERR_PARTITION, PTTYPE) return SECTORS = ogGetLastSector(ND) # Se recalcula el nº de sectores del disco 1, si existe partición de caché. CACHEPART = CacheLib.ogFindCache() if CACHEPART and ND == CACHEPART.split()[0]: CACHESIZE = int(CacheLib.ogGetCacheSize()) * 2 # Sector de inicio (la partición 1 empieza en el sector 63). IODISCO = ogDiskToDev(ND) IOSIZE = subprocess.getoutput(f"fdisk -l {IODISCO} | awk '/I\\/O/ {{print $4}}'") if IOSIZE == "4096": START = 4096 SECTORS -= 8192 if CACHESIZE: SECTORS = SECTORS - CACHESIZE + 2048 - (SECTORS - CACHESIZE) % 2048 - 1 else: START = 63 if CACHESIZE: SECTORS -= CACHESIZE PART = 1 # Fichero temporal de entrada para "sfdisk" tmpsfdisk = f"/tmp/sfdisk{os.getpid()}" try: with open(tmpsfdisk, 'w') as f: f.write("unit: sectors\n\n") NVME_PREFIX = "p" if "nvme" in DISK else "" # Generar fichero de entrada para "sfdisk" con las particiones. args = args[1:] # Shift while args: # Conservar los datos de la partición de caché. if f"{ND} {PART}" == CACHEPART and CACHESIZE: with open(tmpsfdisk, 'a') as f: f.write(f"{DISK}{NVME_PREFIX}{PART} : start={SECTORS + 1}, size={CACHESIZE}, Id=ca\n") PART += 1 # Leer formato de cada parámetro - Tipo:Tamaño TYPE = args[0].split(":")[0] SIZE = int(args[0].split(":")[1]) # Obtener identificador de tipo de partición válido. ID = ogTypeToId(TYPE, "MSDOS") with open(tmpsfdisk, 'a') as f: f.write(f"{DISK}{NVME_PREFIX}{PART} : start={START}, size={SIZE}, Id={ID}\n") START += SIZE SECTORS -= SIZE PART += 1 args = args[1:] # Ejecutar "sfdisk" con el fichero temporal. subprocess.run(["sfdisk", DISK], input=open(tmpsfdisk, 'r').read(), text=True) subprocess.run(["partprobe", DISK]) finally: # Eliminar fichero temporal. if os.path.exists(tmpsfdisk): os.remove(tmpsfdisk) if CACHESIZE: CacheLib.ogMountCache() return 0 #/** # ogCreateGptPartitions int_ndisk str_parttype:int_partsize ... #@brief Define el conjunto de particiones de un disco GPT #@param int_ndisk nº de orden del disco #@param str_parttype mnemónico del tipo de partición #@param int_partsize tamaño de la partición (en KB) #@return (nada, por determinar) #@exception OG_ERR_FORMAT formato incorrecto. #@exception OG_ERR_NOTFOUND disco o partición no detectado (no es un dispositivo). #@exception OG_ERR_PARTITION error en partición o en tabla de particiones. #@attention El nº de partición se indica por el orden de los párametros \c parttype:partsize #@attention Pueden definirse particiones vacías de tipo \c EMPTY #@attention No puede definirse partición de caché y no se modifica si existe. #@note Requisitos: sfdisk, parted, partprobe, awk #@todo Definir atributos (arranque, oculta) y tamaños en MB, GB, etc. #*/ ## def ogCreateGptPartitions(*args): # Variables locales ND = DISK = PART = SECTORS = ALIGN = START = SIZE = TYPE = CACHEPART = CACHESIZE = DELOPTIONS = OPTIONS = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogCreateGptPartitions', 'ogCreateGptPartitions int_ndisk str_parttype:int_partsize ...', 'ogCreateGptPartitions 1 NTFS:10000000 EXT3:5000000 LINUX-SWAP:1000000') return # Error si no se reciben menos de 2 parámetros. if len(args) < 2: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Nº total de sectores, para evitar desbordamiento (evitar redondeo). ND = args[0] DISK = ogDiskToDev(ND) if DISK is None: return # Se calcula el ultimo sector del disco (total de sectores usables) SECTORS = ogGetLastSector(ND) # Se recalcula el nº de sectores del disco si existe partición de caché. CACHEPART = CacheLib.ogFindCache() if ND == CACHEPART.split()[0]: CACHESIZE = int(ogGetCacheSize()) * 2 if CACHESIZE: SECTORS -= CACHESIZE # Si el disco es GPT empieza en el sector 2048 por defecto, pero podria cambiarse ALIGN = subprocess.getoutput(f"sgdisk -D {DISK} 2>/dev/null") START = ALIGN PART = 1 # Leer parámetros con definición de particionado. args = args[1:] # Shift while args: # Si PART es la cache, nos la saltamos y seguimos con el siguiente numero para conservar los datos de la partición de caché. if f"{ND} {PART}" == CACHEPART and CACHESIZE: PART += 1 # Leer formato de cada parámetro - Tipo:Tamaño TYPE = args[0].split(":")[0] SIZE = int(args[0].split(":")[1]) # Error si la partición es extendida (no válida en discos GPT). if TYPE == "EXTENDED": SystemLib.ogRaiseError(OG_ERR_PARTITION, "EXTENDED") return # Comprobar si existe la particion actual, capturamos su tamaño para ver si cambio o no PARTSIZE = ogGetPartitionSize(ND, PART) # En sgdisk no se pueden redimensionar las particiones, es necesario borrarlas y volver a crealas if PARTSIZE: DELOPTIONS += f" -d{PART}" # Creamos la particion # Obtener identificador de tipo de partición válido. ID = ogTypeToId(TYPE, "GPT") if TYPE != "CACHE" and ID: OPTIONS += f" -n{PART}:{START}:+{SIZE} -t{PART}:{ID} " START += SIZE # Error si se supera el nº total de sectores. if START > SECTORS: SystemLib.ogRaiseError(OG_ERR_FORMAT, f"{START//2} > {SECTORS//2}") return PART += 1 args = args[1:] # Desmontar los sistemas de archivos del disco antes de realizar las operaciones. ogUnmountAll(ND) if CACHESIZE: CacheLib.ogUnmountCache() # Si la tabla de particiones no es valida, volver a generarla. ogCreatePartitionTable(ND) # Definir particiones y notificar al kernel. # Borramos primero las particiones y luego creamos las nuevas subprocess.run(["sgdisk"] + DELOPTIONS.split() + OPTIONS.split() + [DISK], stderr=subprocess.DEVNULL) subprocess.run(["partprobe", DISK], stderr=subprocess.DEVNULL) if CACHESIZE: CacheLib.ogMountCache() return 0 #/** # ogCreatePartitionTable int_ndisk [str_tabletype] #@brief Genera una tabla de particiones en caso de que no sea valida, si es valida no hace nada. #@param int_ndisk nº de orden del disco #@param str_tabletype tipo de tabla de particiones (opcional) #@return (por determinar) #@exception OG_ERR_FORMAT Formato incorrecto. #@exception OG_ERR_NOTFOUND Disco o particion no corresponden con un dispositivo. #@note tabletype: { MSDOS, GPT }, MSDOS por defecto #@note Requisitos: fdisk, gdisk, parted #*/ ## def ogCreatePartitionTable(*args): # Variables locales DISK = PTTYPE = CREATE = CREATEPTT = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogCreatePartitionTable', 'ogCreatePartitionTable int_ndisk [str_partype]', 'ogCreatePartitionTable 1 GPT', 'ogCreatePartitionTable 1') return # Error si no se reciben 1 o 2 parámetros. if len(args) == 1: CREATEPTT = "" elif len(args) == 2: CREATEPTT = args[1] else: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Capturamos el tipo de tabla de particiones actual DISK = ogDiskToDev(args[0]) if DISK is None: return PTTYPE = ogGetPartitionTableType(args[0]) or "MSDOS" # Por defecto para discos vacíos. CREATEPTT = CREATEPTT or PTTYPE # Si la tabla actual y la que se indica son iguales, se comprueba si hay que regenerarla. if CREATEPTT == PTTYPE: if PTTYPE == "GPT": try: result = subprocess.run( ["sgdisk", "-p", DISK], stderr=subprocess.PIPE, stdout=subprocess.DEVNULL ) if result.returncode != 0: CREATE = "GPT" except subprocess.CalledProcessError: CREATE = "GPT" elif PTTYPE == "MSDOS": try: result = subprocess.run( ["parted", "-s", DISK, "print"], stderr=subprocess.PIPE, stdout=subprocess.DEVNULL ) if result.returncode != 0: CREATE = "MSDOS" except subprocess.CalledProcessError: CREATE = "MSDOS" else: CREATE = CREATEPTT # Dependiendo del valor de CREATE, creamos la tabla de particiones en cada caso. if CREATE == "GPT": if PTTYPE == "MSDOS": subprocess.run(["sgdisk", "-go", DISK]) else: subprocess.run(["gdisk", DISK], input="2\nw\nY\n", text=True) subprocess.run(["partprobe", DISK], stderr=subprocess.DEVNULL) elif CREATE == "MSDOS": if PTTYPE == "GPT": subprocess.run(["sgdisk", "-Z", DISK]) subprocess.run(["fdisk", DISK], input="o\nn\np\n\n\n\nd\n\nw", text=True) subprocess.run(["partprobe", DISK], stderr=subprocess.DEVNULL) return #/** # ogDeletePartitionTable ndisk #@brief Borra la tabla de particiones del disco. #@param int_ndisk nº de orden del disco #@return la informacion propia del fdisk #*/ ## def ogDeletePartitionTable(*args): # Variables locales DISK = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogDeletePartitionTable', 'ogDeletePartitionTable int_ndisk', 'ogDeletePartitionTable 1') return # Error si no se reciben 1 parámetros. if len(args) != 1: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Obteniendo Identificador linux del disco. DISK = ogDiskToDev(args[0]) if DISK is None: return # Crear una tabla de particiones vacía. PTTYPE = ogGetPartitionTableType(args[0]) if PTTYPE == "GPT": subprocess.run(["sgdisk", "-o", DISK]) elif PTTYPE == "MSDOS": subprocess.run(["fdisk", DISK], input="o\nw", text=True) return #/** # ogDevToDisk path_device | LABEL="str_label" | UUID="str_uuid" #@brief Devuelve el nº de orden de dicso (y partición) correspondiente al nombre de fichero de dispositivo o a la etiqueta o UUID del sistema de archivos asociado. #@param path_device Camino del fichero de dispositivo. #@param str_label etiqueta de sistema de archivos. #@param str_uuid UUID de sistema de archivos. #@return int_ndisk (para dispositivo de disco) #@return int_ndisk int_npartition (para dispositivo de partición). #@exception OG_ERR_FORMAT Formato incorrecto. #@exception OG_ERR_NOTFOUND Dispositivo no detectado. #@note Solo se acepta en cada llamada 1 de los 3 tipos de parámetros. #*/ ## def ogDevToDisk(arg_dev): CACHEFILE = "/var/cache/disks.cfg" if '=' in arg_dev: # arg_dev is "FOO=bar" cmd = None if arg_dev.startswith("LABEL="): cmd = ['blkid', '-L', arg_dev[6:]] elif arg_dev.startswith("PARTLABEL="): cmd = ['realpath', '/dev/disk/by-partlabel/'+arg_dev[11:]] elif arg_dev.startswith("PARTUUID="): cmd = ['realpath', '/dev/disk/by-partuuid/'+arg_dev[10:]] elif arg_dev.startswith("UUID="): cmd = ['blkid', '-U', arg_dev[5:]] if not cmd: SystemLib.ogRaiseError([], ogGlobals.OG_ERR_FORMAT, arg_dev) return DEV = subprocess.run (cmd, capture_output=True, text=True).stdout.strip() else: # arg_dev is "/dev/something" DEV = arg_dev if not os.path.exists(DEV): SystemLib.ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, arg_dev) return # Error si no es fichero de bloques o directorio (para LVM). m = os.stat (DEV, follow_symlinks=True).st_mode if not stat.S_ISBLK (m) and not stat.S_ISDIR (m): SystemLib.ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, arg_dev) return # Buscar en fichero de caché de discos. PART = None if os.path.exists(CACHEFILE): with open(CACHEFILE, 'r') as f: for line in f: parts = line.strip().split(':') if len(parts) == 2 and parts[1] == DEV: PART = parts[0] break if PART: return PART # Si no se encuentra, procesa todos los discos para devolver su nº de orden y de partición. disks = ogDiskToDev() n = 1 for d in disks: NVME_PREFIX = "p" if "nvme" in d else "" if DEV.startswith(d): return f"{n} {DEV[len(d) + len(NVME_PREFIX):]}" n += 1 SystemLib.ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, arg_dev) return def _getAllDisks(): ret = [] all_disks = subprocess.run("lsblk -n -e 1,2 -x MAJ:MIN 2>/dev/null || lsblk -n -e 1,2", shell=True, capture_output=True, text=True).stdout.splitlines() for line in all_disks: parts = line.split() if parts[5] == "disk": parts[0].replace ('!', '/') ret.append(f"/dev/{parts[0]}") return ret def _getAllVolGroups(): vgs = subprocess.run(['vgs', '-a', '--noheadings'], capture_output=True, text=True).stdout.splitlines() ret = [f"/dev/{line.split()[0]}" for line in vgs] return ret def _getMPath(): ret = alldisks2remove = [] try: mpath = subprocess.run(['multipath', '-l', '-v', '1'], capture_output=True, text=True).stdout.splitlines() ret = [f"/dev/mapper/{line.split()[0]}" for line in mpath] for line in subprocess.getoutput("multipath -ll").splitlines(): if line.split()[5] == "ready": alldisks2remove.append (f"/dev/{line.split()[2]}") except FileNotFoundError: pass except subprocess.CalledProcessError: pass return ret, alldisks2remove def _getAllZFSVols(): zfsvols = subprocess.run(['blkid'], capture_output=True, text=True).stdout.splitlines() return [line.split(":")[0] for line in zfsvols if "zfs" in line] #/** # ogDiskToDev [int_ndisk [int_npartition]] #@brief Devuelve la equivalencia entre el nº de orden del dispositivo (dicso o partición) y el nombre de fichero de dispositivo correspondiente. #@param int_ndisk nº de orden del disco #@param int_npartition nº de orden de la partición #@return Para 0 parametros: Devuelve los nombres de ficheros de los dispositivos sata/ata/usb linux encontrados. #@return Para 1 parametros: Devuelve la ruta del disco duro indicado. #@return Para 2 parametros: Devuelve la ruta de la particion indicada. #@exception OG_ERR_FORMAT Formato incorrecto. #@exception OG_ERR_NOTFOUND Dispositivo no detectado. #*/ ## def ogDiskToDev (arg_disk=None, arg_part=None): CACHEFILE = "/var/cache/disks.cfg" try: if arg_part != None: arg_part = int (arg_part) except: SystemLib.ogRaiseError([], ogGlobals.OG_ERR_FORMAT, f"{arg_disk} {arg_part}") return try: if arg_disk != None: arg_disk = int (arg_disk) except: SystemLib.ogRaiseError([], ogGlobals.OG_ERR_FORMAT, arg_disk) return # Borrar fichero de caché de configuración si hay cambios en las particiones. proc_partitions = Path ('/proc/partitions').read_text() tmp_partitions = Path ('/tmp/.partitions').read_text() if os.path.exists ('/tmp/.partitions') else '' if proc_partitions != tmp_partitions: # Guardar copia de las particiones definidas para comprobar cambios. shutil.copy2('/proc/partitions', '/tmp/.partitions') try: os.remove(CACHEFILE) except FileNotFoundError: pass # Si existe una correspondencia con disco/dispositivo en el caché; mostrarlo y salir. if arg_disk and os.path.exists (CACHEFILE): with open(CACHEFILE, 'r') as f: args_joined = ' '.join (map (str, filter (None, [arg_disk,arg_part]))) for line in f: parts = line.strip().split(':') if len(parts) == 2 and parts[0] == args_joined: return parts[1] # Continuar para detectar nuevos dispositivos. ALLDISKS = _getAllDisks() VOLGROUPS = _getAllVolGroups() ALLDISKS += VOLGROUPS MPATH, ALLDISKS_to_remove =_getMPath() for d in ALLDISKS_to_remove: if d in ALLDISKS: ALLDISKS.remove (d) ZFSVOLS = _getAllZFSVols() ALLDISKS += ZFSVOLS # No params: return all disks if arg_disk is None: return ALLDISKS # arg_disk is set: return it if arg_part is None: if arg_disk > len (ALLDISKS): SystemLib.ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, arg_disk) return disk = ALLDISKS[arg_disk-1] if not os.path.exists(disk): SystemLib.ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, arg_disk) return # Actualizar caché de configuración y mostrar dispositivo. with open(CACHEFILE, 'a') as f: f.write(f"{arg_disk}:{disk}\n") return disk # arg_disk and arg_part are set: there are several possibilities if arg_disk > len (ALLDISKS): SystemLib.ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, arg_disk) return disk = ALLDISKS[arg_disk-1] if not os.path.exists(disk): SystemLib.ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, arg_disk) return # Plain partition part = f"{disk}{arg_part}" if os.path.exists(part): # Actualizar caché de configuración y mostrar dispositivo. with open(CACHEFILE, 'a') as f: f.write(f"{arg_disk} {arg_part}:{part}\n") return part # RAID/Multipath (tener en cuenta enlace simbólico). part = f"{disk}p{arg_part}" if os.path.exists(part) and stat.S_ISBLK (os.stat (part, follow_symlinks=True).st_mode): # Actualizar caché de configuración y mostrar dispositivo. with open(CACHEFILE, 'a') as f: f.write(f"{arg_disk} {arg_part}:{part}\n") return part part = "" # Logical volume if disk in VOLGROUPS: lvscan = subprocess.run (['lvscan', '-a'], capture_output=True, text=True).stdout.splitlines() i = 0 for line in lvscan: parts = line.split("'") if parts[1].startswith(f"{disk}/") and i == arg_part: part = parts[1] break i += 1 # ZFS volume if disk in ZFSVOLS: subprocess.run(["zpool", "import", "-f", "-R", "/mnt", "-N", "-a"]) zpool = subprocess.run(['blkid', '-s', 'LABEL', '-o', 'value', disk], capture_output=True, text=True).stdout zfs_list = subprocess.run(['zfs', 'list', '-Hp', '-o', 'name,canmount,mountpoint', '-r', zpool], capture_output=True, text=True).stdout.splitlines() i = 0 for line in zfs_list: parts = line.split() if parts[1] == "on" and parts[2] != "none": if i == int(args[1]): part = parts[0] break i += 1 if not part: SystemLib.ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, f"{arg_disk} {arg_part}") return # Actualizar caché de configuración y mostrar dispositivo. with open(CACHEFILE, 'a') as f: f.write(f"{arg_disk} {arg_part}:{part}\n") return part #/** # ogGetDiskSize int_ndisk #@brief Muestra el tamaño en KB de un disco. #@param int_ndisk nº de orden del disco #@return int_size - Tamaño en KB del disco. #@exception OG_ERR_FORMAT formato incorrecto. #@exception OG_ERR_NOTFOUND disco o particion no detectado (no es un dispositivo). #@note Requisitos: sfdisk, awk #*/ ## def ogGetDiskSize(*args): # Variables locales DISK = SIZE = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogGetDiskSize', 'ogGetDiskSize int_ndisk', 'ogGetDiskSize 1 => 244198584') return # Error si no se recibe 1 parámetro. if len(args) != 1: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Obtener el tamaño del disco. DISK = ogDiskToDev(args[0]) if DISK is None: return SIZE = subprocess.getoutput(f"lsblk -n -b -o SIZE {DISK}") # Mostrar salida. if SIZE: print(SIZE) return #/** # ogGetDiskType path_device #@brief Muestra el tipo de disco (real, RAID, meta-disco, USB, etc.). #@param path_device Dispositivo #@exception OG_ERR_FORMAT formato incorrecto. #@exception OG_ERR_NOTFOUND disco no detectado o no es un dispositivo de bloques. #@note Requisitos: udevadm #*/ ## def ogGetDiskType(*args): # Variables locales DEV = MAJOR = TYPE = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogGetDiskType', 'ogGetDiskType path_device', 'ogGetDiskType /dev/sdb => USB') return # Error si no se recibe 1 parámetro. if len(args) != 1: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Obtener el driver del dispositivo de bloques. DEV = args[0].split("/dev/")[1] MAJOR = subprocess.getoutput(f"awk -v D='{DEV}' '{{if ($4==D) print $1;}}' /proc/partitions") TYPE = subprocess.getoutput(f"awk -v D={MAJOR} '/Block/ {{bl=1}} {{if ($1==D&&bl) print toupper($2)}}' /proc/devices") # Devolver mnemónico del driver de dispositivo. if TYPE == "SD": TYPE = "DISK" if subprocess.getoutput(f"udevadm info -q property {args[0]} 2>/dev/null | grep -q '^ID_BUS=usb'"): TYPE = "USB" elif TYPE == "BLKEXT": TYPE = "NVM" elif TYPE == "SR" or TYPE.startswith("IDE"): TYPE = "CDROM" # FIXME Comprobar discos IDE. elif TYPE == "MD" or TYPE.startswith("CCISS"): TYPE = "RAID" elif TYPE == "DEVICE-MAPPER": TYPE = "MAPPER" # FIXME Comprobar LVM y RAID. print(TYPE) return #/** # ogGetEsp #@brief Devuelve números de disco y partición para la partición EFI (ESP). #*/ ## def ogGetEsp(): devices = subprocess.run (['blkid', '-o', 'device'], capture_output=True, text=True).stdout.splitlines() devices.sort() for d in devices: # Previene error para /dev/loop0 PART = ogDevToDisk (d) if not PART: continue # En discos NVMe blkid devuelve una salida del tipo: # >/dev/loop0 # >/dev/nvme0n1 # >/dev/nvme0n1p1 # al analizar la particion nvme0n1, PART solo tiene un argumento y hace que ogGetPartitionId lance un error if len (PART) > 1: disk, par = PART.split() if ogGetPartitionId (disk, par) == ogTypeToId ('EFI', 'GPT'): return PART return None #/** # ogGetLastSector int_ndisk [int_npart] #@brief Devuelve el último sector usable del disco o de una partición. #@param int_ndisk nº de orden del disco #@param int_npart nº de orden de la partición (opcional) #@return Último sector usable. #@exception OG_ERR_FORMAT Formato incorrecto. #@exception OG_ERR_NOTFOUND Disco o partición no corresponde con un dispositivo. #@note Requisitos: sfdisk, sgdisk #*/ ## def ogGetLastSector(*args): # Variables locales DISK = None PART = None LASTSECTOR = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp("ogGetLastSector", "ogGetLastSector int_ndisk [int_npart]", "ogGetLastSector 1 => 488392064", "ogGetLastSector 1 1 => 102400062") return # Obtener último sector. if len(args) == 1: # Para un disco. DISK = ogDiskToDev(args[0]) if DISK is None: return LASTSECTOR = subprocess.getoutput(f"sgdisk -p {DISK} | awk '/last usable sector/ {{print($(NF))}}'") elif len(args) == 2: # Para una partición. DISK = ogDiskToDev(args[0]) if DISK is None: return PART = ogDiskToDev(args[0], args[1]) if PART is None: return LASTSECTOR = subprocess.getoutput(f"sgdisk -p {DISK} | awk -v P='{args[1]}' '{{if ($1==P) print $3}}'") else: # Error si se reciben más parámetros. SystemLib.ogRaiseError(OG_ERR_FORMAT) return print(LASTSECTOR) return #/** # ogGetPartitionActive int_ndisk #@brief Muestra que particion de un disco esta marcada como de activa. #@param int_ndisk nº de orden del disco #@exception OG_ERR_FORMAT Formato incorrecto. #@exception OG_ERR_NOTFOUND Disco o particion no corresponden con un dispositivo. #@note Requisitos: parted #@todo Queda definir formato para atributos (arranque, oculta, ...). #*/ ## def ogGetPartitionActive (disk): DISK = ogDiskToDev (disk) if DISK is None: return if 'LANG' in os.environ: lang = os.environ['LANG'] ret = None os.environ['LANG'] = 'C' lines = subprocess.run (['parted', '--script', '--machine', DISK, 'print'], capture_output=True, text=True).stdout.splitlines() for line in lines: parts = line.split (':') if len (parts) < 6: continue if 'boot' in parts[6]: ret = parts[0] break if lang is None: del os.environ['LANG'] else: os.environ['LAMG'] = lang return ret #/** # ogGetPartitionId int_ndisk int_npartition #@brief Devuelve el mnemónico con el tipo de partición. #@param int_ndisk nº de orden del disco #@param int_npartition nº de orden de la partición #@return Identificador de tipo de partición. #@exception OG_ERR_FORMAT Formato incorrecto. #@exception OG_ERR_NOTFOUND Disco o partición no corresponde con un dispositivo. #@note Requisitos: sfdisk #*/ ## def ogGetPartitionId (disk, par): DISK = ogDiskToDev (disk) if DISK is None: return pttype = ogGetPartitionTableType (disk) if 'GPT' == pttype: lines = subprocess.run (['sgdisk', '-p', DISK], capture_output=True, text=True).stdout.splitlines() start_index = next (i for i, line in enumerate(lines) if 'Number' in line) for l in lines[start_index:]: idx, start, end, sz, sz_units, code, *rest = l.split() if idx == par: fsid = code break if fsid == '8300' and f'{disk} {par}' == ogFindCache(): fsid = 'CA00' elif 'MSDOS' == pttype: fsid = subprocess.run (['sfdisk', '--part-type', DISK, par], capture_output=True, text=True).stdout.strip() elif 'LVM' == pttype: fsid = '10000' elif 'ZPOOL' == pttype: fsid = '10010' return fsid #/** # ogGetPartitionSize int_ndisk int_npartition #@brief Muestra el tamano en KB de una particion determinada. #@param int_ndisk nº de orden del disco #@param int_npartition nº de orden de la partición #@return int_partsize - Tamaño en KB de la partición. #@exception OG_ERR_FORMAT formato incorrecto. #@exception OG_ERR_NOTFOUND disco o particion no detectado (no es un dispositivo). #@note Requisitos: sfdisk, awk #*/ ## def ogGetPartitionSize (disk, par): PART = ogDiskToDev (disk, par) if PART is None: return sz = subprocess.run (['partx', '-gbo', 'SIZE', PART], capture_output=True, text=True).stdout.strip() if sz: return int (int (sz) / 1024) sz = subprocess.run (['lvs', '--noheadings', '-o', 'lv_size', '--units', 'k', PART], capture_output=True, text=True).stdout.strip if sz: return int (sz) return FileSystemLib.ogGetFsSize (disk, par) #/** # ogGetPartitionsNumber int_ndisk #@brief Detecta el numero de particiones del disco duro indicado. #@param int_ndisk nº de orden del disco #@return Devuelve el numero paritiones del disco duro indicado #@warning Salidas de errores no determinada #@attention Requisitos: parted #@note Notas sin especificar #*/ ## def ogGetPartitionsNumber(*args): # Variables locales DISK = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogGetPartitionsNumber', 'ogGetPartitionsNumber int_ndisk', 'ogGetPartitionsNumber 1 => 3') return # Error si no se recibe 1 parámetro. if len(args) != 1: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Contar el número de veces que aparece el disco en su lista de particiones. DISK = ogDiskToDev(args[0]) if DISK is None: return PTTYPE = ogGetPartitionTableType(args[0]) if PTTYPE in ["GPT", "MSDOS"]: output = subprocess.getoutput(f"partx -gso NR {DISK} 2>/dev/null | awk -v p=0 '{{p=$1}} END {{print p}}'") elif PTTYPE == "LVM": output = subprocess.getoutput(f"lvs --noheadings {DISK} 2>/dev/null | wc -l") elif PTTYPE == "ZPOOL": subprocess.run(["zpool", "list"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # Check if zpool command exists subprocess.run(["modprobe", "zfs"]) # Load zfs module if not already loaded subprocess.run(["zpool", "import", "-f", "-R", "/mnt", "-N", "-a"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # Import all zpools output = subprocess.getoutput(f"zfs list -Hp -o name,canmount,mountpoint -r $(blkid -s LABEL -o value {DISK}) | awk '$2==\"on\" && $3!=\"none\" {{c++}} END {{print c}}'") else: output = None print(output) return #/** # ogGetPartitionTableType int_ndisk #@brief Devuelve el tipo de tabla de particiones del disco (GPT o MSDOS) #@param int_ndisk nº de orden del disco #@return str_tabletype - Tipo de tabla de paritiones #@warning Salidas de errores no determinada #@note tabletype = { MSDOS, GPT } #@note Requisitos: blkid, parted, vgs #*/ ## def ogGetPartitionTableType (disk): DISK = ogDiskToDev (disk) if DISK is None: return m = os.stat (DISK, follow_symlinks=True).st_mode if stat.S_ISBLK (m): lines = subprocess.run (['parted', '--script', '--machine', DISK, 'print'], capture_output=True, text=True).stdout.splitlines() for l in lines: elems = l.split (':') if DISK == elems[0]: type = elems[5].upper() break # Comprobar si es volumen lógico. if os.path.isdir (DISK) and 0 == subprocess.run (['vgs', DISK]).returncode: type = 'LVM' # Comprobar si es pool de ZFS. if not type or 'UNKNOWN' == type: if 'zfs' in subprocess.run (['blkid', '-s', 'TYPE', DISK], capture_output=True, text=True).stdout: type = 'ZPOOL' return type #/** # ogGetPartitionType int_ndisk int_npartition #@brief Devuelve el mnemonico con el tipo de partición. #@param int_ndisk nº de orden del disco #@param int_npartition nº de orden de la partición #@return Mnemonico #@note Mnemonico: valor devuelto por ogIdToType. #@exception OG_ERR_FORMAT Formato incorrecto. #@exception OG_ERR_NOTFOUND Disco o particion no corresponden con un dispositivo. #*/ ## def ogGetPartitionType(*args): # Variables locales ID = None TYPE = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogGetPartitionType', 'ogGetPartitionType int_ndisk int_npartition', 'ogGetPartitionType 1 1 => NTFS') return # Error si no se reciben 2 parámetros. if len(args) != 2: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Detectar id. de tipo de partición y codificar al mnemónico. ID = ogGetPartitionId(args[0], args[1]) if ID is None: return TYPE = ogIdToType(ID) print(TYPE) return #/** # ogHidePartition int_ndisk int_npartition #@brief Oculta un apartición visible. #@param int_ndisk nº de orden del disco #@param int_npartition nº de orden de la partición #@return (nada) #@exception OG_ERR_FORMAT formato incorrecto. #@exception OG_ERR_NOTFOUND disco o particion no detectado (no es un dispositivo). #@exception OG_ERR_PARTITION tipo de partición no reconocido. #*/ ## def ogHidePartition(*args): # Variables locales PART = None TYPE = None NEWTYPE = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogHidePartition', 'ogHidePartition int_ndisk int_npartition', 'ogHidePartition 1 1') return # Error si no se reciben 2 parámetros. if len(args) != 2: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Obtener el dispositivo de la partición. PART = ogDiskToDev(args[0], args[1]) if PART is None: return # Obtener tipo de partición. TYPE = ogGetPartitionType(args[0], args[1]) if TYPE == "NTFS": NEWTYPE = "HNTFS" elif TYPE == "FAT32": NEWTYPE = "HFAT32" elif TYPE == "FAT16": NEWTYPE = "HFAT16" elif TYPE == "FAT12": NEWTYPE = "HFAT12" elif TYPE == "WINDOWS": NEWTYPE = "WIN-RESERV" else: SystemLib.ogRaiseError(OG_ERR_PARTITION, TYPE) return # Cambiar tipo de partición. ogSetPartitionType(args[0], args[1], NEWTYPE) return #/** # ogIdToType int_idpart #@brief Devuelve el identificador correspondiente a un tipo de partición. #@param int_idpart identificador de tipo de partición. #@return str_parttype mnemónico de tipo de partición. #@exception OG_ERR_FORMAT Formato incorrecto. #*/ ## def ogIdToType(ID): # Obtener valor hexadecimal de 4 caracteres rellenado con 0 por delante. ID = ID.zfill(4) TYPE = None # Asignar el tipo de partición según el ID. if ID == "0000": TYPE = "EMPTY" elif ID == "0001": TYPE = "FAT12" elif ID in ["0005", "000f"]: TYPE = "EXTENDED" elif ID in ["0006", "000e"]: TYPE = "FAT16" elif ID == "0007": TYPE = "NTFS" elif ID in ["000b", "000c"]: TYPE = "FAT32" elif ID == "0011": TYPE = "HFAT12" elif ID == "0012": TYPE = "COMPAQDIAG" elif ID in ["0016", "001e"]: TYPE = "HFAT16" elif ID == "0017": TYPE = "HNTFS" elif ID in ["001b", "001c"]: TYPE = "HFAT32" elif ID == "0042": TYPE = "WIN-DYNAMIC" elif ID in ["0082", "8200"]: TYPE = "LINUX-SWAP" elif ID in ["0083", "8300"]: TYPE = "LINUX" elif ID in ["008e", "8E00"]: TYPE = "LINUX-LVM" elif ID in ["00a5", "a503"]: TYPE = "FREEBSD" elif ID == "00a6": TYPE = "OPENBSD" elif ID == "00a7": TYPE = "CACHE" # (compatibilidad con Brutalix) elif ID in ["00af", "af00"]: TYPE = "HFS" elif ID in ["00be", "be00"]: TYPE = "SOLARIS-BOOT" elif ID in ["00bf", "bf00145"]: TYPE = "SOLARIS" elif ID in ["00ca", "ca00"]: TYPE = "CACHE" elif ID == "00da": TYPE = "DATA" elif ID == "00ee": TYPE = "GPT" elif ID in ["00ef", "ef00"]: TYPE = "EFI" elif ID == "00fb": TYPE = "VMFS" elif ID in ["00fd", "fd00"]: TYPE = "LINUX-RAID" elif ID == "0700": TYPE = "WINDOWS" elif ID == "0c01": TYPE = "WIN-RESERV" elif ID == "7f00": TYPE = "CHROMEOS-KRN" elif ID == "7f01": TYPE = "CHROMEOS" elif ID == "7f02": TYPE = "CHROMEOS-RESERV" elif ID == "8301": TYPE = "LINUX-RESERV" elif ID == "a500": TYPE = "FREEBSD-DISK" elif ID == "a501": TYPE = "FREEBSD-BOOT" elif ID == "a502": TYPE = "FREEBSD-SWAP" elif ID == "ab00": TYPE = "HFS-BOOT" elif ID == "af01": TYPE = "HFS-RAID" elif ID == "bf02": TYPE = "SOLARIS-SWAP" elif ID == "bf03": TYPE = "SOLARIS-DISK" elif ID == "ef01": TYPE = "MBR" elif ID == "ef02": TYPE = "BIOS-BOOT" elif ID == "10000": TYPE = "LVM-LV" elif ID == "10010": TYPE = "ZFS-VOL" else: TYPE = "UNKNOWN" return TYPE # ogIsDiskLocked int_ndisk #@brief Comprueba si un disco está bloqueado por una operación de uso exclusivo. #@param int_ndisk nº de orden del disco #@return Código de salida: 0 - bloqueado, 1 - sin bloquear o error. #@note Los ficheros de bloqueo se localizan en \c /var/lock/dev, siendo \c dev el dispositivo de la partición o de su disco, sustituyendo el carácter "/" por "-". #*/ ## def ogIsDiskLocked(*args): # Variables locales DISK = None LOCKFILE = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogIsDiskLocked', 'ogIsDiskLocked int_ndisk', 'if ogIsDiskLocked(1): ...') return # Falso, en caso de error. if len(args) != 1: return False DISK = ogDiskToDev(args[0], 2) if DISK is None: return False # Comprobar existencia de fichero de bloqueo para el disco. LOCKFILE = f"/var/lock/lock{DISK.replace('/', '-')}" return os.path.isfile(LOCKFILE) #/** # ogListPartitions int_ndisk #@brief Lista las particiones definidas en un disco. #@param int_ndisk nº de orden del disco #@return str_parttype:int_partsize ... #@exception OG_ERR_FORMAT formato incorrecto. #@exception OG_ERR_NOTFOUND disco o particion no detectado (no es un dispositivo). #@note Requisitos: \c parted \c awk #@attention El nº de partición se indica por el orden de los párametros \c parttype:partsize #@attention Las tuplas de valores están separadas por espacios. #*/ ## def ogListPartitions(*args): # Variables locales DISK = None PART = None NPARTS = None TYPE = None SIZE = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogListPartitions', 'ogListPartitions int_ndisk', 'ogListPartitions 1 => NTFS:10000000 EXT3:5000000 LINUX-SWAP:1000000') return # Error si no se recibe 1 parámetro. if len(args) != 1: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Procesar la salida de parted. DISK = ogDiskToDev(args[0]) if DISK is None: return NPARTS = ogGetPartitionsNumber(args[0]) for PART in range(1, NPARTS + 1): TYPE = ogGetPartitionType(args[0], PART) or "EMPTY" SIZE = ogGetPartitionSize(args[0], PART) or 0 print(f"{TYPE}:{SIZE} ", end="") print() return #/** # ogListPrimaryPartitions int_ndisk #@brief Metafunción que lista las particiones primarias no vacías de un disco. #@param int_ndisk nº de orden del disco #@see ogListPartitions #*/ ## def ogListPrimaryPartitions(*args): # Variables locales PTTYPE = None PARTS = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogListPrimaryPartitions', 'ogListPrimaryPartitions int_ndisk', 'ogListPrimaryPartitions 1 => NTFS:10000000 EXT3:5000000 EXTENDED:1000000') return PTTYPE = ogGetPartitionTableType(args[0]) if PTTYPE is None: return PARTS = ogListPartitions(*args) if PARTS is None: return if PTTYPE == "GPT": print(PARTS.rstrip(" EMPTY:0")) elif PTTYPE == "MSDOS": print(PARTS.split(" ")[0:4]) return #/** # ogListLogicalPartitions int_ndisk #@brief Metafunción que lista las particiones lógicas de una tabla tipo MSDOS. #@param int_ndisk nº de orden del disco #@see ogListPartitions #*/ ## def ogListLogicalPartitions(*args): # Variables locales PTTYPE = None PARTS = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogListLogicalPartitions', 'ogListLogicalPartitions int_ndisk', 'ogListLogicalPartitions 1 => LINUX-SWAP:999998') return PTTYPE = ogGetPartitionTableType(args[0]) if PTTYPE is None: return PARTS = ogListPartitions(*args) if PARTS is None: return return PARTS.split(" ")[4:] #/** # ogLockDisk int_ndisk #@brief Genera un fichero de bloqueo para un disco en uso exlusivo. #@param int_ndisk nº de orden del disco #@return (nada) #@exception OG_ERR_FORMAT Formato incorrecto. #@exception OG_ERR_NOTFOUND Disco o particion no corresponden con un dispositivo. #@note El fichero de bloqueo se localiza en \c /var/lock/disk, siendo \c disk el dispositivo del disco, sustituyendo el carácter "/" por "-". #*/ ## def ogLockDisk(*args): # Variables locales DISK = None LOCKFILE = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogLockDisk', 'ogLockDisk int_ndisk', 'ogLockDisk 1') return # Error si no se recibe 1 parámetro. if len(args) != 1: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Obtener partición. DISK = ogDiskToDev(args[0]) if DISK is None: return # Crear archivo de bloqueo exclusivo. LOCKFILE = f"/var/lock/lock{DISK.replace('/', '-')}" open(LOCKFILE, 'a').close() #/** # ogSetPartitionActive int_ndisk int_npartition #@brief Establece cual es la partición activa de un disco. #@param int_ndisk nº de orden del disco #@param int_npartition nº de orden de la partición #@return (nada). #@exception OG_ERR_FORMAT Formato incorrecto. #@exception OG_ERR_NOTFOUND Disco o partición no corresponden con un dispositivo. #@note Requisitos: parted #*/ ## def ogSetPartitionActive(*args): # Variables locales DISK = None PART = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogSetPartitionActive', 'ogSetPartitionActive int_ndisk int_npartition', 'ogSetPartitionActive 1 1') return # Error si no se reciben 2 parámetros. if len(args) != 2: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Comprobar que el disco existe y activar la partición indicada. DISK = ogDiskToDev(args[0]) if DISK is None: return PART = ogDiskToDev(args[0], args[1]) if PART is None: return subprocess.run(["parted", "-s", DISK, "set", args[1], "boot", "on"], stderr=subprocess.DEVNULL) return #/** # ogSetPartitionId int_ndisk int_npartition hex_partid #@brief Cambia el identificador de la partición. #@param int_ndisk nº de orden del disco #@param int_npartition nº de orden de la partición #@param hex_partid identificador de tipo de partición #@return (nada) #@exception OG_ERR_FORMAT Formato incorrecto. #@exception OG_ERR_NOTFOUND Disco o partición no corresponden con un dispositivo. #@exception OG_ERR_OUTOFLIMIT Valor no válido. #@exception OG_ERR_PARTITION Error al cambiar el id. de partición. #@attention Requisitos: fdisk, sgdisk #*/ ## def ogSetPartitionId(*args): # Variables locales DISK = None PART = None PTTYPE = None ID = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogSetPartitionId', 'ogSetPartitionId int_ndisk int_npartition hex_partid', 'ogSetPartitionId 1 1 7') return # Error si no se reciben 3 parámetros. if len(args) != 3: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Sustituye nº de disco y nº partición por su dispositivo. DISK = ogDiskToDev(args[0]) if DISK is None: return PART = ogDiskToDev(args[0], args[1]) if PART is None: return # Error si el id. de partición no es hexadecimal. ID = args[2].upper() if not re.match("^[0-9A-F]+$", ID): SystemLib.ogRaiseError(OG_ERR_OUTOFLIMIT, args[2]) return # Elección del tipo de partición. PTTYPE = ogGetPartitionTableType(args[0]) if PTTYPE == "GPT": subprocess.run(["sgdisk", f"-t{args[1]}:{ID}", DISK], stderr=subprocess.DEVNULL) elif PTTYPE == "MSDOS": subprocess.run(["sfdisk", f"--id", DISK, args[1], ID], stderr=subprocess.DEVNULL) else: SystemLib.ogRaiseError(OG_ERR_OUTOFLIMIT, f"{args[0]},{PTTYPE}") return # MSDOS) Correcto si fdisk sin error o con error pero realiza Syncing if subprocess.run(["partprobe", DISK], stderr=subprocess.DEVNULL).returncode == 0: return else: SystemLib.ogRaiseError(OG_ERR_PARTITION, f"{args[0]},{args[1]},{args[2]}") return #/** # ogSetPartitionSize int_ndisk int_npartition int_size #@brief Muestra el tamano en KB de una particion determinada. #@param int_ndisk nº de orden del disco #@param int_npartition nº de orden de la partición #@param int_size tamaño de la partición (en KB) #@return (nada) #@exception OG_ERR_FORMAT formato incorrecto. #@exception OG_ERR_NOTFOUND disco o particion no detectado (no es un dispositivo). #@note Requisitos: sfdisk, awk #@todo Compruebar que el tamaño sea numérico positivo y evitar que pueda solaparse con la siguiente partición. #*/ ## def ogSetPartitionSize(*args): # Variables locales DISK = None PART = None SIZE = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogSetPartitionSize', 'ogSetPartitionSize int_ndisk int_npartition int_size', 'ogSetPartitionSize 1 1 10000000') return # Error si no se reciben 3 parámetros. if len(args) != 3: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Obtener el tamaño de la partición. DISK = ogDiskToDev(args[0]) if DISK is None: return PART = ogDiskToDev(args[0], args[1]) if PART is None: return # Convertir tamaño en KB a sectores de 512 B. SIZE = int(args[2]) * 2 # Redefinir el tamaño de la partición. subprocess.run(["sfdisk", "-f", "-uS", f"-N{args[1]}", DISK], input=f",{SIZE}", text=True, stderr=subprocess.DEVNULL) subprocess.run(["partprobe", DISK], stderr=subprocess.DEVNULL) return #/** # ogSetPartitionType int_ndisk int_npartition str_type #@brief Cambia el identificador de la partición. #@param int_ndisk nº de orden del disco #@param int_npartition nº de orden de la partición #@param str_type mnemónico de tipo de partición #@return (nada) #@attention Requisitos: fdisk, sgdisk #*/ ## def ogSetPartitionType(*args): # Variables locales DISK = None PART = None PTTYPE = None TYPE = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogSetPartitionType', 'ogSetPartitionType int_ndisk int_npartition str_type', 'ogSetPartitionType 1 1 NTFS') return # Error si no se reciben 3 parámetros. if len(args) != 3: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Sustituye nº de disco por su dispositivo. DISK = ogDiskToDev(args[0]) if DISK is None: return PART = ogDiskToDev(args[0], args[1]) if PART is None: return # Elección del tipo de partición. PTTYPE = ogGetPartitionTableType(args[0]) if PTTYPE is None: return TYPE = args[2] ID = ogTypeToId(TYPE, PTTYPE) if ID is None: SystemLib.ogRaiseError(OG_ERR_FORMAT, f"{TYPE},{PTTYPE}") return ogSetPartitionId(args[0], args[1], ID) return #/** # ogTypeToId str_parttype [str_tabletype] #@brief Devuelve el identificador correspondiente a un tipo de partición. #@param str_parttype mnemónico de tipo de partición. #@param str_tabletype mnemónico de tipo de tabla de particiones (MSDOS por defecto). #@return int_idpart identificador de tipo de partición. #@exception OG_ERR_FORMAT Formato incorrecto. #@note tabletype = { MSDOS, GPT }, (MSDOS, por defecto) #*/ ## #ogTypeToId ('LINUX') => "83" #ogTypeToId ('LINUX', 'MSDOS') => "83" def ogTypeToId (type, pttype='MSDOS'): data = { 'GPT': { 'EMPTY': '0', 'WINDOWS': '0700', 'NTFS': '0700', 'EXFAT': '0700', 'FAT32': '0700', 'FAT16': '0700', 'FAT12': '0700', 'HNTFS': '0700', 'HFAT32': '0700', 'HFAT16': '0700', 'HFAT12': '0700', 'WIN-RESERV': '0C01', 'CHROMEOS-KRN': '7F00', 'CHROMEOS': '7F01', 'CHROMEOS-RESERV': '7F02', 'LINUX-SWAP': '8200', 'LINUX': '8300', 'EXT2': '8300', 'EXT3': '8300', 'EXT4': '8300', 'REISERFS': '8300', 'REISER4': '8300', 'XFS': '8300', 'JFS': '8300', 'LINUX-RESERV': '8301', 'LINUX-LVM': '8E00', 'FREEBSD-DISK': 'A500', 'FREEBSD-BOOT': 'A501', 'FREEBSD-SWAP': 'A502', 'FREEBSD': 'A503', 'HFS-BOOT': 'AB00', 'HFS': 'AF00', 'HFS+': 'AF00', 'HFSPLUS': 'AF00', 'HFS-RAID': 'AF01', 'SOLARIS-BOOT': 'BE00', 'SOLARIS': 'BF00', 'SOLARIS-SWAP': 'BF02', 'SOLARIS-DISK': 'BF03', 'CACHE': 'CA00', 'EFI': 'EF00', 'LINUX-RAID': 'FD00', }, 'MSDOS': { 'EMPTY': '0', 'FAT12': '1', 'EXTENDED': '5', 'FAT16': '6', 'WINDOWS': '7', 'NTFS': '7', 'EXFAT': '7', 'FAT32': 'b', 'HFAT12': '11', 'HFAT16': '16', 'HNTFS': '17', 'HFAT32': '1b', 'LINUX-SWAP': '82', 'LINUX': '83', 'EXT2': '83', 'EXT3': '83', 'EXT4': '83', 'REISERFS': '83', 'REISER4': '83', 'XFS': '83', 'JFS': '83', 'LINUX-LVM': '8e', 'FREEBSD': 'a5', 'OPENBSD': 'a6', 'HFS': 'af', 'HFS+': 'af', 'SOLARIS-BOOT': 'be', 'SOLARIS': 'bf', 'CACHE': 'ca', 'DATA': 'da', 'GPT': 'ee', 'EFI': 'ef', 'VMFS': 'fb', 'LINUX-RAID': 'fd', }, 'LVM': { 'LVM-LV': '10000', }, 'ZVOL': { 'ZFS-VOL': '10010', }, } if pttype.upper() not in data: return None if type.upper() not in data[pttype.upper()]: return None return data[pttype.upper()][type.upper()] #/** # ogUnhidePartition int_ndisk int_npartition #@brief Hace visible una partición oculta. #@param int_ndisk nº de orden del disco #@param int_npartition nº de orden de la partición #@return (nada) #@exception OG_ERR_FORMAT formato incorrecto. #@exception OG_ERR_NOTFOUND disco o particion no detectado (no es un dispositivo). #@exception OG_ERR_PARTITION tipo de partición no reconocido. #*/ ## def ogUnhidePartition(*args): # Variables locales PART = None TYPE = None NEWTYPE = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogUnhidePartition', 'ogUnhidePartition int_ndisk int_npartition', 'ogUnhidePartition 1 1') return # Error si no se reciben 2 parámetros. if len(args) != 2: SystemLib.ogRaiseError(OG_ERR_FORMAT) return PART = ogDiskToDev(args[0], args[1]) if PART is None: return # Obtener tipo de partición. TYPE = ogGetPartitionType(args[0], args[1]) if TYPE == "HNTFS": NEWTYPE = "NTFS" elif TYPE == "HFAT32": NEWTYPE = "FAT32" elif TYPE == "HFAT16": NEWTYPE = "FAT16" elif TYPE == "HFAT12": NEWTYPE = "FAT12" elif TYPE == "WIN-RESERV": NEWTYPE = "WINDOWS" else: SystemLib.ogRaiseError(OG_ERR_PARTITION, TYPE) return # Cambiar tipo de partición. ogSetPartitionType(args[0], args[1], NEWTYPE) return #/** # ogUnlockDisk int_ndisk #@brief Elimina el fichero de bloqueo para un disco. #@param int_ndisk nº de orden del disco #@return (nada) #@exception OG_ERR_FORMAT Formato incorrecto. #@exception OG_ERR_NOTFOUND Disco o particion no corresponden con un dispositivo. #@note El fichero de bloqueo se localiza en \c /var/lock/disk, siendo \c disk el dispositivo del disco, sustituyendo el carácter "/" por "-". #*/ ## def ogUnlockDisk(*args): # Variables locales DISK = None LOCKFILE = None # Si se solicita, mostrar ayuda. if len(args) == 1 and args[0] == "help": SystemLib.ogHelp('ogUnlockDisk', 'ogUnlockDisk int_ndisk', 'ogUnlockDisk 1') return # Error si no se recibe 1 parámetro. if len(args) != 1: SystemLib.ogRaiseError(OG_ERR_FORMAT) return # Obtener partición. DISK = ogDiskToDev(args[0]) if DISK is None: return # Borrar archivo de bloqueo exclusivo. LOCKFILE = f"/var/lock/lock{DISK.replace('/', '-')}" os.remove(LOCKFILE) return #/** # ogUpdatePartitionTable #@brief Fuerza al kernel releer la tabla de particiones de los discos duros #@param no requiere #@return informacion propia de la herramienta #@note Requisitos: \c partprobe #@warning pendiente estructurar la funcion a opengnsys #*/ ## def ogUpdatePartitionTable(): for disk in ogDiskToDev(): subprocess.run(["partprobe", disk])