import filecmp
import subprocess
import shutil
import os
import re
import stat
from pathlib import Path

from CacheLib import *
from FileSystemLib import *
from SystemLib import *

print (">>>>>>>>>>>>>>>>>>>> Load ", __name__, " <<<<<<<<<<<<<<<<<<<<<<")


def parted(*args):
    parted_path = shutil.which("parted")
    if parted_path:
        try:
            result = subprocess.run(
                [parted_path] + list(args),
                timeout=3, capture_output=True, text=True
            )
            return result.stdout
        except subprocess.TimeoutExpired:
            return "Error: Command 'parted' timed out"
    else:
        return "Error: 'parted' command not found"


def ogCreatePartitions(*args):
    # Local variables
    ND = DISK = PTTYPE = PART = SECTORS = START = SIZE = TYPE = CACHEPART = None
    IODISCO = IOSIZE = CACHESIZE = EXTSTART = EXTSIZE = NVME_PREFIX = tmpsfdisk = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogCreatePartitions',
               'ogCreatePartitions int_ndisk str_parttype:int_partsize ...',
               'ogCreatePartitions 1 NTFS:10000000 EXT3:5000000 LINUX-SWAP:1000000')
        return

    # Error if fewer than 2 parameters are received.
    if len(args) < 2:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Total number of sectors, to avoid overflow (avoid rounding).
    ND = args[0]
    DISK = ogDiskToDev(ND)
    if DISK is None:
        return
    PTTYPE = ogGetPartitionTableType(ND) or "MSDOS"  # Default for empty disks.
    if PTTYPE == "GPT":
        ogCreateGptPartitions(*args)
        return
    elif PTTYPE != "MSDOS":
        ogRaiseError(OG_ERR_PARTITION, PTTYPE)
        return
    SECTORS = ogGetLastSector(ND)

    # Recalculate the number of sectors of disk 1 if a cache partition exists.
    CACHEPART = ogFindCache()
    # ND may arrive as an int, so compare it as a string.
    if CACHEPART and str(ND) == CACHEPART.split()[0]:
        CACHESIZE = int(ogGetCacheSize()) * 2

    # Start sector (partition 1 starts at sector 63).
    IODISCO = ogDiskToDev(ND)
    IOSIZE = subprocess.getoutput(f"fdisk -l {IODISCO} | awk '/I\\/O/ {{print $4}}'")
    if IOSIZE == "4096":
        START = 4096
        SECTORS -= 8192
        if CACHESIZE:
            SECTORS = SECTORS - CACHESIZE + 2048 - (SECTORS - CACHESIZE) % 2048 - 1
    else:
        START = 63
        if CACHESIZE:
            SECTORS -= CACHESIZE

    PART = 1

    # Temporary input file for "sfdisk".
    tmpsfdisk = f"/tmp/sfdisk{os.getpid()}"
    try:
        with open(tmpsfdisk, 'w') as f:
            f.write("unit: sectors\n\n")

        NVME_PREFIX = "p" if "nvme" in DISK else ""

        # Generate the "sfdisk" input file with the partitions.
        args = args[1:]  # Shift
        while args:
            # Preserve the data of the cache partition.
            if f"{ND} {PART}" == CACHEPART and CACHESIZE:
                with open(tmpsfdisk, 'a') as f:
                    f.write(f"{DISK}{NVME_PREFIX}{PART} : start={SECTORS + 1}, size={CACHESIZE}, Id=ca\n")
                PART += 1

            # Read the format of each parameter - Type:Size
            TYPE = args[0].split(":")[0]
            SIZE = int(args[0].split(":")[1])

            # Get a valid partition type identifier.
            ID = ogTypeToId(TYPE, "MSDOS")
            with open(tmpsfdisk, 'a') as f:
                f.write(f"{DISK}{NVME_PREFIX}{PART} : start={START}, size={SIZE}, Id={ID}\n")
            START += SIZE
            SECTORS -= SIZE
            PART += 1
            args = args[1:]

        # Run "sfdisk" with the temporary file (closing it after reading).
        with open(tmpsfdisk, 'r') as f:
            sfdisk_input = f.read()
        subprocess.run(["sfdisk", DISK], input=sfdisk_input, text=True)
        subprocess.run(["partprobe", DISK])

    finally:
        # Remove the temporary file.
        if os.path.exists(tmpsfdisk):
            os.remove(tmpsfdisk)

    if CACHESIZE:
        ogMountCache()
    return 0


def ogCreateGptPartitions(*args):
    # Local variables
    ND = DISK = PART = SECTORS = ALIGN = START = SIZE = TYPE = CACHEPART = CACHESIZE = DELOPTIONS = OPTIONS = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogCreateGptPartitions',
               'ogCreateGptPartitions int_ndisk str_parttype:int_partsize ...',
               'ogCreateGptPartitions 1 NTFS:10000000 EXT3:5000000 LINUX-SWAP:1000000')
        return

    # Error if fewer than 2 parameters are received.
    if len(args) < 2:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Total number of sectors, to avoid overflow (avoid rounding).
    ND = args[0]
    DISK = ogDiskToDev(ND)
    if DISK is None:
        return

    # Compute the last sector of the disk (total usable sectors).
    SECTORS = ogGetLastSector(ND)

    # Recalculate the number of sectors of the disk if a cache partition exists.
    CACHEPART = ogFindCache()
    # Guard against a missing cache; ND may arrive as an int.
    if CACHEPART and str(ND) == CACHEPART.split()[0]:
        CACHESIZE = int(ogGetCacheSize()) * 2
    if CACHESIZE:
        SECTORS -= CACHESIZE

    # A GPT disk starts at sector 2048 by default, but this could be changed.
    # sgdisk prints the alignment as text; convert it for the arithmetic below.
    ALIGN = int(subprocess.getoutput(f"sgdisk -D {DISK} 2>/dev/null"))
    START = ALIGN
    PART = 1

    # Option strings must start empty (not None) so they can be appended to.
    DELOPTIONS = ""
    OPTIONS = ""

    # Read the parameters that define the partitioning.
    args = args[1:]  # Shift
    while args:
        # If PART is the cache, skip it and continue with the next number
        # to preserve the data of the cache partition.
        if f"{ND} {PART}" == CACHEPART and CACHESIZE:
            PART += 1

        # Read the format of each parameter - Type:Size
        TYPE = args[0].split(":")[0]
        SIZE = int(args[0].split(":")[1])

        # Error if the partition is extended (not valid on GPT disks).
        if TYPE == "EXTENDED":
            ogRaiseError(OG_ERR_PARTITION, "EXTENDED")
            return

        # Check whether the current partition exists; capture its size to see
        # whether it changed.
        PARTSIZE = ogGetPartitionSize(ND, PART)
        # sgdisk cannot resize partitions; they must be deleted and recreated.
        if PARTSIZE:
            DELOPTIONS += f" -d{PART}"

        # Create the partition.
        # Get a valid partition type identifier.
        ID = ogTypeToId(TYPE, "GPT")
        if TYPE != "CACHE" and ID:
            OPTIONS += f" -n{PART}:{START}:+{SIZE} -t{PART}:{ID} "
        START += SIZE

        # Error if the total number of sectors is exceeded.
        if START > SECTORS:
            ogRaiseError(OG_ERR_FORMAT, f"{START//2} > {SECTORS//2}")
            return

        PART += 1
        args = args[1:]

    # Unmount the file systems of the disk before performing the operations.
    ogUnmountAll(ND)
    if CACHESIZE:
        ogUnmountCache()

    # If the partition table is not valid, regenerate it.
    ogCreatePartitionTable(ND)

    # Define the partitions and notify the kernel.
    # Delete the old partitions first, then create the new ones.
    subprocess.run(["sgdisk"] + DELOPTIONS.split() + OPTIONS.split() + [DISK], stderr=subprocess.DEVNULL)
    subprocess.run(["partprobe", DISK], stderr=subprocess.DEVNULL)
    if CACHESIZE:
        ogMountCache()
    return 0


