ogclient/src/utils/disk.py

193 lines
6.4 KiB
Python

#
# Copyright (C) 2022 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
import os
import logging
import shlex
import subprocess
import json
from src.log import OgError
import fdisk
class ogPartitionData:
    """Plain record holding the attributes of a single partition.

    The values are stored exactly as passed in; no validation or
    conversion is performed.
    """

    def __init__(self, parttype, fstype, padev, size, partno):
        # Partition type identifier.
        self.parttype = parttype
        # Filesystem type.
        self.fstype = fstype
        # Partition device.
        self.padev = padev
        # Partition size.
        self.size = size
        # Partition number.
        self.partno = partno
class ogDiskData:
    """Plain record holding the attributes of a whole disk.

    The values are stored exactly as passed in; no validation or
    conversion is performed.
    """

    def __init__(self, nsectors, sector_size, label_name):
        # Number of sectors on the disk.
        self.nsectors = nsectors
        # Sector size.
        self.sector_size = sector_size
        # Name of the disk label; get_disk_data() passes "" when there is none.
        self.label_name = label_name
def get_partition_data(device):
    """
    Build one ogPartitionData record per partition found on the device.

    An fdisk context is opened on `device`; any failure while opening it
    is re-raised as OgError. The returned list follows the iteration order
    of the context's partitions.
    """
    try:
        context = fdisk.Context(device=device, details=True)
    except Exception as e:
        raise OgError(f'Partition query error for {device}: {e}') from e

    def as_text(partition, field_id):
        # Render a single partition field as a string through the context.
        return context.partition_to_string(partition, field_id)

    return [
        ogPartitionData(
            parttype=as_text(part, fdisk.FDISK_FIELD_TYPEID),
            fstype=as_text(part, fdisk.FDISK_FIELD_FSTYPE),
            padev=as_text(part, fdisk.FDISK_FIELD_DEVICE),
            size=as_text(part, fdisk.FDISK_FIELD_SIZE),
            partno=part.partno,
        )
        for part in context.partitions
    ]
def get_disk_data(device):
    """
    Return an ogDiskData record for the given device.

    An fdisk context is opened on `device`; any failure while opening it
    is re-raised as OgError. When the context has no label, the label name
    is reported as an empty string.
    """
    try:
        context = fdisk.Context(device=device, details=True)
    except Exception as e:
        raise OgError(f'Disk query error for {device}: {e}') from e

    sector_count = context.nsectors
    bytes_per_sector = context.sector_size
    if context.label:
        label = context.label.name
    else:
        label = ""
    return ogDiskData(nsectors=sector_count,
                      sector_size=bytes_per_sector,
                      label_name=label)
def get_disks():
    """
    List the entries of /sys/block/ whose name begins with 'sd',
    'nvme' or 'vd', sorted by name.
    """
    wanted_prefixes = ('sd', 'nvme', 'vd')
    names = [entry for entry in os.listdir('/sys/block/')
             if entry.startswith(wanted_prefixes)]
    names.sort()
    return names
def get_partition_device(disknum, partnum):
    """
    Returns the device path, given a disk and partition number

    Both numbers start at 1. `disknum` selects an entry of get_disks();
    `partnum` is matched against the fdisk partition number plus one.

    Raises OgError if the disk number is out of range or no partition with
    the requested number exists on that disk.
    """
    # Take a single snapshot of the disk list so the bounds check, the error
    # message and the lookup all see the same data.
    disks = get_disks()
    disk_index = disknum - 1
    if disk_index < 0 or disk_index >= len(disks):
        raise OgError(f'Invalid disk number {disknum}, {len(disks)} disks available.')

    disk = disks[disk_index]
    cxt = fdisk.Context(f'/dev/{disk}')

    # pa.partno is compared against the caller's 1-based number minus one.
    wanted_partno = partnum - 1
    for pa in cxt.partitions:
        if pa.partno == wanted_partno:
            return cxt.partition_to_string(pa, fdisk.FDISK_FIELD_DEVICE)

    raise OgError(f'No such partition with disk index {disknum} and partition index {partnum}')
def get_efi_partition(disknum, enforce_gpt):
    """
    Look for an EFI System Partition at the n-th disk. If disknum is invalid an
    exception is thrown.

    If enforce_gpt is set to True, a disk with a DOS (MBR) partition scheme
    is rejected with an OgError instead of being searched.

    Returns tuple with:
        - Device string of the ESP partition, as reported by fdisk
        - /dev/{disk} string of the disk containing the ESP
        - Partition number (starting at 1)

    Raises OgError if the disk number is out of range, if enforce_gpt is
    set and the disk uses a DOS label, or if no partition of type
    'EFI System' is found.
    """
    # Take a single snapshot of the disk list so the bounds check, the error
    # message and the lookup all see the same data.
    disks = get_disks()
    disk_index = disknum - 1
    if disk_index < 0 or disk_index >= len(disks):
        raise OgError(f'Invalid disk number {disknum} when trying to find EFI partition, {len(disks)} disks available.')

    disk = disks[disk_index]
    cxt = fdisk.Context(f'/dev/{disk}')

    # The label may be missing (get_disk_data guards against the same case);
    # without a label there is no DOS scheme to reject, so fall through to
    # the search, which then reports that no EFI partition exists.
    if enforce_gpt and cxt.label and cxt.label.name == 'dos':
        raise OgError(f'Windows EFI partition requires GPT partition scheme, but /dev/{disk} has DOS partition scheme')

    logging.info('Searching EFI partition...')
    for pa in cxt.partitions:
        if pa.type.name == 'EFI System':
            logging.info(f'EFI partition found at /dev/{disk}')
            # pa.partno + 1 converts to the 1-based number promised above.
            return cxt.partition_to_string(pa, fdisk.FDISK_FIELD_DEVICE), f'/dev/{disk}', pa.partno + 1

    raise OgError(f'Cannot find EFI partition at /dev/{disk}')
