ogclient/src/utils/postinstall.py

218 lines
7.1 KiB
Python

#
# Copyright (C) 2022 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
import subprocess
import shutil
import logging
import hivex
from src.log import OgError
from src.utils.bcd import update_bcd
from src.utils.probe import *
from src.utils.disk import *
from src.utils.winreg import *
from src.utils.fs import *
from src.utils.uefi import *
from socket import gethostname
# When True, configure_os() hands the work to the external `configureOs`
# command (followed by `configureOsCustom` if it is on PATH) and skips the
# native Python post-install steps below.
CONFIGUREOS_LEGACY_ENABLED = False
def set_windows_hostname(disk, partition, name):
    """Write a hostname into the SYSTEM registry hive of a Windows partition.

    The partition is mounted, the ``ComputerName`` value and the Tcpip
    ``HostName`` / ``NV Hostname`` values of the current control set are
    overwritten, the hive is committed, and the partition is unmounted again.

    Args:
        disk: Disk identifier forwarded to ``get_partition_device``.
        partition: Partition identifier forwarded to ``get_partition_device``.
        name: Desired hostname; characters past the 15th are dropped.

    Raises:
        OgError: If the partition cannot be mounted or the hive update fails.
    """
    logging.info('Configuring hostname')

    if len(name) > 15:
        logging.warning(f'Windows does not permit hostnames that exceed 15 characters. Truncating {name}')
        name = name[:15]

    byte_name = name.encode(WINDOWS_HIVE_ENCODING)

    device = get_partition_device(disk, partition)
    # Rewrite only the first 'dev' (presumably the /dev prefix of the device
    # path) so a device name that itself contains 'dev' is left intact.
    mountpoint = device.replace('dev', 'mnt', 1)

    if not mount_mkdir(device, mountpoint):
        raise OgError(f'Unable to mount {device} into {mountpoint}')

    try:
        hive_path = mountpoint + WINDOWS_HIVE_SYSTEM
        hive = hive_handler_open(hive_path, write=True)
        root = hive.root()

        # The 'Select' key's 'Current' value names the active ControlSetNNN.
        select_node = get_node_child_from_path(hive, root, 'Select')
        current_control_set_number = get_value_from_node(hive, select_node, 'Current')
        control_set = f'ControlSet{current_control_set_number:03}'

        computer_name_node = get_node_child_from_path(hive, root, f'{control_set}/Control/ComputerName/ComputerName')
        name_value = {'key': 'ComputerName', 't': RegistryType.SZ.value, 'value': byte_name}
        hive.node_set_value(computer_name_node, name_value)

        parameters_node = get_node_child_from_path(hive, root, f'{control_set}/Services/Tcpip/Parameters')
        hostname_value = {'key': 'HostName', 't': RegistryType.SZ.value, 'value': byte_name}
        hive.node_set_value(parameters_node, hostname_value)

        nvhostname_value = {'key': 'NV Hostname', 't': RegistryType.SZ.value, 'value': byte_name}
        hive.node_set_value(parameters_node, nvhostname_value)

        # Nothing reaches the hive file until the commit.
        hive.commit(hive_path)
    except Exception as e:
        raise OgError(f'Unable to configure Windows hostname: {e}') from e
    finally:
        # Always release the mount, whether or not the update succeeded.
        umount(mountpoint)