def ogCreatePartitionTable(*args):
    # Local variables
    DISK = PTTYPE = CREATE = CREATEPTT = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogCreatePartitionTable',
               'ogCreatePartitionTable int_ndisk [str_partype]',
               'ogCreatePartitionTable 1 GPT',
               'ogCreatePartitionTable 1')
        return

    # Error unless 1 or 2 parameters are received.
    if len(args) == 1:
        CREATEPTT = ""
    elif len(args) == 2:
        CREATEPTT = args[1]
    else:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Capture the current partition table type.
    DISK = ogDiskToDev(args[0])
    if DISK is None:
        return
    PTTYPE = ogGetPartitionTableType(args[0]) or "MSDOS"  # Default for empty disks.
    CREATEPTT = CREATEPTT or PTTYPE

    # If the current table and the requested one match, check whether it must
    # be regenerated. subprocess.run() without check=True never raises
    # CalledProcessError, so only the return code is inspected.
    if CREATEPTT == PTTYPE:
        if PTTYPE == "GPT":
            result = subprocess.run(
                ["sgdisk", "-p", DISK],
                stderr=subprocess.PIPE, stdout=subprocess.DEVNULL
            )
            if result.returncode != 0:
                CREATE = "GPT"
        elif PTTYPE == "MSDOS":
            result = subprocess.run(
                ["parted", "-s", DISK, "print"],
                stderr=subprocess.PIPE, stdout=subprocess.DEVNULL
            )
            if result.returncode != 0:
                CREATE = "MSDOS"
    else:
        CREATE = CREATEPTT

    # Depending on the value of CREATE, create the partition table.
    if CREATE == "GPT":
        if PTTYPE == "MSDOS":
            subprocess.run(["sgdisk", "-go", DISK])
        else:
            subprocess.run(["gdisk", DISK], input="2\nw\nY\n", text=True)
        subprocess.run(["partprobe", DISK], stderr=subprocess.DEVNULL)
    elif CREATE == "MSDOS":
        if PTTYPE == "GPT":
            subprocess.run(["sgdisk", "-Z", DISK])
        subprocess.run(["fdisk", DISK], input="o\nn\np\n\n\n\nd\n\nw", text=True)
        subprocess.run(["partprobe", DISK], stderr=subprocess.DEVNULL)
    return


def ogDeletePartitionTable(*args):
    # Local variables
    DISK = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogDeletePartitionTable', 'ogDeletePartitionTable int_ndisk', 'ogDeletePartitionTable 1')
        return

    # Error unless exactly 1 parameter is received.
    if len(args) != 1:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Get the Linux identifier of the disk.
    DISK = ogDiskToDev(args[0])
    if DISK is None:
        return

    # Create an empty partition table.
    PTTYPE = ogGetPartitionTableType(args[0])
    if PTTYPE == "GPT":
        subprocess.run(["sgdisk", "-o", DISK])
    elif PTTYPE == "MSDOS":
        subprocess.run(["fdisk", DISK], input="o\nw", text=True)
    return


def ogDevToDisk(dev):
    # Local variables
    CACHEFILE = "/var/cache/disks.cfg"
    PART = None
    n = 1

    # Show help if requested.
    if dev == "help":
        ogHelp("ogDevToDisk",
               "ogDevToDisk path_device | LABEL=str_label | UUID=str_uuid",
               "ogDevToDisk /dev/sda => 1",
               "ogDevToDisk /dev/sda1 => 1 1",
               "ogDevToDisk LABEL=CACHE => 1 4")
        return

    # Error unless exactly 1 parameter is received.
    if len(dev) != 1:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Get the device from a path, label or UUID.
    DEV = dev[0]
    if DEV.startswith("LABEL="):
        DEV = subprocess.getoutput(f"blkid -L {DEV[6:]}")
    elif DEV.startswith("PARTLABEL="):
        # "PARTLABEL=" is 10 characters long.
        DEV = subprocess.getoutput(f"realpath /dev/disk/by-partlabel/{DEV[10:]} 2>/dev/null")
    elif DEV.startswith("PARTUUID="):
        # "PARTUUID=" is 9 characters long.
        DEV = subprocess.getoutput(f"realpath /dev/disk/by-partuuid/{DEV[9:]} 2>/dev/null")
    elif DEV.startswith("UUID="):
        DEV = subprocess.getoutput(f"blkid -U {DEV[5:]}")

    # Error if it is not a block file or a directory (for LVM).
    if not os.path.exists(DEV):
        ogRaiseError(OG_ERR_NOTFOUND, dev)
        return

    # Search the disk cache file.
    if os.path.exists(CACHEFILE):
        with open(CACHEFILE, 'r') as f:
            for line in f:
                parts = line.strip().split(':')
                if len(parts) == 2 and parts[1] == DEV:
                    PART = parts[0]
                    break

    if PART:
        return PART

    # If not found, walk all disks to return the disk and partition order numbers.
    # ogDiskToDev() without arguments returns a space-separated string.
    disks = ogDiskToDev().split()
    for d in disks:
        NVME_PREFIX = "p" if "nvme" in d else ""
        if DEV.startswith(d):
            # strip() drops the trailing space when DEV is the disk itself.
            return f"{n} {DEV[len(d) + len(NVME_PREFIX):]}".strip()
        n += 1

    ogRaiseError(OG_ERR_NOTFOUND, dev)
    return OG_ERR_NOTFOUND