def get_partition_id(disk_index, part_index):
    """
    Return the PARTUUID that blkid reports for the given partition.

    The partition device is resolved with get_partition_device(). The
    output of blkid is returned with surrounding whitespace stripped.
    Raises OgError if blkid exits with a non-zero status.
    """
    device = get_partition_device(disk_index, part_index)
    argv = shlex.split(f'blkid -s PARTUUID -o value {device}')
    result = subprocess.run(argv, stdout=subprocess.PIPE, encoding='utf-8')
    if result.returncode == 0:
        return result.stdout.strip()
    raise OgError(f'failed to query partition UUID for {device}')
def get_disk_id(disk_index):
    """
    Return the PTUUID that blkid reports for the given disk.

    `disk_index` starts at 1 and selects an entry of get_disks(). The
    output of blkid is returned with surrounding whitespace stripped.

    Raises OgError if the disk number is out of range or if blkid exits
    with a non-zero status.
    """
    disks = get_disks()
    # Reject out-of-range numbers explicitly: without this check a value of
    # 0 (or a negative one) would wrap around through negative indexing and
    # silently select a different disk, and a too-large value would surface
    # as a bare IndexError.
    if disk_index < 1 or disk_index > len(disks):
        raise OgError(f'Invalid disk number {disk_index}, {len(disks)} disks available.')

    disk = disks[disk_index - 1]
    disk_path = f'/dev/{disk}'
    cmd = f'blkid -s PTUUID -o value {disk_path}'
    proc = subprocess.run(shlex.split(cmd),
                          stdout=subprocess.PIPE,
                          encoding='utf-8')
    if proc.returncode != 0:
        raise OgError(f'failed to query disk UUID for {disk_path}')
    return proc.stdout.strip()
def get_filesystem_id(disk_index, part_index):
    """
    Return the filesystem UUID that blkid reports for the given partition.

    The partition device is resolved with get_partition_device(). The
    output of blkid is returned with surrounding whitespace stripped.
    Raises OgError if blkid exits with a non-zero status.
    """
    device = get_partition_device(disk_index, part_index)
    argv = shlex.split(f'blkid -s UUID -o value {device}')
    result = subprocess.run(argv, stdout=subprocess.PIPE, encoding='utf-8')
    if result.returncode == 0:
        return result.stdout.strip()
    raise OgError(f'failed to query filesystem UUID for {device}')
def get_sector_size(disk):
    """
    Return the hardware sector size of the given disk as an integer.

    `disk` starts at 1 and selects an entry of get_disks(). The value is
    read from /sys/class/block/{device}/queue/hw_sector_size.

    Raises OgError if the disk number is out of range or the sysfs file
    cannot be read.
    """
    # Take a single snapshot of the disk list so the bounds check, the error
    # message and the lookup all see the same data.
    disks = get_disks()
    disk_index = disk - 1
    if disk_index < 0 or disk_index >= len(disks):
        raise OgError(f'Invalid disk number {disk} when trying to read sector size, {len(disks)} disks available.')

    device_name = disks[disk_index]
    file_path = f'/sys/class/block/{device_name}/queue/hw_sector_size'
    try:
        with open(file_path, 'r') as f:
            data = f.read().strip()
    except OSError as e:
        raise OgError(f'Error while trying to read {file_path}: {e}') from e
    return int(data)
def get_partition_start_offset(disk, partition):
    """
    Return the 'start' value that `sfdisk -J` reports for a partition.

    `disk` starts at 1 and selects an entry of get_disks(); `partition`
    starts at 1 and selects an entry of the 'partitions' array in the
    sfdisk JSON output by position.

    Raises OgError if the disk or partition number is out of range, if
    sfdisk fails, or if its output cannot be parsed.
    """
    disks = get_disks()
    # Reject out-of-range numbers explicitly: negative indexing would
    # otherwise silently select a different disk or partition.
    if disk < 1 or disk > len(disks):
        raise OgError(f'Invalid disk number {disk}, {len(disks)} disks available.')
    if partition < 1:
        raise OgError(f'Invalid partition number {partition}')

    disk_name = disks[disk - 1]
    disk_path = f'/dev/{disk_name}'
    part_number = partition - 1

    cmd = f'sfdisk -J {disk_path}'
    proc = subprocess.run(shlex.split(cmd), capture_output=True, text=True)
    if proc.returncode != 0:
        raise OgError('Failed to query sfdisk')

    try:
        part_data_json = json.loads(proc.stdout)
    except json.JSONDecodeError as e:
        raise OgError(f'Invalid sfdisk output: {e}') from e

    try:
        part_data = part_data_json['partitiontable']['partitions']
        # NOTE(review): this indexes the array by position; if partition
        # numbers on the disk have gaps, position and partition number may
        # not line up - confirm against sfdisk's JSON output.
        start_offset = part_data[part_number]['start']
    except (KeyError, IndexError, TypeError) as e:
        # IndexError: fewer partitions than requested; TypeError: the JSON
        # does not have the expected dict/list structure.
        raise OgError(f'Error while trying to parse sfdisk: {e}') from e

    return start_offset