def configure_os_custom(disk, partition):
    """Run the site-provided ``configureOsCustom`` command for a partition.

    Args:
        disk: Disk identifier passed as the first command argument.
        partition: Partition identifier passed as the second command argument.

    Raises:
        OgError: If ``configureOsCustom`` is not on PATH, cannot be launched,
            or exits with a non-zero status.
    """
    if not shutil.which('configureOsCustom'):
        raise OgError('configureOsCustom not found')

    # NOTE(review): arguments are interpolated into a shell string; this
    # assumes disk/partition are trusted values -- confirm against callers.
    cmd_configure = f"configureOsCustom {disk} {partition}"
    try:
        # stdout is captured (and discarded) rather than inherited.
        subprocess.run(cmd_configure,
                       stdout=subprocess.PIPE,
                       encoding='utf-8',
                       shell=True,
                       check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        # check=True reports a non-zero exit as CalledProcessError, which is
        # not an OSError, so both must be caught to surface an OgError.
        raise OgError(f'Error processing configureOsCustom: {e}') from e
def windows_register_c_drive(disk, partition):
    """Run ``ogWindowsRegisterPartition`` to register the partition as drive C.

    The same disk/partition pair is passed twice, around the drive letter
    ``C``, exactly as the command line below shows.

    Args:
        disk: Disk identifier passed to the command.
        partition: Partition identifier passed to the command.

    Raises:
        OgError: If the command cannot be launched or exits non-zero.
    """
    # NOTE(review): arguments are interpolated into a shell string; this
    # assumes disk/partition are trusted values -- confirm against callers.
    cmd_configure = f"ogWindowsRegisterPartition {disk} {partition} C {disk} {partition}"
    try:
        # stdout is captured (and discarded) rather than inherited.
        subprocess.run(cmd_configure,
                       stdout=subprocess.PIPE,
                       encoding='utf-8',
                       shell=True,
                       check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        # check=True reports a non-zero exit as CalledProcessError, which is
        # not an OSError, so both must be caught to surface an OgError.
        raise OgError(f'Error processing ogWindowsRegisterPartition: {e}') from e
def configure_mbr_boot_sector(disk, partition):
    """Run the external ``ogFixBootSector`` command for a partition.

    Args:
        disk: Disk identifier passed as the first command argument.
        partition: Partition identifier passed as the second command argument.

    Raises:
        OgError: If the command cannot be launched or exits non-zero.
    """
    # NOTE(review): arguments are interpolated into a shell string; this
    # assumes disk/partition are trusted values -- confirm against callers.
    cmd_configure = f"ogFixBootSector {disk} {partition}"
    try:
        # stdout is captured (and discarded) rather than inherited.
        subprocess.run(cmd_configure,
                       stdout=subprocess.PIPE,
                       encoding='utf-8',
                       shell=True,
                       check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        # check=True reports a non-zero exit as CalledProcessError, which is
        # not an OSError, so both must be caught to surface an OgError.
        raise OgError(f'Error processing ogFixBootSector: {e}') from e
def configure_grub_in_mbr(disk, partition):
    """Run the external ``ogGrubInstallMbr`` command with its ``TRUE`` flag.

    Args:
        disk: Disk identifier passed as the first command argument.
        partition: Partition identifier passed as the second command argument.

    Raises:
        OgError: If the command cannot be launched or exits non-zero.
    """
    # NOTE(review): arguments are interpolated into a shell string; this
    # assumes disk/partition are trusted values -- confirm against callers.
    cmd_configure = f"ogGrubInstallMbr {disk} {partition} TRUE"
    try:
        # stdout is captured (and discarded) rather than inherited.
        subprocess.run(cmd_configure,
                       stdout=subprocess.PIPE,
                       encoding='utf-8',
                       shell=True,
                       check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        # check=True reports a non-zero exit as CalledProcessError, which is
        # not an OSError, so both must be caught to surface an OgError.
        raise OgError(f'Error processing ogGrubInstallMbr: {e}') from e
def configure_fstab(disk, partition):
    """Run the external ``ogConfigureFstab`` command for a partition.

    Args:
        disk: Disk identifier passed as the first command argument.
        partition: Partition identifier passed as the second command argument.

    Raises:
        OgError: If the command cannot be launched or exits non-zero.
    """
    # NOTE(review): arguments are interpolated into a shell string; this
    # assumes disk/partition are trusted values -- confirm against callers.
    cmd_configure = f"ogConfigureFstab {disk} {partition}"
    try:
        # stdout is captured (and discarded) rather than inherited.
        subprocess.run(cmd_configure,
                       stdout=subprocess.PIPE,
                       encoding='utf-8',
                       shell=True,
                       check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        # check=True reports a non-zero exit as CalledProcessError, which is
        # not an OSError, so both must be caught to surface an OgError.
        raise OgError(f'Error processing ogConfigureFstab: {e}') from e
def install_grub(disk, partition):
    """Run the external ``ogGrubInstallPartition`` command for a partition.

    Args:
        disk: Disk identifier passed as the first command argument.
        partition: Partition identifier passed as the second command argument.

    Raises:
        OgError: If the command cannot be launched or exits non-zero.
    """
    # NOTE(review): arguments are interpolated into a shell string; this
    # assumes disk/partition are trusted values -- confirm against callers.
    cmd_configure = f"ogGrubInstallPartition {disk} {partition}"
    try:
        # stdout is captured (and discarded) rather than inherited.
        subprocess.run(cmd_configure,
                       stdout=subprocess.PIPE,
                       encoding='utf-8',
                       shell=True,
                       check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        # check=True reports a non-zero exit as CalledProcessError, which is
        # not an OSError, so both must be caught to surface an OgError.
        raise OgError(f'Error processing ogGrubInstallPartition: {e}') from e
def configure_os_linux(disk, partition):
    """Run the post-install steps for a Linux partition.

    Calls ``configure_fstab`` first. When ``is_uefi_supported()`` is true,
    ``configure_grub_in_mbr`` is run against the partition number returned
    by ``get_efi_partition``. Finally ``install_grub`` is run on the
    partition itself.

    Args:
        disk: Disk identifier forwarded to every helper.
        partition: Identifier of the Linux partition being configured.
    """
    configure_fstab(disk, partition)

    uefi = is_uefi_supported()
    if uefi:
        # Only the third element of the returned triple is needed here.
        _, _, esp_number = get_efi_partition(disk, enforce_gpt=True)
        configure_grub_in_mbr(disk, esp_number)

    # NOTE(review): nesting reconstructed from a flattened source; assumes
    # install_grub runs regardless of firmware type -- confirm.
    install_grub(disk, partition)
def configure_os_windows(disk, partition):
    """Run the post-install steps for a Windows partition.

    The local hostname (``socket.gethostname``) is written into the
    partition's registry. The boot path is then prepared: on UEFI via
    ``restore_windows_efi_bootloader`` plus ``configure_grub_in_mbr`` on the
    partition number returned by ``get_efi_partition``; otherwise via
    ``configure_mbr_boot_sector``. Finally the BCD is updated and the
    partition is registered as drive C.

    Args:
        disk: Disk identifier forwarded to every helper.
        partition: Identifier of the Windows partition being configured.
    """
    set_windows_hostname(disk, partition, gethostname())

    if not is_uefi_supported():
        configure_mbr_boot_sector(disk, partition)
    else:
        restore_windows_efi_bootloader(disk, partition)
        # Only the third element of the returned triple is needed here.
        _, _, esp_number = get_efi_partition(disk, enforce_gpt=True)
        configure_grub_in_mbr(disk, esp_number)

    # NOTE(review): nesting reconstructed from a flattened source; assumes
    # these two steps run for both firmware types -- confirm.
    update_bcd(disk, partition)
    windows_register_c_drive(disk, partition)
def configure_os_legacy(disk, partition):
    """Run the legacy external ``configureOs`` command for a partition.

    Args:
        disk: Disk identifier passed as the first command argument.
        partition: Partition identifier passed as the second command argument.

    Raises:
        OgError: If the command cannot be launched or exits non-zero.
    """
    # NOTE(review): arguments are interpolated into a shell string; this
    # assumes disk/partition are trusted values -- confirm against callers.
    cmd_configure = f"configureOs {disk} {partition}"
    try:
        # stdout is captured (and discarded) rather than inherited.
        subprocess.run(cmd_configure,
                       stdout=subprocess.PIPE,
                       encoding='utf-8',
                       shell=True,
                       check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        # check=True reports a non-zero exit as CalledProcessError, which is
        # not an OSError, so both must be caught to surface an OgError.
        raise OgError(f'Error processing configureOs: {e}') from e
def configure_os(disk, partition):
    """Run the post-install configuration for a freshly restored partition.

    With ``CONFIGUREOS_LEGACY_ENABLED`` set, the work is delegated to
    ``configure_os_legacy`` (plus ``configure_os_custom`` when
    ``configureOsCustom`` is on PATH) and nothing else is done.

    Otherwise the partition is mounted just long enough to detect the OS
    family, the filesystem is extended, the Windows or Linux specific steps
    are run, and finally ``configure_os_custom`` is invoked when the
    ``configureOsCustom`` command is available.

    Args:
        disk: Disk identifier forwarded to every helper.
        partition: Identifier of the partition being configured.

    Raises:
        OgError: If the partition cannot be mounted for probing, or if any
            of the invoked steps raises it.
    """
    # NOTE(review): nesting reconstructed from a flattened source; the custom
    # hook and the return are assumed to belong to the legacy branch.
    if CONFIGUREOS_LEGACY_ENABLED:
        configure_os_legacy(disk, partition)
        if shutil.which('configureOsCustom'):
            configure_os_custom(disk, partition)
        return

    device = get_partition_device(disk, partition)
    # Rewrite only the first 'dev' (presumably the /dev prefix of the device
    # path) so a device name that itself contains 'dev' is left intact.
    mountpoint = device.replace('dev', 'mnt', 1)

    if not mount_mkdir(device, mountpoint):
        raise OgError(f'Cannot probe OS family. Unable to mount {device} into {mountpoint}')

    try:
        os_family = get_os_family(mountpoint)
    finally:
        # Release the mount even if probing fails, so the partition is not
        # left mounted for the steps (or retries) that follow.
        umount(mountpoint)

    extend_filesystem(disk, partition)

    if os_family == OSFamily.WINDOWS:
        configure_os_windows(disk, partition)
    elif os_family == OSFamily.LINUX:
        configure_os_linux(disk, partition)

    # Optional site-specific hook, run after the built-in steps.
    if shutil.which('configureOsCustom'):
        configure_os_custom(disk, partition)