def _getAllDisks():
    ret = []
    all_disks = subprocess.run("lsblk -n -e 1,2 -x MAJ:MIN 2>/dev/null || lsblk -n -e 1,2", shell=True, capture_output=True, text=True).stdout.splitlines()
    for line in all_disks:
        parts = line.split()
        if len(parts) > 5 and parts[5] == "disk":
            # str.replace() returns a new string, so keep the result.
            name = parts[0].replace('!', '/')
            ret.append(f"/dev/{name}")
    return ret


def _getAllVolGroups():
    vgs = subprocess.run(['vgs', '-a', '--noheadings'], capture_output=True, text=True).stdout.splitlines()
    ret = [f"/dev/{line.split()[0]}" for line in vgs]
    return ret


def _getMPath():
    # Two separate lists; "ret = alldisks2remove = []" would alias one list.
    ret = []
    alldisks2remove = []
    try:
        mpath = subprocess.run(['multipath', '-l', '-v', '1'], capture_output=True, text=True).stdout.splitlines()
        ret = [f"/dev/mapper/{line.split()[0]}" for line in mpath]
        for line in subprocess.getoutput("multipath -ll").splitlines():
            fields = line.split()
            # Skip lines that are too short to carry a path state.
            if len(fields) > 5 and fields[5] == "ready":
                alldisks2remove.append(f"/dev/{fields[2]}")
    except FileNotFoundError:
        pass
    except subprocess.CalledProcessError:
        pass
    return ret, alldisks2remove


def _getAllZFSVols():
    zfsvols = subprocess.run(['blkid'], capture_output=True, text=True).stdout.splitlines()
    return [line.split(":")[0] for line in zfsvols if "zfs" in line]


#/**
#         ogDiskToDev [int_ndisk [int_npartition]]
#@brief   Returns the mapping between the order number of a device (disk or partition) and the name of the corresponding device file.
#@param   int_ndisk      disk order number
#@param   int_npartition partition order number
#@return  With 0 parameters: returns the device file names of the Linux sata/ata/usb devices found.
#@return  With 1 parameter: returns the path of the given hard disk.
#@return  With 2 parameters: returns the path of the given partition.
#@exception OG_ERR_FORMAT   Incorrect format.
#@exception OG_ERR_NOTFOUND Device not detected.
#*/ ##
def ogDiskToDev (arg_disk=None, arg_part=None):
    CACHEFILE = "/var/cache/disks.cfg"

    try:
        if arg_part is not None:
            arg_part = int (arg_part)
    except (TypeError, ValueError):
        ogRaiseError([], ogGlobals.OG_ERR_FORMAT, f"{arg_disk} {arg_part}")
        return

    try:
        if arg_disk is not None:
            arg_disk = int (arg_disk)
    except (TypeError, ValueError):
        ogRaiseError([], ogGlobals.OG_ERR_FORMAT, arg_disk)
        return

    # Delete the configuration cache file if the partitions have changed.
    proc_partitions = Path ('/proc/partitions').read_text()
    tmp_partitions = Path ('/tmp/.partitions').read_text() if os.path.exists ('/tmp/.partitions') else ''
    if proc_partitions != tmp_partitions:
        # Save a copy of the defined partitions to detect later changes.
        shutil.copy2('/proc/partitions', '/tmp/.partitions')
        try:
            os.remove(CACHEFILE)
        except FileNotFoundError:
            pass

    # If the cache already maps this disk/partition to a device, return it.
    if arg_disk and os.path.exists (CACHEFILE):
        with open(CACHEFILE, 'r') as f:
            args_joined = ' '.join (map (str, filter (None, [arg_disk,arg_part])))
            for line in f:
                parts = line.strip().split(':')
                if len(parts) == 2 and parts[0] == args_joined:
                    return parts[1]

    # Continue, to detect new devices.
    ALLDISKS = _getAllDisks()

    VOLGROUPS = _getAllVolGroups()
    ALLDISKS += VOLGROUPS

    MPATH, ALLDISKS_to_remove =_getMPath()
    for d in ALLDISKS_to_remove:
        if d in ALLDISKS:
            ALLDISKS.remove (d)

    ZFSVOLS = _getAllZFSVols()
    ALLDISKS += ZFSVOLS

    # No params: return all disks
    if arg_disk is None:
        return " ".join(ALLDISKS)

    # Disk numbers are 1-based; reject anything outside the detected range.
    if arg_disk < 1 or arg_disk > len (ALLDISKS):
        ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, arg_disk)
        return

    # arg_disk is set: return it
    if arg_part is None:
        disk = ALLDISKS[arg_disk-1]
        if not os.path.exists(disk):
            ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, arg_disk)
            return
        # Update the configuration cache and return the device.
        with open(CACHEFILE, 'a') as f:
            f.write(f"{arg_disk}:{disk}\n")
        return disk

    # arg_disk and arg_part are set: there are several possibilities
    disk = ALLDISKS[arg_disk-1]
    if not os.path.exists(disk):
        ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, arg_disk)
        return

    # Plain partition
    part = f"{disk}{arg_part}"
    if os.path.exists(part):
        # Update the configuration cache and return the device.
        with open(CACHEFILE, 'a') as f:
            f.write(f"{arg_disk} {arg_part}:{part}\n")
        return part

    # RAID/Multipath (taking symbolic links into account).
    part = f"{disk}p{arg_part}"
    if os.path.exists(part) and stat.S_ISBLK (os.stat (part, follow_symlinks=True).st_mode):
        # Update the configuration cache and return the device.
        with open(CACHEFILE, 'a') as f:
            f.write(f"{arg_disk} {arg_part}:{part}\n")
        return part

    part = ""
    # Logical volume
    if disk in VOLGROUPS:
        lvscan = subprocess.run (['lvscan', '-a'], capture_output=True, text=True).stdout.splitlines()
        i = 0
        for line in lvscan:
            parts = line.split("'")
            if len(parts) > 1 and parts[1].startswith(f"{disk}/") and i == arg_part:
                part = parts[1]
                break
            i += 1

    # ZFS volume
    if disk in ZFSVOLS:
        subprocess.run(["zpool", "import", "-f", "-R", "/mnt", "-N", "-a"])
        # strip() removes the trailing newline from the pool label.
        zpool = subprocess.run(['blkid', '-s', 'LABEL', '-o', 'value', disk], capture_output=True, text=True).stdout.strip()
        zfs_list = subprocess.run(['zfs', 'list', '-Hp', '-o', 'name,canmount,mountpoint', '-r', zpool], capture_output=True, text=True).stdout.splitlines()
        i = 0
        for line in zfs_list:
            parts = line.split()
            if len(parts) > 2 and parts[1] == "on" and parts[2] != "none":
                # The original referenced an undefined "args"; arg_part is the
                # partition number received by this function.
                if i == arg_part:
                    part = parts[0]
                    break
                i += 1

    if not part:
        ogRaiseError([], ogGlobals.OG_ERR_NOTFOUND, f"{arg_disk} {arg_part}")
        return

    # Update the configuration cache and return the device.
    with open(CACHEFILE, 'a') as f:
        f.write(f"{arg_disk} {arg_part}:{part}\n")

    return part


