ogclone-engine/client/lib/engine/bin/DiskLib.py

1368 lines
43 KiB
Python

import filecmp
import subprocess
import shutil
import os
from CacheLib import *
from FileSystemLib import *
from SystemLib import *
print (">>>>>>>>>>>>>>>>>>>> Load ", __name__, " <<<<<<<<<<<<<<<<<<<<<<")
def parted(*args):
parted_path = shutil.which("parted")
if parted_path:
try:
result = subprocess.run(
[parted_path] + list(args),
timeout=3,
capture_output=True,
text=True
)
return result.stdout
except subprocess.TimeoutExpired:
return "Error: Command 'parted' timed out"
else:
return "Error: 'parted' command not found"
def ogCreatePartitions(*args):
# Variables locales
ND = DISK = PTTYPE = PART = SECTORS = START = SIZE = TYPE = CACHEPART = None
IODISCO = IOSIZE = CACHESIZE = EXTSTART = EXTSIZE = NVME_PREFIX = tmpsfdisk = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogCreatePartitions', 'ogCreatePartitions int_ndisk str_parttype:int_partsize ...',
'ogCreatePartitions 1 NTFS:10000000 EXT3:5000000 LINUX-SWAP:1000000')
return
# Error si no se reciben al menos 2 parámetros.
if len(args) < 2:
ogRaiseError(OG_ERR_FORMAT)
return
# Nº total de sectores, para evitar desbordamiento (evitar redondeo).
ND = args[0]
DISK = ogDiskToDev(ND)
if DISK is None:
return
PTTYPE = ogGetPartitionTableType(ND) or "MSDOS" # Por defecto para discos vacíos.
if PTTYPE == "GPT":
ogCreateGptPartitions(*args)
return
elif PTTYPE != "MSDOS":
ogRaiseError(OG_ERR_PARTITION, PTTYPE)
return
SECTORS = ogGetLastSector(ND)
# Se recalcula el nº de sectores del disco 1, si existe partición de caché.
CACHEPART = ogFindCache()
if CACHEPART and ND == CACHEPART.split()[0]:
CACHESIZE = int(ogGetCacheSize()) * 2
# Sector de inicio (la partición 1 empieza en el sector 63).
IODISCO = ogDiskToDev(ND)
IOSIZE = subprocess.getoutput(f"fdisk -l {IODISCO} | awk '/I\\/O/ {{print $4}}'")
if IOSIZE == "4096":
START = 4096
SECTORS -= 8192
if CACHESIZE:
SECTORS = SECTORS - CACHESIZE + 2048 - (SECTORS - CACHESIZE) % 2048 - 1
else:
START = 63
if CACHESIZE:
SECTORS -= CACHESIZE
PART = 1
# Fichero temporal de entrada para "sfdisk"
tmpsfdisk = f"/tmp/sfdisk{os.getpid()}"
try:
with open(tmpsfdisk, 'w') as f:
f.write("unit: sectors\n\n")
NVME_PREFIX = "p" if "nvme" in DISK else ""
# Generar fichero de entrada para "sfdisk" con las particiones.
args = args[1:] # Shift
while args:
# Conservar los datos de la partición de caché.
if f"{ND} {PART}" == CACHEPART and CACHESIZE:
with open(tmpsfdisk, 'a') as f:
f.write(f"{DISK}{NVME_PREFIX}{PART} : start={SECTORS + 1}, size={CACHESIZE}, Id=ca\n")
PART += 1
# Leer formato de cada parámetro - Tipo:Tamaño
TYPE = args[0].split(":")[0]
SIZE = int(args[0].split(":")[1])
# Obtener identificador de tipo de partición válido.
ID = ogTypeToId(TYPE, "MSDOS")
with open(tmpsfdisk, 'a') as f:
f.write(f"{DISK}{NVME_PREFIX}{PART} : start={START}, size={SIZE}, Id={ID}\n")
START += SIZE
SECTORS -= SIZE
PART += 1
args = args[1:]
# Ejecutar "sfdisk" con el fichero temporal.
subprocess.run(["sfdisk", DISK], input=open(tmpsfdisk, 'r').read(), text=True)
subprocess.run(["partprobe", DISK])
finally:
# Eliminar fichero temporal.
if os.path.exists(tmpsfdisk):
os.remove(tmpsfdisk)
if CACHESIZE:
ogMountCache()
return 0
def ogCreateGptPartitions(*args):
# Variables locales
ND = DISK = PART = SECTORS = ALIGN = START = SIZE = TYPE = CACHEPART = CACHESIZE = DELOPTIONS = OPTIONS = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogCreateGptPartitions', 'ogCreateGptPartitions int_ndisk str_parttype:int_partsize ...',
'ogCreateGptPartitions 1 NTFS:10000000 EXT3:5000000 LINUX-SWAP:1000000')
return
# Error si no se reciben menos de 2 parámetros.
if len(args) < 2:
ogRaiseError(OG_ERR_FORMAT)
return
# Nº total de sectores, para evitar desbordamiento (evitar redondeo).
ND = args[0]
DISK = ogDiskToDev(ND)
if DISK is None:
return
# Se calcula el ultimo sector del disco (total de sectores usables)
SECTORS = ogGetLastSector(ND)
# Se recalcula el nº de sectores del disco si existe partición de caché.
CACHEPART = ogFindCache()
if ND == CACHEPART.split()[0]:
CACHESIZE = int(ogGetCacheSize()) * 2
if CACHESIZE:
SECTORS -= CACHESIZE
# Si el disco es GPT empieza en el sector 2048 por defecto, pero podria cambiarse
ALIGN = subprocess.getoutput(f"sgdisk -D {DISK} 2>/dev/null")
START = ALIGN
PART = 1
# Leer parámetros con definición de particionado.
args = args[1:] # Shift
while args:
# Si PART es la cache, nos la saltamos y seguimos con el siguiente numero para conservar los datos de la partición de caché.
if f"{ND} {PART}" == CACHEPART and CACHESIZE:
PART += 1
# Leer formato de cada parámetro - Tipo:Tamaño
TYPE = args[0].split(":")[0]
SIZE = int(args[0].split(":")[1])
# Error si la partición es extendida (no válida en discos GPT).
if TYPE == "EXTENDED":
ogRaiseError(OG_ERR_PARTITION, "EXTENDED")
return
# Comprobar si existe la particion actual, capturamos su tamaño para ver si cambio o no
PARTSIZE = ogGetPartitionSize(ND, PART)
# En sgdisk no se pueden redimensionar las particiones, es necesario borrarlas y volver a crealas
if PARTSIZE:
DELOPTIONS += f" -d{PART}"
# Creamos la particion
# Obtener identificador de tipo de partición válido.
ID = ogTypeToId(TYPE, "GPT")
if TYPE != "CACHE" and ID:
OPTIONS += f" -n{PART}:{START}:+{SIZE} -t{PART}:{ID} "
START += SIZE
# Error si se supera el nº total de sectores.
if START > SECTORS:
ogRaiseError(OG_ERR_FORMAT, f"{START//2} > {SECTORS//2}")
return
PART += 1
args = args[1:]
# Desmontar los sistemas de archivos del disco antes de realizar las operaciones.
ogUnmountAll(ND)
if CACHESIZE:
ogUnmountCache()
# Si la tabla de particiones no es valida, volver a generarla.
ogCreatePartitionTable(ND)
# Definir particiones y notificar al kernel.
# Borramos primero las particiones y luego creamos las nuevas
subprocess.run(["sgdisk"] + DELOPTIONS.split() + OPTIONS.split() + [DISK], stderr=subprocess.DEVNULL)
subprocess.run(["partprobe", DISK], stderr=subprocess.DEVNULL)
if CACHESIZE:
ogMountCache()
return 0
def ogCreatePartitionTable(*args):
# Variables locales
DISK = PTTYPE = CREATE = CREATEPTT = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogCreatePartitionTable', 'ogCreatePartitionTable int_ndisk [str_partype]',
'ogCreatePartitionTable 1 GPT', 'ogCreatePartitionTable 1')
return
# Error si no se reciben 1 o 2 parámetros.
if len(args) == 1:
CREATEPTT = ""
elif len(args) == 2:
CREATEPTT = args[1]
else:
ogRaiseError(OG_ERR_FORMAT)
return
# Capturamos el tipo de tabla de particiones actual
DISK = ogDiskToDev(args[0])
if DISK is None:
return
PTTYPE = ogGetPartitionTableType(args[0]) or "MSDOS" # Por defecto para discos vacíos.
CREATEPTT = CREATEPTT or PTTYPE
# Si la tabla actual y la que se indica son iguales, se comprueba si hay que regenerarla.
if CREATEPTT == PTTYPE:
if PTTYPE == "GPT":
try:
result = subprocess.run(
["sgdisk", "-p", DISK],
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL
)
if result.returncode != 0:
CREATE = "GPT"
except subprocess.CalledProcessError:
CREATE = "GPT"
elif PTTYPE == "MSDOS":
try:
result = subprocess.run(
["parted", "-s", DISK, "print"],
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL
)
if result.returncode != 0:
CREATE = "MSDOS"
except subprocess.CalledProcessError:
CREATE = "MSDOS"
else:
CREATE = CREATEPTT
# Dependiendo del valor de CREATE, creamos la tabla de particiones en cada caso.
if CREATE == "GPT":
if PTTYPE == "MSDOS":
subprocess.run(["sgdisk", "-go", DISK])
else:
subprocess.run(["gdisk", DISK], input="2\nw\nY\n", text=True)
subprocess.run(["partprobe", DISK], stderr=subprocess.DEVNULL)
elif CREATE == "MSDOS":
if PTTYPE == "GPT":
subprocess.run(["sgdisk", "-Z", DISK])
subprocess.run(["fdisk", DISK], input="o\nn\np\n\n\n\nd\n\nw", text=True)
subprocess.run(["partprobe", DISK], stderr=subprocess.DEVNULL)
return
def ogDeletePartitionTable(*args):
# Variables locales
DISK = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogDeletePartitionTable', 'ogDeletePartitionTable int_ndisk', 'ogDeletePartitionTable 1')
return
# Error si no se reciben 1 parámetros.
if len(args) != 1:
ogRaiseError(OG_ERR_FORMAT)
return
# Obteniendo Identificador linux del disco.
DISK = ogDiskToDev(args[0])
if DISK is None:
return
# Crear una tabla de particiones vacía.
PTTYPE = ogGetPartitionTableType(args[0])
if PTTYPE == "GPT":
subprocess.run(["sgdisk", "-o", DISK])
elif PTTYPE == "MSDOS":
subprocess.run(["fdisk", DISK], input="o\nw", text=True)
return
def ogDevToDisk(dev):
# Variables locales
CACHEFILE = "/var/cache/disks.cfg"
PART = None
n = 1
# Si se solicita, mostrar ayuda.
if dev == "help":
ogHelp("ogDevToDisk", "ogDevToDisk path_device | LABEL=str_label | UUID=str_uuid",
"ogDevToDisk /dev/sda => 1",
"ogDevToDisk /dev/sda1 => 1 1",
"ogDevToDisk LABEL=CACHE => 1 4")
return
# Error si no se recibe 1 parámetro.
if len(dev) != 1:
ogRaiseError(OG_ERR_FORMAT)
return
# Obtener dispositivo a partir de camino, etiqueta o UUID.
DEV = dev[0]
if DEV.startswith("LABEL="):
DEV = subprocess.getoutput(f"blkid -L {DEV[6:]}")
elif DEV.startswith("PARTLABEL="):
DEV = subprocess.getoutput(f"realpath /dev/disk/by-partlabel/{DEV[11:]} 2>/dev/null")
elif DEV.startswith("PARTUUID="):
DEV = subprocess.getoutput(f"realpath /dev/disk/by-partuuid/{DEV[10:]} 2>/dev/null")
elif DEV.startswith("UUID="):
DEV = subprocess.getoutput(f"blkid -U {DEV[5:]}")
# Error si no es fichero de bloques o directorio (para LVM).
if not os.path.exists(DEV):
ogRaiseError(OG_ERR_NOTFOUND, dev)
return
# Buscar en fichero de caché de discos.
if os.path.exists(CACHEFILE):
with open(CACHEFILE, 'r') as f:
for line in f:
parts = line.strip().split(':')
if len(parts) == 2 and parts[1] == DEV:
PART = parts[0]
break
if PART:
return PART
# Si no se encuentra, procesa todos los discos para devolver su nº de orden y de partición.
disks = ogDiskToDev()
for d in disks:
NVME_PREFIX = "p" if "nvme" in d else ""
if DEV.startswith(d):
return f"{n} {DEV[len(d) + len(NVME_PREFIX):]}"
n += 1
ogRaiseError(OG_ERR_NOTFOUND, dev)
return OG_ERR_NOTFOUND
def ogDiskToDev(*args):
# Variables locales
CACHEFILE = "/var/cache/disks.cfg"
ALLDISKS = []
MPATH = VOLGROUPS = ZFSVOLS = DISK = PART = ZPOOL = None
i = 1
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogDiskToDev', 'ogDiskToDev int_ndisk [int_npartition]',
'ogDiskToDev => /dev/sda /dev/sdb',
'ogDiskToDev 1 => /dev/sda',
'ogDiskToDev 1 1 => /dev/sda1')
return
# Borrar fichero de caché de configuración si hay cambios en las particiones.
if not filecmp.cmp('/proc/partitions', '/tmp/.partitions'):
# Guardar copia de las particiones definidas para comprobar cambios.
shutil.copy2('/proc/partitions', '/tmp/.partitions')
os.remove(CACHEFILE)
# Si existe una correspondencia con disco/dispositivo en el caché; mostrarlo y salir.
with open(CACHEFILE, 'r') as f:
for line in f:
parts = line.strip().split(':')
if len(parts) == 2 and parts[0] == ":".join(args):
print(parts[1])
return
# Continuar para detectar nuevos dispositivos.
# Listar dispositivos de discos.
all_disks = subprocess.getoutput("lsblk -n -e 1,2 -x MAJ:MIN 2>/dev/null || lsblk -n -e 1,2").splitlines()
for line in all_disks:
parts = line.split()
if parts[5] == "disk":
ALLDISKS.append(f"/dev/{parts[0]}")
# Listar volúmenes lógicos.
VOLGROUPS = subprocess.getoutput("vgs -a --noheadings 2>/dev/null").splitlines()
VOLGROUPS = [f"/dev/{line.split()[0]}" for line in VOLGROUPS]
# Detectar caminos múltiples (ignorar mensaje si no está configurado Multipath).
try:
MPATH = subprocess.getoutput("multipath -l -v 1 2>/dev/null").splitlines()
MPATH = [f"/dev/mapper/{line.split()[0]}" for line in MPATH]
# Quitar de la lista los discos que forman parte de Multipath.
for line in subprocess.getoutput("multipath -ll").splitlines():
if line.split()[5] == "ready":
disk = f"/dev/{line.split()[2]}"
if disk in ALLDISKS:
ALLDISKS.remove(disk)
except subprocess.CalledProcessError:
MPATH = []
# Detectar volúmenes ZFS.
ZFSVOLS = subprocess.getoutput("blkid").splitlines()
ZFSVOLS = [line.split(":")[0] for line in ZFSVOLS if "zfs" in line]
# Mostrar salidas segun el número de parametros.
if len(args) == 0:
print(" ".join(ALLDISKS))
elif len(args) == 1:
# Error si el parámetro no es un número positivo.
if not args[0].isdigit() or int(args[0]) <= 0:
ogRaiseError(OG_ERR_FORMAT, args[0])
return
disk = ALLDISKS[int(args[0])-1]
# Error si el fichero no existe.
if not os.path.exists(disk):
ogRaiseError(OG_ERR_NOTFOUND, args[0])
return
# Actualizar caché de configuración y mostrar dispositivo.
with open(CACHEFILE, 'a') as f:
f.write(f"{args[0]}:{disk}\n")
print(disk)
elif len(args) == 2:
# Error si los 2 parámetros no son números positivos.
if not args[0].isdigit() or int(args[0]) <= 0 or not args[1].isdigit() or int(args[1]) <= 0:
ogRaiseError(OG_ERR_FORMAT, f"{args[0]} {args[1]}")
return
disk = ALLDISKS[int(args[0])-1]
# Error si el fichero no existe.
if not os.path.exists(disk):
ogRaiseError(OG_ERR_NOTFOUND, args[0])
return
part = f"{disk}{args[1]}"
# Comprobar si es partición.
if os.path.exists(part):
# Actualizar caché de configuración y mostrar dispositivo.
with open(CACHEFILE, 'a') as f:
f.write(f"{args[0]} {args[1]}:{part}\n")
print(part)
else:
# Comprobar si RAID o Multipath (tener en cuenta enlace simbólico).
part = f"{disk}p{args[1]}"
if os.path.exists(part) and os.stat(part, follow_symlinks=True).st_mode[0] == "b":
# Actualizar caché de configuración y mostrar dispositivo.
with open(CACHEFILE, 'a') as f:
f.write(f"{args[0]} {args[1]}:{part}\n")
print(part)
else:
part = ""
# Comprobar si volumen lógico.
if disk in VOLGROUPS:
lvscan = subprocess.getoutput("lvscan -a 2>/dev/null").splitlines()
for line in lvscan:
parts = line.split("'")
if parts[1].startswith(f"{disk}/") and i == int(args[1]):
part = parts[1]
break
i += 1
# Comprobar si volumen ZFS que puede ser montado.
if disk in ZFSVOLS:
subprocess.run(["zpool", "import", "-f", "-R", "/mnt", "-N", "-a"], stderr=subprocess.DEVNULL)
zpool = subprocess.getoutput(f"blkid -s LABEL -o value {disk}")
zfs_list = subprocess.getoutput(f"zfs list -Hp -o name,canmount,mountpoint -r {zpool}").splitlines()
for line in zfs_list:
parts = line.split()
if parts[1] == "on" and parts[2] != "none":
if i == int(args[1]):
part = parts[0]
break
i += 1
# Salir si no se encuentra dispositivo.
if not part:
ogRaiseError(OG_ERR_NOTFOUND, f"{args[0]} {args[1]}")
return
# Devolver camino al dispositivo.
# Actualizar caché de configuración y mostrar dispositivo.
with open(CACHEFILE, 'a') as f:
f.write(f"{args[0]} {args[1]}:{part}\n")
print(part)
else:
ogRaiseError(OG_ERR_FORMAT)
return
def ogGetDiskSize(*args):
# Variables locales
DISK = SIZE = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogGetDiskSize', 'ogGetDiskSize int_ndisk', 'ogGetDiskSize 1 => 244198584')
return
# Error si no se recibe 1 parámetro.
if len(args) != 1:
ogRaiseError(OG_ERR_FORMAT)
return
# Obtener el tamaño del disco.
DISK = ogDiskToDev(args[0])
if DISK is None:
return
SIZE = subprocess.getoutput(f"lsblk -n -b -o SIZE {DISK}")
# Mostrar salida.
if SIZE:
print(SIZE)
return
def ogGetDiskType(*args):
# Variables locales
DEV = MAJOR = TYPE = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogGetDiskType', 'ogGetDiskType path_device', 'ogGetDiskType /dev/sdb => USB')
return
# Error si no se recibe 1 parámetro.
if len(args) != 1:
ogRaiseError(OG_ERR_FORMAT)
return
# Obtener el driver del dispositivo de bloques.
DEV = args[0].split("/dev/")[1]
MAJOR = subprocess.getoutput(f"awk -v D='{DEV}' '{{if ($4==D) print $1;}}' /proc/partitions")
TYPE = subprocess.getoutput(f"awk -v D={MAJOR} '/Block/ {{bl=1}} {{if ($1==D&&bl) print toupper($2)}}' /proc/devices")
# Devolver mnemónico del driver de dispositivo.
if TYPE == "SD":
TYPE = "DISK"
if subprocess.getoutput(f"udevadm info -q property {args[0]} 2>/dev/null | grep -q '^ID_BUS=usb'"):
TYPE = "USB"
elif TYPE == "BLKEXT":
TYPE = "NVM"
elif TYPE == "SR" or TYPE.startswith("IDE"):
TYPE = "CDROM" # FIXME Comprobar discos IDE.
elif TYPE == "MD" or TYPE.startswith("CCISS"):
TYPE = "RAID"
elif TYPE == "DEVICE-MAPPER":
TYPE = "MAPPER" # FIXME Comprobar LVM y RAID.
print(TYPE)
return
def ogGetEsp():
for d in subprocess.getoutput("blkid -o device").split():
# Previene error para /dev/loop0
PART = ogDevToDisk([d])
# En discos NVMe blkid devuelve una salida del tipo:
# >/dev/loop0
# >/dev/nvme0n1
# >/dev/nvme0n1p1
# al analizar la particion nvme0n1, PART solo tiene un argumento y hace que ogGetPartitionId lance un error
LEN = len(PART)
if LEN > 1:
if ogGetPartitionId(PART) == ogTypeToId("EFI", "GPT"):
return PART
return None
def ogGetLastSector(*args):
# Variables locales
DISK = None
PART = None
LASTSECTOR = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp("ogGetLastSector", "ogGetLastSector int_ndisk [int_npart]",
"ogGetLastSector 1 => 488392064",
"ogGetLastSector 1 1 => 102400062")
return
# Obtener último sector.
if len(args) == 1: # Para un disco.
DISK = ogDiskToDev(args[0])
if DISK is None:
return
LASTSECTOR = subprocess.getoutput(f"sgdisk -p {DISK} | awk '/last usable sector/ {{print($(NF))}}'")
elif len(args) == 2: # Para una partición.
DISK = ogDiskToDev(args[0])
if DISK is None:
return
PART = ogDiskToDev(args[0], args[1])
if PART is None:
return
LASTSECTOR = subprocess.getoutput(f"sgdisk -p {DISK} | awk -v P='{args[1]}' '{{if ($1==P) print $3}}'")
else: # Error si se reciben más parámetros.
ogRaiseError(OG_ERR_FORMAT)
return
print(LASTSECTOR)
return
def ogGetPartitionActive(*args):
# Variables locales
DISK = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogGetPartitionActive', 'ogGetPartitionActive int_ndisk', 'ogGetPartitionActive 1 => 1')
return
# Error si no se recibe 1 parámetro.
if len(args) != 1:
ogRaiseError(OG_ERR_FORMAT)
return
# Comprobar que el disco existe y listar su partición activa.
DISK = ogDiskToDev(args[0])
if DISK is None:
return
output = subprocess.getoutput(f"parted -sm {DISK} print 2>/dev/null | awk -F: '$7~/boot/ {{print $1}}'")
print(output)
return
def ogGetPartitionId(*args):
# Variables locales
DISK = None
ID = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogGetPartitionId', 'ogGetPartitionId int_ndisk int_npartition', 'ogGetPartitionId 1 1 => 7')
return
# Error si no se reciben 2 parámetros.
if len(args) != 2:
ogRaiseError(OG_ERR_FORMAT)
return
# Detectar y mostrar el id. de tipo de partición.
DISK = ogDiskToDev(args[0])
if DISK is None:
return
PTTYPE = ogGetPartitionTableType(args[0])
if PTTYPE == "GPT":
ID = subprocess.getoutput(f"sgdisk -p {DISK} 2>/dev/null | awk -v p={args[1]} '{{if ($1==p) print $6;}}'")
if ID == "8300" and f"{args[0]} {args[1]}" == ogFindCache():
ID = "CA00"
elif PTTYPE == "MSDOS":
ID = subprocess.getoutput(f"sfdisk --id {DISK} {args[1]} 2>/dev/null")
elif PTTYPE == "LVM":
ID = "10000"
elif PTTYPE == "ZPOOL":
ID = "10010"
print(ID)
return
def ogGetPartitionSize(*args):
# Variables locales
PART = None
SIZE = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogGetPartitionSize', 'ogGetPartitionSize int_ndisk int_npartition', 'ogGetPartitionSize 1 1 => 10000000')
return
# Error si no se reciben 2 parámetros.
if len(args) != 2:
ogRaiseError(OG_ERR_FORMAT)
return
# Devolver tamaño de partición, del volumen lógico o del sistema de archivos (para ZFS).
PART = ogDiskToDev(args[0], args[1])
if PART is None:
return
SIZE = subprocess.getoutput(f"partx -gbo SIZE {PART} 2>/dev/null | awk '{{print int($1/1024)}}'")
if not SIZE:
SIZE = subprocess.getoutput(f"lvs --noheadings -o lv_size --units k {PART} | awk '{{printf \"%d\",$0}}'")
if not SIZE:
SIZE = ogGetFsSize(args[0], args[1])
print(SIZE or 0)
return
def ogGetPartitionsNumber(*args):
# Variables locales
DISK = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogGetPartitionsNumber', 'ogGetPartitionsNumber int_ndisk', 'ogGetPartitionsNumber 1 => 3')
return
# Error si no se recibe 1 parámetro.
if len(args) != 1:
ogRaiseError(OG_ERR_FORMAT)
return
# Contar el número de veces que aparece el disco en su lista de particiones.
DISK = ogDiskToDev(args[0])
if DISK is None:
return
PTTYPE = ogGetPartitionTableType(args[0])
if PTTYPE in ["GPT", "MSDOS"]:
output = subprocess.getoutput(f"partx -gso NR {DISK} 2>/dev/null | awk -v p=0 '{{p=$1}} END {{print p}}'")
elif PTTYPE == "LVM":
output = subprocess.getoutput(f"lvs --noheadings {DISK} 2>/dev/null | wc -l")
elif PTTYPE == "ZPOOL":
subprocess.run(["zpool", "list"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # Check if zpool command exists
subprocess.run(["modprobe", "zfs"]) # Load zfs module if not already loaded
subprocess.run(["zpool", "import", "-f", "-R", "/mnt", "-N", "-a"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # Import all zpools
output = subprocess.getoutput(f"zfs list -Hp -o name,canmount,mountpoint -r $(blkid -s LABEL -o value {DISK}) | awk '$2==\"on\" && $3!=\"none\" {{c++}} END {{print c}}'")
else:
output = None
print(output)
return
def ogGetPartitionTableType(*args):
# Variables locales
DISK = None
TYPE = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogGetPartitionTableType', 'ogGetPartitionTableType int_ndisk', 'ogGetPartitionTableType 1 => MSDOS')
return
# Error si no se recibe 1 parámetro.
if len(args) != 1:
ogRaiseError(OG_ERR_FORMAT)
return
# Sustituye n de disco por su dispositivo.
DISK = ogDiskToDev(args[0])
if DISK is None:
return
# Comprobar tabla de particiones.
if os.path.exists(DISK):
output = subprocess.getoutput(f"parted -sm {DISK} print 2>/dev/null | awk -F: -v D={DISK} '{{ if($1 == D) print toupper($6)}}'")
if not output:
output = subprocess.getoutput(f"parted -sm {DISK} print 2>/dev/null | awk -F: -v D={DISK} '{{ if($1 == D) print toupper($6)}}'")
# Comprobar si es volumen lógico.
if os.path.isdir(DISK) and subprocess.run(["vgs", DISK], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0:
output = "LVM"
# Comprobar si es pool de ZFS.
if not output or output == "UNKNOWN":
blkid_output = subprocess.getoutput(f"blkid -s TYPE {DISK} | grep zfs")
if blkid_output:
output = "ZPOOL"
# Mostrar salida.
if output:
print(output)
return
def ogGetPartitionType(*args):
# Variables locales
ID = None
TYPE = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogGetPartitionType', 'ogGetPartitionType int_ndisk int_npartition', 'ogGetPartitionType 1 1 => NTFS')
return
# Error si no se reciben 2 parámetros.
if len(args) != 2:
ogRaiseError(OG_ERR_FORMAT)
return
# Detectar id. de tipo de partición y codificar al mnemónico.
ID = ogGetPartitionId(args[0], args[1])
if ID is None:
return
TYPE = ogIdToType(ID)
print(TYPE)
return
def ogHidePartition(*args):
# Variables locales
PART = None
TYPE = None
NEWTYPE = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogHidePartition', 'ogHidePartition int_ndisk int_npartition', 'ogHidePartition 1 1')
return
# Error si no se reciben 2 parámetros.
if len(args) != 2:
ogRaiseError(OG_ERR_FORMAT)
return
# Obtener el dispositivo de la partición.
PART = ogDiskToDev(args[0], args[1])
if PART is None:
return
# Obtener tipo de partición.
TYPE = ogGetPartitionType(args[0], args[1])
if TYPE == "NTFS":
NEWTYPE = "HNTFS"
elif TYPE == "FAT32":
NEWTYPE = "HFAT32"
elif TYPE == "FAT16":
NEWTYPE = "HFAT16"
elif TYPE == "FAT12":
NEWTYPE = "HFAT12"
elif TYPE == "WINDOWS":
NEWTYPE = "WIN-RESERV"
else:
ogRaiseError(OG_ERR_PARTITION, TYPE)
return
# Cambiar tipo de partición.
ogSetPartitionType(args[0], args[1], NEWTYPE)
return
def ogIdToType(ID):
# Obtener valor hexadecimal de 4 caracteres rellenado con 0 por delante.
ID = ID.zfill(4)
TYPE = None
# Asignar el tipo de partición según el ID.
if ID == "0000":
TYPE = "EMPTY"
elif ID == "0001":
TYPE = "FAT12"
elif ID in ["0005", "000f"]:
TYPE = "EXTENDED"
elif ID in ["0006", "000e"]:
TYPE = "FAT16"
elif ID == "0007":
TYPE = "NTFS"
elif ID in ["000b", "000c"]:
TYPE = "FAT32"
elif ID == "0011":
TYPE = "HFAT12"
elif ID == "0012":
TYPE = "COMPAQDIAG"
elif ID in ["0016", "001e"]:
TYPE = "HFAT16"
elif ID == "0017":
TYPE = "HNTFS"
elif ID in ["001b", "001c"]:
TYPE = "HFAT32"
elif ID == "0042":
TYPE = "WIN-DYNAMIC"
elif ID in ["0082", "8200"]:
TYPE = "LINUX-SWAP"
elif ID in ["0083", "8300"]:
TYPE = "LINUX"
elif ID in ["008e", "8E00"]:
TYPE = "LINUX-LVM"
elif ID in ["00a5", "a503"]:
TYPE = "FREEBSD"
elif ID == "00a6":
TYPE = "OPENBSD"
elif ID == "00a7":
TYPE = "CACHE" # (compatibilidad con Brutalix)
elif ID in ["00af", "af00"]:
TYPE = "HFS"
elif ID in ["00be", "be00"]:
TYPE = "SOLARIS-BOOT"
elif ID in ["00bf", "bf00145"]:
TYPE = "SOLARIS"
elif ID in ["00ca", "ca00"]:
TYPE = "CACHE"
elif ID == "00da":
TYPE = "DATA"
elif ID == "00ee":
TYPE = "GPT"
elif ID in ["00ef", "ef00"]:
TYPE = "EFI"
elif ID == "00fb":
TYPE = "VMFS"
elif ID in ["00fd", "fd00"]:
TYPE = "LINUX-RAID"
elif ID == "0700":
TYPE = "WINDOWS"
elif ID == "0c01":
TYPE = "WIN-RESERV"
elif ID == "7f00":
TYPE = "CHROMEOS-KRN"
elif ID == "7f01":
TYPE = "CHROMEOS"
elif ID == "7f02":
TYPE = "CHROMEOS-RESERV"
elif ID == "8301":
TYPE = "LINUX-RESERV"
elif ID == "a500":
TYPE = "FREEBSD-DISK"
elif ID == "a501":
TYPE = "FREEBSD-BOOT"
elif ID == "a502":
TYPE = "FREEBSD-SWAP"
elif ID == "ab00":
TYPE = "HFS-BOOT"
elif ID == "af01":
TYPE = "HFS-RAID"
elif ID == "bf02":
TYPE = "SOLARIS-SWAP"
elif ID == "bf03":
TYPE = "SOLARIS-DISK"
elif ID == "ef01":
TYPE = "MBR"
elif ID == "ef02":
TYPE = "BIOS-BOOT"
elif ID == "10000":
TYPE = "LVM-LV"
elif ID == "10010":
TYPE = "ZFS-VOL"
else:
TYPE = "UNKNOWN"
return TYPE
def ogIsDiskLocked(*args):
# Variables locales
DISK = None
LOCKFILE = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogIsDiskLocked', 'ogIsDiskLocked int_ndisk', 'if ogIsDiskLocked(1): ...')
return
# Falso, en caso de error.
if len(args) != 1:
return False
DISK = ogDiskToDev(args[0], 2)
if DISK is None:
return False
# Comprobar existencia de fichero de bloqueo para el disco.
LOCKFILE = f"/var/lock/lock{DISK.replace('/', '-')}"
return os.path.isfile(LOCKFILE)
def ogListPartitions(*args):
# Variables locales
DISK = None
PART = None
NPARTS = None
TYPE = None
SIZE = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogListPartitions', 'ogListPartitions int_ndisk', 'ogListPartitions 1 => NTFS:10000000 EXT3:5000000 LINUX-SWAP:1000000')
return
# Error si no se recibe 1 parámetro.
if len(args) != 1:
ogRaiseError(OG_ERR_FORMAT)
return
# Procesar la salida de parted.
DISK = ogDiskToDev(args[0])
if DISK is None:
return
NPARTS = ogGetPartitionsNumber(args[0])
for PART in range(1, NPARTS + 1):
TYPE = ogGetPartitionType(args[0], PART) or "EMPTY"
SIZE = ogGetPartitionSize(args[0], PART) or 0
print(f"{TYPE}:{SIZE} ", end="")
print()
return
def ogListPrimaryPartitions(*args):
# Variables locales
PTTYPE = None
PARTS = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogListPrimaryPartitions', 'ogListPrimaryPartitions int_ndisk', 'ogListPrimaryPartitions 1 => NTFS:10000000 EXT3:5000000 EXTENDED:1000000')
return
PTTYPE = ogGetPartitionTableType(args[0])
if PTTYPE is None:
return
PARTS = ogListPartitions(*args)
if PARTS is None:
return
if PTTYPE == "GPT":
print(PARTS.rstrip(" EMPTY:0"))
elif PTTYPE == "MSDOS":
print(PARTS.split(" ")[0:4])
return
def ogListLogicalPartitions(*args):
# Variables locales
PTTYPE = None
PARTS = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogListLogicalPartitions', 'ogListLogicalPartitions int_ndisk', 'ogListLogicalPartitions 1 => LINUX-SWAP:999998')
return
PTTYPE = ogGetPartitionTableType(args[0])
if PTTYPE is None:
return
PARTS = ogListPartitions(*args)
if PARTS is None:
return
return PARTS.split(" ")[4:]
def ogLockDisk(*args):
# Variables locales
DISK = None
LOCKFILE = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogLockDisk', 'ogLockDisk int_ndisk', 'ogLockDisk 1')
return
# Error si no se recibe 1 parámetro.
if len(args) != 1:
ogRaiseError(OG_ERR_FORMAT)
return
# Obtener partición.
DISK = ogDiskToDev(args[0])
if DISK is None:
return
# Crear archivo de bloqueo exclusivo.
LOCKFILE = f"/var/lock/lock{DISK.replace('/', '-')}"
open(LOCKFILE, 'a').close()
def ogSetPartitionActive(*args):
# Variables locales
DISK = None
PART = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogSetPartitionActive', 'ogSetPartitionActive int_ndisk int_npartition', 'ogSetPartitionActive 1 1')
return
# Error si no se reciben 2 parámetros.
if len(args) != 2:
ogRaiseError(OG_ERR_FORMAT)
return
# Comprobar que el disco existe y activar la partición indicada.
DISK = ogDiskToDev(args[0])
if DISK is None:
return
PART = ogDiskToDev(args[0], args[1])
if PART is None:
return
subprocess.run(["parted", "-s", DISK, "set", args[1], "boot", "on"], stderr=subprocess.DEVNULL)
return
def ogSetPartitionId(*args):
# Variables locales
DISK = None
PART = None
PTTYPE = None
ID = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogSetPartitionId', 'ogSetPartitionId int_ndisk int_npartition hex_partid', 'ogSetPartitionId 1 1 7')
return
# Error si no se reciben 3 parámetros.
if len(args) != 3:
ogRaiseError(OG_ERR_FORMAT)
return
# Sustituye nº de disco y nº partición por su dispositivo.
DISK = ogDiskToDev(args[0])
if DISK is None:
return
PART = ogDiskToDev(args[0], args[1])
if PART is None:
return
# Error si el id. de partición no es hexadecimal.
ID = args[2].upper()
if not re.match("^[0-9A-F]+$", ID):
ogRaiseError(OG_ERR_OUTOFLIMIT, args[2])
return
# Elección del tipo de partición.
PTTYPE = ogGetPartitionTableType(args[0])
if PTTYPE == "GPT":
subprocess.run(["sgdisk", f"-t{args[1]}:{ID}", DISK], stderr=subprocess.DEVNULL)
elif PTTYPE == "MSDOS":
subprocess.run(["sfdisk", f"--id", DISK, args[1], ID], stderr=subprocess.DEVNULL)
else:
ogRaiseError(OG_ERR_OUTOFLIMIT, f"{args[0]},{PTTYPE}")
return
# MSDOS) Correcto si fdisk sin error o con error pero realiza Syncing
if subprocess.run(["partprobe", DISK], stderr=subprocess.DEVNULL).returncode == 0:
return
else:
ogRaiseError(OG_ERR_PARTITION, f"{args[0]},{args[1]},{args[2]}")
return
def ogSetPartitionSize(*args):
# Variables locales
DISK = None
PART = None
SIZE = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogSetPartitionSize', 'ogSetPartitionSize int_ndisk int_npartition int_size', 'ogSetPartitionSize 1 1 10000000')
return
# Error si no se reciben 3 parámetros.
if len(args) != 3:
ogRaiseError(OG_ERR_FORMAT)
return
# Obtener el tamaño de la partición.
DISK = ogDiskToDev(args[0])
if DISK is None:
return
PART = ogDiskToDev(args[0], args[1])
if PART is None:
return
# Convertir tamaño en KB a sectores de 512 B.
SIZE = int(args[2]) * 2
# Redefinir el tamaño de la partición.
subprocess.run(["sfdisk", "-f", "-uS", f"-N{args[1]}", DISK], input=f",{SIZE}", text=True, stderr=subprocess.DEVNULL)
subprocess.run(["partprobe", DISK], stderr=subprocess.DEVNULL)
return
def ogSetPartitionType(*args):
# Variables locales
DISK = None
PART = None
PTTYPE = None
TYPE = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogSetPartitionType', 'ogSetPartitionType int_ndisk int_npartition str_type', 'ogSetPartitionType 1 1 NTFS')
return
# Error si no se reciben 3 parámetros.
if len(args) != 3:
ogRaiseError(OG_ERR_FORMAT)
return
# Sustituye nº de disco por su dispositivo.
DISK = ogDiskToDev(args[0])
if DISK is None:
return
PART = ogDiskToDev(args[0], args[1])
if PART is None:
return
# Elección del tipo de partición.
PTTYPE = ogGetPartitionTableType(args[0])
if PTTYPE is None:
return
TYPE = args[2]
ID = ogTypeToId(TYPE, PTTYPE)
if ID is None:
ogRaiseError(OG_ERR_FORMAT, f"{TYPE},{PTTYPE}")
return
ogSetPartitionId(args[0], args[1], ID)
return
def ogTypeToId(TYPE, PTTYPE="MSDOS"):
# Asociar id. de partición para su mnemónico.
ID = ""
# Elección del tipo de partición.
if PTTYPE == "GPT":
if TYPE == "EMPTY":
ID = "0"
elif TYPE in ["WINDOWS", "NTFS", "EXFAT", "FAT32", "FAT16", "FAT12", "HNTFS", "HFAT32", "HFAT16", "HFAT12"]:
ID = "0700"
elif TYPE == "WIN-RESERV":
ID = "0C01"
elif TYPE == "CHROMEOS-KRN":
ID = "7F00"
elif TYPE == "CHROMEOS":
ID = "7F01"
elif TYPE == "CHROMEOS-RESERV":
ID = "7F02"
elif TYPE == "LINUX-SWAP":
ID = "8200"
elif TYPE in ["LINUX", "EXT2", "EXT3", "EXT4", "REISERFS", "REISER4", "XFS", "JFS"]:
ID = "8300"
elif TYPE == "LINUX-RESERV":
ID = "8301"
elif TYPE == "LINUX-LVM":
ID = "8E00"
elif TYPE == "FREEBSD-DISK":
ID = "A500"
elif TYPE == "FREEBSD-BOOT":
ID = "A501"
elif TYPE == "FREEBSD-SWAP":
ID = "A502"
elif TYPE == "FREEBSD":
ID = "A503"
elif TYPE == "HFS-BOOT":
ID = "AB00"
elif TYPE in ["HFS", "HFS+"]:
ID = "AF00"
elif TYPE == "HFS-RAID":
ID = "AF01"
elif TYPE == "SOLARIS-BOOT":
ID = "BE00"
elif TYPE == "SOLARIS":
ID = "BF00"
elif TYPE == "SOLARIS-SWAP":
ID = "BF02"
elif TYPE == "SOLARIS-DISK":
ID = "BF03"
elif TYPE == "CACHE":
ID = "CA00"
elif TYPE == "EFI":
ID = "EF00"
elif TYPE == "LINUX-RAID":
ID = "FD00"
elif PTTYPE == "MSDOS":
if TYPE == "EMPTY":
ID = "0"
elif TYPE == "FAT12":
ID = "1"
elif TYPE == "EXTENDED":
ID = "5"
elif TYPE == "FAT16":
ID = "6"
elif TYPE in ["WINDOWS", "NTFS", "EXFAT"]:
ID = "7"
elif TYPE == "FAT32":
ID = "b"
elif TYPE == "HFAT12":
ID = "11"
elif TYPE == "HFAT16":
ID = "16"
elif TYPE == "HNTFS":
ID = "17"
elif TYPE == "HFAT32":
ID = "1b"
elif TYPE == "LINUX-SWAP":
ID = "82"
elif TYPE in ["LINUX", "EXT2", "EXT3", "EXT4", "REISERFS", "REISER4", "XFS", "JFS"]:
ID = "83"
elif TYPE == "LINUX-LVM":
ID = "8e"
elif TYPE == "FREEBSD":
ID = "a5"
elif TYPE == "OPENBSD":
ID = "a6"
elif TYPE in ["HFS", "HFS+"]:
ID = "af"
elif TYPE == "SOLARIS-BOOT":
ID = "be"
elif TYPE == "SOLARIS":
ID = "bf"
elif TYPE == "CACHE":
ID = "ca"
elif TYPE == "DATA":
ID = "da"
elif TYPE == "GPT":
ID = "ee"
elif TYPE == "EFI":
ID = "ef"
elif TYPE == "VMFS":
ID = "fb"
elif TYPE == "LINUX-RAID":
ID = "fd"
elif PTTYPE == "LVM":
if TYPE == "LVM-LV":
ID = "10000"
elif PTTYPE == "ZVOL":
if TYPE == "ZFS-VOL":
ID = "10010"
return ID
def ogUnhidePartition(*args):
# Variables locales
PART = None
TYPE = None
NEWTYPE = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogUnhidePartition', 'ogUnhidePartition int_ndisk int_npartition', 'ogUnhidePartition 1 1')
return
# Error si no se reciben 2 parámetros.
if len(args) != 2:
ogRaiseError(OG_ERR_FORMAT)
return
PART = ogDiskToDev(args[0], args[1])
if PART is None:
return
# Obtener tipo de partición.
TYPE = ogGetPartitionType(args[0], args[1])
if TYPE == "HNTFS":
NEWTYPE = "NTFS"
elif TYPE == "HFAT32":
NEWTYPE = "FAT32"
elif TYPE == "HFAT16":
NEWTYPE = "FAT16"
elif TYPE == "HFAT12":
NEWTYPE = "FAT12"
elif TYPE == "WIN-RESERV":
NEWTYPE = "WINDOWS"
else:
ogRaiseError(OG_ERR_PARTITION, TYPE)
return
# Cambiar tipo de partición.
ogSetPartitionType(args[0], args[1], NEWTYPE)
return
def ogUnlockDisk(*args):
# Variables locales
DISK = None
LOCKFILE = None
# Si se solicita, mostrar ayuda.
if len(args) == 1 and args[0] == "help":
ogHelp('ogUnlockDisk', 'ogUnlockDisk int_ndisk', 'ogUnlockDisk 1')
return
# Error si no se recibe 1 parámetro.
if len(args) != 1:
ogRaiseError(OG_ERR_FORMAT)
return
# Obtener partición.
DISK = ogDiskToDev(args[0])
if DISK is None:
return
# Borrar archivo de bloqueo exclusivo.
LOCKFILE = f"/var/lock/lock{DISK.replace('/', '-')}"
os.remove(LOCKFILE)
return
def ogUpdatePartitionTable():
for disk in ogDiskToDev():
subprocess.run(["partprobe", disk])