src: isolate libfdisk operations to enable mount operations

python-libfdisk does not close file descriptor until the cxt
object goes out of scope.

Define get_partition_data and get_disk_data functions to isolate
the python-libfdisk logic and return the data as an object.

Improve error handling of libfdisk operaions in refresh.
master v1.3.2-20
Alejandro Sirgo Rica 2024-10-07 13:54:35 +02:00
parent 97c836e0e4
commit e8ddbbd075
3 changed files with 71 additions and 34 deletions

View File

@ -57,35 +57,30 @@ class OgLiveOperations:
except OSError as e:
raise OgError('Cannot restart browser') from e
def _refresh_payload_disk(self, cxt, part_setup, num_disk):
def _refresh_payload_disk(self, disk_data, part_setup, num_disk):
part_setup['disk'] = str(num_disk)
part_setup['disk_type'] = 'DISK'
part_setup['partition'] = '0'
part_setup['filesystem'] = ''
part_setup['os'] = ''
part_setup['size'] = str(cxt.nsectors * cxt.sector_size // 1024)
part_setup['size'] = str(disk_data.nsectors * disk_data.sector_size // 1024)
part_setup['used_size'] = 0
part_setup['free_size'] = 0
if not cxt.label:
if not disk_data.label_name:
part_setup['code'] = '0'
else:
part_setup['code'] = '2' if cxt.label.name == 'gpt' else '1'
part_setup['code'] = '2' if disk_data.label_name == 'gpt' else '1'
def _refresh_payload_partition(self, cxt, pa, part_setup, disk):
parttype = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_TYPEID)
fstype = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_FSTYPE)
padev = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_DEVICE)
size = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_SIZE)
def _refresh_payload_partition(self, disk_data, pa, part_setup, disk):
partnum = pa.partno + 1
source = padev
target = padev.replace('dev', 'mnt')
target = pa.padev.replace('dev', 'mnt')
if cxt.label.name == 'gpt':
code = GUID_MAP.get(parttype, 0x0)
if disk_data.label_name == 'gpt':
code = GUID_MAP.get(pa.parttype, 0x0)
else:
code = int(parttype, base=16)
code = int(pa.parttype, base=16)
if mount_mkdir(source, target):
if mount_mkdir(pa.padev, target):
probe_result = os_probe(target)
part_setup['os'] = probe_result
@ -101,10 +96,10 @@ class OgLiveOperations:
part_setup['disk_type'] = ''
part_setup['partition'] = str(partnum)
part_setup['filesystem'] = fstype.upper() if fstype else 'EMPTY'
part_setup['filesystem'] = pa.fstype.upper() if pa.fstype else 'EMPTY'
# part_setup['code'] = hex(code).removeprefix('0x')
part_setup['code'] = hex(code)[2:]
part_setup['size'] = str(int(size) // 1024)
part_setup['size'] = str(int(pa.size) // 1024)
if (part_setup['filesystem'] == 'VFAT'):
part_setup['filesystem'] = 'FAT32'
@ -112,9 +107,8 @@ class OgLiveOperations:
if (part_setup['filesystem'] == 'SWAP'):
part_setup['filesystem'] = 'LINUX-SWAP'
def _refresh_part_setup_cache(self, cxt, pa, part_setup, cache):
padev = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_DEVICE)
if padev == cache:
def _refresh_part_setup_cache(self, pa, part_setup, cache):
if pa.padev == cache:
part_setup['filesystem'] = 'CACHE'
part_setup['code'] = 'ca'
@ -620,19 +614,19 @@ class OgLiveOperations:
raise OgError(f'Invalid disk number {disk}, {len(get_disks())} disks available.')
diskname = get_disks()[disk-1]
cxt = fdisk.Context(f'/dev/{diskname}', details=True)
partitions = get_partition_data(device=f'/dev/{diskname}')
pa = None
for i, p in enumerate(cxt.partitions):
for p in partitions:
if (p.partno + 1) == partition:
pa = cxt.partitions[i]
pa = p
if pa is None:
self._restartBrowser(self._url)
raise OgError(f'Target partition {partition} not found in /dev/{diskname}')
padev = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_DEVICE)
fstype = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_FSTYPE)
padev = pa.padev
fstype = pa.fstype
if not fstype:
raise OgError(f'No filesystem detected in {padev}. Aborting image creation')
@ -810,17 +804,19 @@ class OgLiveOperations:
logging.debug('refresh: processing %s', disk)
part_setup = {}
try:
cxt = fdisk.Context(device=f'/dev/{disk}', details=True)
except:
partitions = get_partition_data(device=f'/dev/{disk}')
disk_data = get_disk_data(device=f'/dev/{disk}')
except Exception as e:
logging.error(e)
continue
self._refresh_payload_disk(cxt, part_setup, num_disk)
self._refresh_payload_disk(disk_data, part_setup, num_disk)
json_body['disk_setup'].append(part_setup)
for pa in cxt.partitions:
for pa in partitions:
part_setup = part_setup.copy()
self._refresh_payload_partition(cxt, pa, part_setup, disk)
self._refresh_part_setup_cache(cxt, pa, part_setup, cache)
self._refresh_payload_partition(disk_data, pa, part_setup, disk)
self._refresh_part_setup_cache(pa, part_setup, cache)
json_body['partition_setup'].append(part_setup)
json_body['cache'] = self._get_cache_contents()

View File

@ -16,6 +16,47 @@ from src.log import OgError
import fdisk
class ogPartitionData:
def __init__(self, parttype, fstype, padev, size, partno):
self.parttype = parttype
self.fstype = fstype
self.padev = padev
self.size = size
self.partno = partno
class ogDiskData:
def __init__(self, nsectors, sector_size, label_name):
self.nsectors = nsectors
self.sector_size = sector_size
self.label_name = label_name
def get_partition_data(device):
res = []
try:
cxt = fdisk.Context(device=device, details=True)
except Exception as e:
raise OgError(f'Partition query error: {e}') from e
for i, p in enumerate(cxt.partitions):
pd = ogPartitionData(
parttype = cxt.partition_to_string(p, fdisk.FDISK_FIELD_TYPEID),
fstype = cxt.partition_to_string(p, fdisk.FDISK_FIELD_FSTYPE),
padev = cxt.partition_to_string(p, fdisk.FDISK_FIELD_DEVICE),
size = cxt.partition_to_string(p, fdisk.FDISK_FIELD_SIZE),
partno = p.partno)
res.append(pd)
return res
def get_disk_data(device):
try:
cxt = fdisk.Context(device=device, details=True)
except Exception as e:
raise OgError(f'Partition query error: {e}') from e
return ogDiskData(
nsectors = cxt.nsectors,
sector_size = cxt.sector_size,
label_name = cxt.label.name if cxt.label else "")
def get_disks():
"""
Walks /sys/block/ and returns files starting with 'sd',

View File

@ -158,11 +158,11 @@ def configure_swap(disk, mountpoint, fstab):
swap_entry = entry
diskname = get_disks()[disk-1]
cxt = fdisk.Context(f'/dev/{diskname}')
partitions = get_partition_data(device=f'/dev/{diskname}')
swap_device = ''
for pa in cxt.partitions:
if cxt.partition_to_string(pa, fdisk.FDISK_FIELD_FSTYPE) == 'swap':
for pa in partitions:
if pa.fstype == 'swap':
swap_device = get_formatted_device(disk, pa.partno + 1)
break