def ogGetDiskSize(*args):
    # Local variables
    DISK = SIZE = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogGetDiskSize', 'ogGetDiskSize int_ndisk', 'ogGetDiskSize 1 => 244198584')
        return

    # Error unless exactly 1 parameter is received.
    if len(args) != 1:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Get the size of the disk. The -d flag limits lsblk to the disk itself;
    # without it, one line per partition is printed as well.
    DISK = ogDiskToDev(args[0])
    if DISK is None:
        return
    SIZE = subprocess.getoutput(f"lsblk -d -n -b -o SIZE {DISK}")

    # Show the output.
    if SIZE:
        print(SIZE)
    return


def ogGetDiskType(*args):
    # Local variables
    DEV = MAJOR = TYPE = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogGetDiskType', 'ogGetDiskType path_device', 'ogGetDiskType /dev/sdb => USB')
        return

    # Error unless exactly 1 parameter is received.
    if len(args) != 1:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Get the driver of the block device.
    DEV = args[0].split("/dev/")[1]
    MAJOR = subprocess.getoutput(f"awk -v D='{DEV}' '{{if ($4==D) print $1;}}' /proc/partitions")
    TYPE = subprocess.getoutput(f"awk -v D={MAJOR} '/Block/ {{bl=1}} {{if ($1==D&&bl) print toupper($2)}}' /proc/devices")

    # Return the mnemonic of the device driver.
    if TYPE == "SD":
        TYPE = "DISK"
        # "grep -q" prints nothing, so test its exit status instead of its output.
        is_usb = subprocess.run(
            f"udevadm info -q property {args[0]} 2>/dev/null | grep -q '^ID_BUS=usb'",
            shell=True
        ).returncode == 0
        if is_usb:
            TYPE = "USB"
    elif TYPE == "BLKEXT":
        TYPE = "NVM"
    elif TYPE == "SR" or TYPE.startswith("IDE"):
        TYPE = "CDROM"  # FIXME Check IDE disks.
    elif TYPE == "MD" or TYPE.startswith("CCISS"):
        TYPE = "RAID"
    elif TYPE == "DEVICE-MAPPER":
        TYPE = "MAPPER"  # FIXME Check LVM and RAID.

    print(TYPE)
    return


def ogGetEsp():
    for d in subprocess.getoutput("blkid -o device").split():
        # Prevents an error for /dev/loop0
        PART = ogDevToDisk([d])
        # On NVMe disks blkid returns output such as:
        #    >/dev/loop0
        #    >/dev/nvme0n1
        #    >/dev/nvme0n1p1
        # When a whole disk such as nvme0n1 is analysed, PART holds a single
        # field, which would make ogGetPartitionId raise an error.
        if not isinstance(PART, str):
            continue
        # PART is a "disk partition" string: count its fields, not its characters.
        fields = PART.split()
        if len(fields) == 2:
            if ogGetPartitionId(*fields) == ogTypeToId("EFI", "GPT"):
                return PART
    return None


def ogGetLastSector(*args):
    # Local variables
    DISK = None
    PART = None
    LASTSECTOR = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp("ogGetLastSector",
               "ogGetLastSector int_ndisk [int_npart]",
               "ogGetLastSector 1 => 488392064",
               "ogGetLastSector 1 1 => 102400062")
        return

    # Get the last sector.
    if len(args) == 1:  # For a disk.
        DISK = ogDiskToDev(args[0])
        if DISK is None:
            return
        LASTSECTOR = subprocess.getoutput(f"sgdisk -p {DISK} | awk '/last usable sector/ {{print($(NF))}}'")
    elif len(args) == 2:  # For a partition.
        DISK = ogDiskToDev(args[0])
        if DISK is None:
            return
        PART = ogDiskToDev(args[0], args[1])
        if PART is None:
            return
        LASTSECTOR = subprocess.getoutput(f"sgdisk -p {DISK} | awk -v P='{args[1]}' '{{if ($1==P) print $3}}'")
    else:
        # Error if more parameters are received.
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Callers do arithmetic on the result, so return it as an integer
    # (None if sgdisk produced no numeric value).
    LASTSECTOR = (LASTSECTOR or "").strip()
    return int(LASTSECTOR) if LASTSECTOR.isdigit() else None


def ogGetPartitionActive(*args):
    # Local variables
    DISK = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogGetPartitionActive', 'ogGetPartitionActive int_ndisk', 'ogGetPartitionActive 1 => 1')
        return

    # Error unless exactly 1 parameter is received.
    if len(args) != 1:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Check that the disk exists and list its active partition.
    DISK = ogDiskToDev(args[0])
    if DISK is None:
        return
    output = subprocess.getoutput(f"parted -sm {DISK} print 2>/dev/null | awk -F: '$7~/boot/ {{print $1}}'")
    print(output)
    return


