mirror of https://git.48k.eu/ogclient
151 lines
5.3 KiB
Python
151 lines
5.3 KiB
Python
#
|
|
# Copyright (C) 2023 Soleta Networks <info@soleta.eu>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it under
|
|
# the terms of the GNU Affero General Public License as published by the
|
|
# Free Software Foundation; either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import shlex
|
|
import subprocess
|
|
import shutil
|
|
from src.utils.disk import *
|
|
from src.utils.fs import *
|
|
from src.utils.probe import *
|
|
|
|
import fdisk
|
|
|
|
EFIBOOTMGR_BIN='/opt/opengnsys/bin/efibootmgr'
|
|
|
|
def _find_bootentry(entries, label):
|
|
for entry in entries:
|
|
if entry['description'] == label:
|
|
return entry
|
|
else:
|
|
raise NameError('Boot entry {label} not found')
|
|
|
|
|
|
def _strip_boot_prefix(entry):
|
|
try:
|
|
num = entry['name'][4:]
|
|
except:
|
|
raise KeyError('Unable to strip "Boot" prefix from boot entry')
|
|
return num
|
|
|
|
|
|
def _check_efibootmgr_json():
|
|
"""
|
|
Checks if efibootmgr --json option is supported.
|
|
Returns True if supported, False otherwise.
|
|
"""
|
|
supported = True
|
|
try:
|
|
subprocess.run([EFIBOOTMGR_BIN, '--json'], stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL, check=True)
|
|
except subprocess.CalledProcessError:
|
|
supported = False
|
|
return supported
|
|
|
|
|
|
def is_uefi_supported():
|
|
return os.path.exists("/sys/firmware/efi")
|
|
|
|
|
|
def run_efibootmgr_json():
|
|
if _check_efibootmgr_json() is False:
|
|
raise RuntimeError(f'{EFIBOOTMGR_BIN} not available')
|
|
|
|
proc = subprocess.run([EFIBOOTMGR_BIN, '--json'], capture_output=True, text=True)
|
|
dict_json = json.loads(proc.stdout)
|
|
|
|
return dict_json
|
|
|
|
|
|
def efibootmgr_bootnext(description):
|
|
logging.info(f'Setting BootNext to value from boot entry with label {description}')
|
|
bootnext_cmd = '{efibootmgr} -n {bootnum}'
|
|
boot_entries = run_efibootmgr_json().get('vars', [])
|
|
|
|
entry = _find_bootentry(boot_entries, description)
|
|
num = _strip_boot_prefix(entry) # efibootmgr output uses BootXXXX for each entry, remove the "Boot" prefix.
|
|
bootnext_cmd = bootnext_cmd.format(bootnum=num, efibootmgr=EFIBOOTMGR_BIN)
|
|
subprocess.run(shlex.split(bootnext_cmd), check=True,
|
|
stdout=subprocess.DEVNULL)
|
|
|
|
|
|
def efibootmgr_delete_bootentry(label):
|
|
dict_json = run_efibootmgr_json()
|
|
efibootmgr_cmd = '{efibootmgr} -q -b {bootnum} -B'
|
|
|
|
for entry in dict_json.get('vars', []):
|
|
if entry['description'] == label:
|
|
num = entry['name'][4:] # Remove "Boot" prefix to extract num
|
|
efibootmgr_cmd = efibootmgr_cmd.format(bootnum=num, efibootmgr=EFIBOOTMGR_BIN)
|
|
subprocess.run(shlex.split(efibootmgr_cmd), check=True)
|
|
break
|
|
else:
|
|
logging.info(f'Cannot delete boot entry {label} because it was not found.')
|
|
|
|
|
|
def efibootmgr_create_bootentry(disk, part, loader, label, add_to_bootorder=True):
|
|
entries = run_efibootmgr_json().get('vars', [])
|
|
create_opt = '-c' if add_to_bootorder else '-C'
|
|
index_opt = f'-I {len(entries)}' # Requires efibootmgr version 18
|
|
efibootmgr_cmd = f'{EFIBOOTMGR_BIN} {create_opt} {index_opt} -d {disk} -p {part} -L {label} -l {loader}'
|
|
logging.info(f'{EFIBOOTMGR_BIN} command creating boot entry: {efibootmgr_cmd}')
|
|
try:
|
|
proc = subprocess.run(shlex.split(efibootmgr_cmd), check=True, text=True)
|
|
except OSError as e:
|
|
raise OSError(f'Unexpected error adding boot entry to nvram. UEFI firmware might be buggy') from e
|
|
|
|
|
|
def copy_windows_efi_bootloader(disk, partition):
|
|
device = get_partition_device(disk, partition)
|
|
mountpoint = device.replace('dev', 'mnt')
|
|
if not mount_mkdir(device, mountpoint):
|
|
raise RuntimeError(f'Cannot probe OS family. Unable to mount {device} into {mountpoint}')
|
|
|
|
os_family = get_os_family(mountpoint)
|
|
is_uefi = is_uefi_supported()
|
|
|
|
if not is_uefi or os_family != OSFamily.WINDOWS:
|
|
return
|
|
|
|
bootlabel = f'Part-{disk:02d}-{partition:02d}'
|
|
esp, esp_disk, esp_part_number = get_efi_partition(disk, enforce_gpt=True)
|
|
esp_mountpoint = esp.replace('dev', 'mnt')
|
|
if not mount_mkdir(esp, esp_mountpoint):
|
|
umount(mountpoint)
|
|
raise RuntimeError(f'Unable to mount detected EFI System Partition at {esp} into {esp_mountpoint}')
|
|
|
|
loader_paths = [f'{esp_mountpoint}/EFI/{bootlabel}/Boot/bootmgfw.efi',
|
|
f'{esp_mountpoint}/EFI/Microsoft/Boot/bootmgfw.efi']
|
|
try:
|
|
for efi_app in loader_paths:
|
|
logging.info(f'Searching for {efi_app}')
|
|
if os.path.exists(efi_app):
|
|
loader = efi_app
|
|
logging.info(f'Found bootloader at ESP partition: {loader}')
|
|
break
|
|
else:
|
|
raise RuntimeError(f'Unable to locate Windows EFI bootloader bootmgfw.efi')
|
|
|
|
loader_dir = os.path.dirname(loader)
|
|
destination_dir = f'{mountpoint}/ogBoot'
|
|
if os.path.exists(destination_dir):
|
|
try:
|
|
shutil.rmtree(destination_dir)
|
|
except Exception as e:
|
|
raise OSError(f'Failed to delete {destination_dir}: {e}') from e
|
|
logging.info(f'Copying {loader_dir} into {destination_dir}')
|
|
try:
|
|
shutil.copytree(loader_dir, destination_dir)
|
|
except Exception as e:
|
|
raise OSError(f'Failed to copy {loader_dir} into {destination_dir}: {e}') from e
|
|
finally:
|
|
umount(mountpoint)
|
|
umount(esp_mountpoint)
|