def ogGetPartitionId(*args):
    # Local variables
    DISK = None
    ID = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogGetPartitionId', 'ogGetPartitionId int_ndisk int_npartition', 'ogGetPartitionId 1 1 => 7')
        return

    # Error unless exactly 2 parameters are received.
    if len(args) != 2:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Detect the partition type id.
    DISK = ogDiskToDev(args[0])
    if DISK is None:
        return
    PTTYPE = ogGetPartitionTableType(args[0])
    if PTTYPE == "GPT":
        ID = subprocess.getoutput(f"sgdisk -p {DISK} 2>/dev/null | awk -v p={args[1]} '{{if ($1==p) print $6;}}'")
        if ID == "8300" and f"{args[0]} {args[1]}" == ogFindCache():
            ID = "CA00"
    elif PTTYPE == "MSDOS":
        ID = subprocess.getoutput(f"sfdisk --id {DISK} {args[1]} 2>/dev/null")
    elif PTTYPE == "LVM":
        ID = "10000"
    elif PTTYPE == "ZPOOL":
        ID = "10010"

    # Return the id: callers such as ogGetPartitionType use the value.
    return ID


def ogGetPartitionSize(*args):
    # Local variables
    PART = None
    SIZE = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogGetPartitionSize', 'ogGetPartitionSize int_ndisk int_npartition', 'ogGetPartitionSize 1 1 => 10000000')
        return

    # Error unless exactly 2 parameters are received.
    if len(args) != 2:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Return the size of the partition, of the logical volume or of the file system (for ZFS).
    PART = ogDiskToDev(args[0], args[1])
    if PART is None:
        return
    SIZE = subprocess.getoutput(f"partx -gbo SIZE {PART} 2>/dev/null | awk '{{print int($1/1024)}}'")
    if not SIZE:
        SIZE = subprocess.getoutput(f"lvs --noheadings -o lv_size --units k {PART} | awk '{{printf \"%d\",$0}}'")
    if not SIZE:
        SIZE = ogGetFsSize(args[0], args[1])

    # Return the size: callers such as ogListPartitions use the value.
    return SIZE or 0


def ogGetPartitionsNumber(*args):
    # Local variables
    DISK = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogGetPartitionsNumber', 'ogGetPartitionsNumber int_ndisk', 'ogGetPartitionsNumber 1 => 3')
        return

    # Error unless exactly 1 parameter is received.
    if len(args) != 1:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Count how many times the disk appears in its partition list.
    DISK = ogDiskToDev(args[0])
    if DISK is None:
        return
    PTTYPE = ogGetPartitionTableType(args[0])
    if PTTYPE in ["GPT", "MSDOS"]:
        output = subprocess.getoutput(f"partx -gso NR {DISK} 2>/dev/null | awk -v p=0 '{{p=$1}} END {{print p}}'")
    elif PTTYPE == "LVM":
        output = subprocess.getoutput(f"lvs --noheadings {DISK} 2>/dev/null | wc -l")
    elif PTTYPE == "ZPOOL":
        # Load the zfs module only if "zpool list" does not already work
        # (assumed intent, based on the original inline comments).
        zpool_ok = subprocess.run("zpool list", shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0
        if not zpool_ok:
            subprocess.run(["modprobe", "zfs"])
        # Import all zpools.
        subprocess.run("zpool import -f -R /mnt -N -a", shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        output = subprocess.getoutput(f"zfs list -Hp -o name,canmount,mountpoint -r $(blkid -s LABEL -o value {DISK}) | awk '$2==\"on\" && $3!=\"none\" {{c++}} END {{print c}}'")
    else:
        output = None

    # Return an integer: ogListPartitions uses it as a range bound.
    output = (output or "").strip()
    return int(output) if output.isdigit() else 0


def ogGetPartitionTableType(*args):
    # Local variables
    DISK = None
    TYPE = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogGetPartitionTableType', 'ogGetPartitionTableType int_ndisk', 'ogGetPartitionTableType 1 => MSDOS')
        return

    # Error unless exactly 1 parameter is received.
    if len(args) != 1:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Replace the disk number with its device.
    DISK = ogDiskToDev(args[0])
    if DISK is None:
        return

    # Initialise the result so the checks below never hit an unbound name.
    output = None

    # Check the partition table (the same command is tried a second time if
    # the first attempt returns nothing).
    if os.path.exists(DISK):
        output = subprocess.getoutput(f"parted -sm {DISK} print 2>/dev/null | awk -F: -v D={DISK} '{{ if($1 == D) print toupper($6)}}'")
        if not output:
            output = subprocess.getoutput(f"parted -sm {DISK} print 2>/dev/null | awk -F: -v D={DISK} '{{ if($1 == D) print toupper($6)}}'")

    # Check whether it is a logical volume.
    if os.path.isdir(DISK) and subprocess.run(["vgs", DISK], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0:
        output = "LVM"

    # Check whether it is a ZFS pool.
    if not output or output == "UNKNOWN":
        blkid_output = subprocess.getoutput(f"blkid -s TYPE {DISK} | grep zfs")
        if blkid_output:
            output = "ZPOOL"

    # Return the type (None if undetected): callers test and compare the value.
    return output or None


def ogGetPartitionType(*args):
    # Local variables
    ID = None
    TYPE = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogGetPartitionType', 'ogGetPartitionType int_ndisk int_npartition', 'ogGetPartitionType 1 1 => NTFS')
        return

    # Error unless exactly 2 parameters are received.
    if len(args) != 2:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Detect the partition type id and translate it to its mnemonic.
    ID = ogGetPartitionId(args[0], args[1])
    if ID is None:
        return
    TYPE = ogIdToType(ID)

    # Return the mnemonic: callers such as ogHidePartition use the value.
    return TYPE


def ogHidePartition(*args):
    # Local variables
    PART = None
    TYPE = None
    NEWTYPE = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogHidePartition', 'ogHidePartition int_ndisk int_npartition', 'ogHidePartition 1 1')
        return

    # Error unless exactly 2 parameters are received.
    if len(args) != 2:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Get the device of the partition.
    PART = ogDiskToDev(args[0], args[1])
    if PART is None:
        return

    # Get the partition type.
    TYPE = ogGetPartitionType(args[0], args[1])
    if TYPE == "NTFS":
        NEWTYPE = "HNTFS"
    elif TYPE == "FAT32":
        NEWTYPE = "HFAT32"
    elif TYPE == "FAT16":
        NEWTYPE = "HFAT16"
    elif TYPE == "FAT12":
        NEWTYPE = "HFAT12"
    elif TYPE == "WINDOWS":
        NEWTYPE = "WIN-RESERV"
    else:
        ogRaiseError(OG_ERR_PARTITION, TYPE)
        return

    # Change the partition type.
    ogSetPartitionType(args[0], args[1], NEWTYPE)
    return


def ogIdToType(ID):
    # Normalise to a lowercase hexadecimal value of at least 4 characters,
    # left-padded with zeros (sgdisk reports ids in uppercase, e.g. "EF00").
    ID = str(ID).lower().zfill(4)
    TYPE = None

    # Assign the partition type according to the ID.
    if ID == "0000":
        TYPE = "EMPTY"
    elif ID == "0001":
        TYPE = "FAT12"
    elif ID in ["0005", "000f"]:
        TYPE = "EXTENDED"
    elif ID in ["0006", "000e"]:
        TYPE = "FAT16"
    elif ID == "0007":
        TYPE = "NTFS"
    elif ID in ["000b", "000c"]:
        TYPE = "FAT32"
    elif ID == "0011":
        TYPE = "HFAT12"
    elif ID == "0012":
        TYPE = "COMPAQDIAG"
    elif ID in ["0016", "001e"]:
        TYPE = "HFAT16"
    elif ID == "0017":
        TYPE = "HNTFS"
    elif ID in ["001b", "001c"]:
        TYPE = "HFAT32"
    elif ID == "0042":
        TYPE = "WIN-DYNAMIC"
    elif ID in ["0082", "8200"]:
        TYPE = "LINUX-SWAP"
    elif ID in ["0083", "8300"]:
        TYPE = "LINUX"
    elif ID in ["008e", "8e00"]:
        TYPE = "LINUX-LVM"
    elif ID in ["00a5", "a503"]:
        TYPE = "FREEBSD"
    elif ID == "00a6":
        TYPE = "OPENBSD"
    elif ID == "00a7":
        TYPE = "CACHE"  # (compatibility with Brutalix)
    elif ID in ["00af", "af00"]:
        TYPE = "HFS"
    elif ID in ["00be", "be00"]:
        TYPE = "SOLARIS-BOOT"
    # "bf00145" in the original looks like a flattened bf0[0145] pattern.
    elif ID in ["00bf", "bf00", "bf01", "bf04", "bf05"]:
        TYPE = "SOLARIS"
    elif ID in ["00ca", "ca00"]:
        TYPE = "CACHE"
    elif ID == "00da":
        TYPE = "DATA"
    elif ID == "00ee":
        TYPE = "GPT"
    elif ID in ["00ef", "ef00"]:
        TYPE = "EFI"
    elif ID == "00fb":
        TYPE = "VMFS"
    elif ID in ["00fd", "fd00"]:
        TYPE = "LINUX-RAID"
    elif ID == "0700":
        TYPE = "WINDOWS"
    elif ID == "0c01":
        TYPE = "WIN-RESERV"
    elif ID == "7f00":
        TYPE = "CHROMEOS-KRN"
    elif ID == "7f01":
        TYPE = "CHROMEOS"
    elif ID == "7f02":
        TYPE = "CHROMEOS-RESERV"
    elif ID == "8301":
        TYPE = "LINUX-RESERV"
    elif ID == "a500":
        TYPE = "FREEBSD-DISK"
    elif ID == "a501":
        TYPE = "FREEBSD-BOOT"
    elif ID == "a502":
        TYPE = "FREEBSD-SWAP"
    elif ID == "ab00":
        TYPE = "HFS-BOOT"
    elif ID == "af01":
        TYPE = "HFS-RAID"
    elif ID == "bf02":
        TYPE = "SOLARIS-SWAP"
    elif ID == "bf03":
        TYPE = "SOLARIS-DISK"
    elif ID == "ef01":
        TYPE = "MBR"
    elif ID == "ef02":
        TYPE = "BIOS-BOOT"
    elif ID == "10000":
        TYPE = "LVM-LV"
    elif ID == "10010":
        TYPE = "ZFS-VOL"
    else:
        TYPE = "UNKNOWN"

    return TYPE


def ogIsDiskLocked(*args):
    # Local variables
    DISK = None
    LOCKFILE = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogIsDiskLocked', 'ogIsDiskLocked int_ndisk', 'if ogIsDiskLocked(1): ...')
        return

    # False, in case of error.
    if len(args) != 1:
        return False
    # Look up the disk itself. The stray second argument (2) in the original
    # looks like a leftover "2>/dev/null" redirection, not a partition number.
    DISK = ogDiskToDev(args[0])
    if DISK is None:
        return False

    # Check whether the lock file for the disk exists.
    LOCKFILE = f"/var/lock/lock{DISK.replace('/', '-')}"
    return os.path.isfile(LOCKFILE)


def ogListPartitions(*args):
    # Local variables
    DISK = None
    PART = None
    NPARTS = None
    TYPE = None
    SIZE = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogListPartitions', 'ogListPartitions int_ndisk', 'ogListPartitions 1 => NTFS:10000000 EXT3:5000000 LINUX-SWAP:1000000')
        return

    # Error unless exactly 1 parameter is received.
    if len(args) != 1:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Collect the type and size of every partition.
    DISK = ogDiskToDev(args[0])
    if DISK is None:
        return
    NPARTS = int(ogGetPartitionsNumber(args[0]) or 0)
    PARTS = []
    for PART in range(1, NPARTS + 1):
        TYPE = ogGetPartitionType(args[0], PART) or "EMPTY"
        SIZE = ogGetPartitionSize(args[0], PART) or 0
        PARTS.append(f"{TYPE}:{SIZE}")

    # Return the list as a space-separated string: ogListPrimaryPartitions and
    # ogListLogicalPartitions split it.
    return " ".join(PARTS)


def ogListPrimaryPartitions(*args):
    # Local variables
    PTTYPE = None
    PARTS = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogListPrimaryPartitions', 'ogListPrimaryPartitions int_ndisk', 'ogListPrimaryPartitions 1 => NTFS:10000000 EXT3:5000000 EXTENDED:1000000')
        return

    PTTYPE = ogGetPartitionTableType(args[0])
    if PTTYPE is None:
        return
    PARTS = ogListPartitions(*args)
    if PARTS is None:
        return
    PARTS = PARTS.strip()

    if PTTYPE == "GPT":
        # Drop only whole trailing " EMPTY:0" entries. str.rstrip() takes a set
        # of characters, so it would also eat the zeros at the end of a size.
        while PARTS.endswith(" EMPTY:0"):
            PARTS = PARTS[:-len(" EMPTY:0")]
        print(PARTS)
    elif PTTYPE == "MSDOS":
        # An MSDOS table holds at most 4 primary partitions.
        print(" ".join(PARTS.split(" ")[0:4]))
    return


def ogListLogicalPartitions(*args):
    # Local variables
    PTTYPE = None
    PARTS = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogListLogicalPartitions', 'ogListLogicalPartitions int_ndisk', 'ogListLogicalPartitions 1 => LINUX-SWAP:999998')
        return

    PTTYPE = ogGetPartitionTableType(args[0])
    if PTTYPE is None:
        return
    PARTS = ogListPartitions(*args)
    if PARTS is None:
        return

    return PARTS.split(" ")[4:]


def ogLockDisk(*args):
    # Local variables
    DISK = None
    LOCKFILE = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogLockDisk', 'ogLockDisk int_ndisk', 'ogLockDisk 1')
        return

    # Error unless exactly 1 parameter is received.
    if len(args) != 1:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Get the disk device.
    DISK = ogDiskToDev(args[0])
    if DISK is None:
        return

    # Create the lock file.
    LOCKFILE = f"/var/lock/lock{DISK.replace('/', '-')}"
    open(LOCKFILE, 'a').close()


def ogSetPartitionActive(*args):
    # Local variables
    DISK = None
    PART = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogSetPartitionActive', 'ogSetPartitionActive int_ndisk int_npartition', 'ogSetPartitionActive 1 1')
        return

    # Error unless exactly 2 parameters are received.
    if len(args) != 2:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Check that the disk exists and activate the given partition.
    DISK = ogDiskToDev(args[0])
    if DISK is None:
        return
    PART = ogDiskToDev(args[0], args[1])
    if PART is None:
        return
    # subprocess arguments must be strings; the partition number may be an int.
    subprocess.run(["parted", "-s", DISK, "set", str(args[1]), "boot", "on"], stderr=subprocess.DEVNULL)
    return


def ogSetPartitionId(*args):
    # Local variables
    DISK = None
    PART = None
    PTTYPE = None
    ID = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogSetPartitionId', 'ogSetPartitionId int_ndisk int_npartition hex_partid', 'ogSetPartitionId 1 1 7')
        return

    # Error unless exactly 3 parameters are received.
    if len(args) != 3:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Replace the disk and partition numbers with their devices.
    DISK = ogDiskToDev(args[0])
    if DISK is None:
        return
    PART = ogDiskToDev(args[0], args[1])
    if PART is None:
        return

    # Error if the partition id is not hexadecimal.
    ID = str(args[2]).upper()
    if not re.match("^[0-9A-F]+$", ID):
        ogRaiseError(OG_ERR_OUTOFLIMIT, args[2])
        return

    # Choose the command according to the partition table type.
    PTTYPE = ogGetPartitionTableType(args[0])
    if PTTYPE == "GPT":
        subprocess.run(["sgdisk", f"-t{args[1]}:{ID}", DISK], stderr=subprocess.DEVNULL)
    elif PTTYPE == "MSDOS":
        subprocess.run(["sfdisk", "--id", DISK, str(args[1]), ID], stderr=subprocess.DEVNULL)
    else:
        ogRaiseError(OG_ERR_OUTOFLIMIT, f"{args[0]},{PTTYPE}")
        return

    # Success if partprobe can re-read the partition table without error.
    if subprocess.run(["partprobe", DISK], stderr=subprocess.DEVNULL).returncode == 0:
        return
    else:
        ogRaiseError(OG_ERR_PARTITION, f"{args[0]},{args[1]},{args[2]}")
        return


def ogSetPartitionSize(*args):
    # Local variables
    DISK = None
    PART = None
    SIZE = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogSetPartitionSize', 'ogSetPartitionSize int_ndisk int_npartition int_size', 'ogSetPartitionSize 1 1 10000000')
        return

    # Error unless exactly 3 parameters are received.
    if len(args) != 3:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Get the disk and partition devices.
    DISK = ogDiskToDev(args[0])
    if DISK is None:
        return
    PART = ogDiskToDev(args[0], args[1])
    if PART is None:
        return

    # Convert the size from KB to 512 B sectors.
    SIZE = int(args[2]) * 2

    # Redefine the size of the partition.
    subprocess.run(["sfdisk", "-f", "-uS", f"-N{args[1]}", DISK], input=f",{SIZE}", text=True, stderr=subprocess.DEVNULL)
    subprocess.run(["partprobe", DISK], stderr=subprocess.DEVNULL)
    return


def ogSetPartitionType(*args):
    # Local variables
    DISK = None
    PART = None
    PTTYPE = None
    TYPE = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogSetPartitionType', 'ogSetPartitionType int_ndisk int_npartition str_type', 'ogSetPartitionType 1 1 NTFS')
        return

    # Error unless exactly 3 parameters are received.
    if len(args) != 3:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Replace the disk number with its device.
    DISK = ogDiskToDev(args[0])
    if DISK is None:
        return
    PART = ogDiskToDev(args[0], args[1])
    if PART is None:
        return

    # Choose the partition type id.
    PTTYPE = ogGetPartitionTableType(args[0])
    if PTTYPE is None:
        return
    TYPE = args[2]
    ID = ogTypeToId(TYPE, PTTYPE)
    # ogTypeToId starts from an empty string for unknown types, so test for
    # any falsy value instead of "is None".
    if not ID:
        ogRaiseError(OG_ERR_FORMAT, f"{TYPE},{PTTYPE}")
        return

    ogSetPartitionId(args[0], args[1], ID)
    return


def ogTypeToId(TYPE, PTTYPE="MSDOS"):
    # Map a partition mnemonic to its id.
    ID = ""

    # Choose the id according to the partition table type.
    if PTTYPE == "GPT":
        if TYPE == "EMPTY":
            ID = "0"
        elif TYPE in ["WINDOWS", "NTFS", "EXFAT", "FAT32", "FAT16", "FAT12", "HNTFS", "HFAT32", "HFAT16", "HFAT12"]:
            ID = "0700"
        elif TYPE == "WIN-RESERV":
            ID = "0C01"
        elif TYPE == "CHROMEOS-KRN":
            ID = "7F00"
        elif TYPE == "CHROMEOS":
            ID = "7F01"
        elif TYPE == "CHROMEOS-RESERV":
            ID = "7F02"
        elif TYPE == "LINUX-SWAP":
            ID = "8200"
        elif TYPE in ["LINUX", "EXT2", "EXT3", "EXT4", "REISERFS", "REISER4", "XFS", "JFS"]:
            ID = "8300"
        elif TYPE == "LINUX-RESERV":
            ID = "8301"
        elif TYPE == "LINUX-LVM":
            ID = "8E00"
        elif TYPE == "FREEBSD-DISK":
            ID = "A500"
        elif TYPE == "FREEBSD-BOOT":
            ID = "A501"
        elif TYPE == "FREEBSD-SWAP":
            ID = "A502"
        elif TYPE == "FREEBSD":
            ID = "A503"
        elif TYPE == "HFS-BOOT":
            ID = "AB00"
        elif TYPE in ["HFS", "HFS+"]:
            ID = "AF00"
        elif TYPE == "HFS-RAID":
            ID = "AF01"
        elif TYPE == "SOLARIS-BOOT":
            ID = "BE00"
        elif TYPE == "SOLARIS":
            ID = "BF00"
        elif TYPE == "SOLARIS-SWAP":
            ID = "BF02"
        elif TYPE == "SOLARIS-DISK":
            ID = "BF03"
        elif TYPE == "CACHE":
            ID = "CA00"
        elif TYPE == "EFI":
            ID = "EF00"
        elif TYPE == "LINUX-RAID":
            ID = "FD00"
    elif PTTYPE == "MSDOS":
        if TYPE == "EMPTY":
            ID = "0"
        elif TYPE == "FAT12":
            ID = "1"
        elif TYPE == "EXTENDED":
            ID = "5"
        elif TYPE == "FAT16":
            ID = "6"
        elif TYPE in ["WINDOWS", "NTFS", "EXFAT"]:
            ID = "7"
        elif TYPE == "FAT32":
            ID = "b"
        elif TYPE == "HFAT12":
            ID = "11"
        elif TYPE == "HFAT16":
            ID = "16"
        elif TYPE == "HNTFS":
            ID = "17"
        elif TYPE == "HFAT32":
            ID = "1b"
        elif TYPE == "LINUX-SWAP":
            ID = "82"
        elif TYPE in ["LINUX", "EXT2", "EXT3", "EXT4", "REISERFS", "REISER4", "XFS", "JFS"]:
            ID = "83"
        elif TYPE == "LINUX-LVM":
            ID = "8e"
        elif TYPE == "FREEBSD":
            ID = "a5"
        elif TYPE == "OPENBSD":
            ID = "a6"
        elif TYPE in ["HFS", "HFS+"]:
            ID = "af"
        elif TYPE == "SOLARIS-BOOT":
            ID = "be"
        elif TYPE == "SOLARIS":
            ID = "bf"
        elif TYPE == "CACHE":
            ID = "ca"
        elif TYPE == "DATA":
            ID = "da"
        elif TYPE == "GPT":
            ID = "ee"
        elif TYPE == "EFI":
            ID = "ef"
        elif TYPE == "VMFS":
            ID = "fb"
        elif TYPE == "LINUX-RAID":
            ID = "fd"
    elif PTTYPE == "LVM":
        if TYPE == "LVM-LV":
            ID = "10000"
    elif PTTYPE == "ZVOL":
        if TYPE == "ZFS-VOL":
            ID = "10010"

    # An empty string means the mnemonic is unknown for this partition table type.
    return ID


def ogUnhidePartition(*args):
    # Local variables
    PART = None
    TYPE = None
    NEWTYPE = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogUnhidePartition', 'ogUnhidePartition int_ndisk int_npartition', 'ogUnhidePartition 1 1')
        return

    # Error unless exactly 2 parameters are given.
    if len(args) != 2:
        ogRaiseError(OG_ERR_FORMAT)
        return

    PART = ogDiskToDev(args[0], args[1])
    if PART is None:
        return

    # Get the partition type and map each hidden type to its visible counterpart.
    TYPE = ogGetPartitionType(args[0], args[1])
    if TYPE == "HNTFS":
        NEWTYPE = "NTFS"
    elif TYPE == "HFAT32":
        NEWTYPE = "FAT32"
    elif TYPE == "HFAT16":
        NEWTYPE = "FAT16"
    elif TYPE == "HFAT12":
        NEWTYPE = "FAT12"
    elif TYPE == "WIN-RESERV":
        NEWTYPE = "WINDOWS"
    else:
        ogRaiseError(OG_ERR_PARTITION, TYPE)
        return

    # Change the partition type.
    ogSetPartitionType(args[0], args[1], NEWTYPE)
    return


def ogUnlockDisk(*args):
    # Local variables
    DISK = None
    LOCKFILE = None

    # Show help if requested.
    if len(args) == 1 and args[0] == "help":
        ogHelp('ogUnlockDisk', 'ogUnlockDisk int_ndisk', 'ogUnlockDisk 1')
        return

    # Error unless exactly 1 parameter is given.
    if len(args) != 1:
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Get the disk device.
    DISK = ogDiskToDev(args[0])
    if DISK is None:
        return

    # Remove the exclusive lock file; a missing file means the disk is already unlocked.
    LOCKFILE = f"/var/lock/lock{DISK.replace('/', '-')}"
    try:
        os.remove(LOCKFILE)
    except FileNotFoundError:
        pass
    return


def ogUpdatePartitionTable():
    # Ask the kernel to re-read the partition table of every disk.
    for disk in ogDiskToDev():
        subprocess.run(["partprobe", disk])