ogclient/src/utils/postinstall.py

265 lines
8.7 KiB
Python

#
# Copyright (C) 2022 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
import subprocess
import shutil
import logging
import hivex
import shlex
from src.log import OgError
from src.utils.bcd import update_bcd
from src.utils.probe import *
from src.utils.disk import *
from src.utils.winreg import *
from src.utils.fs import *
from src.utils.uefi import *
from src.utils.fstab import *
from socket import gethostname
CONFIGUREOS_LEGACY_ENABLED = False
def set_windows_hostname(disk, partition, name):
logging.info(f'Setting Windows hostname to {name}')
if len(name) > 15:
logging.warning(f'Windows does not permit hostnames that exceed 15 characters. Truncating {name}')
name = name[0:15]
byte_name = name.encode(WINDOWS_HIVE_ENCODING)
device = get_partition_device(disk, partition)
mountpoint = device.replace('dev', 'mnt')
if not mount_mkdir(device, mountpoint):
raise OgError(f'Unable to mount {device} into {mountpoint}')
try:
hive_path = mountpoint + WINDOWS_HIVE_SYSTEM
hive = hive_handler_open(hive_path, write=True)
root = hive.root()
select_node = get_node_child_from_path(hive, root, 'Select')
current_control_set_number = get_value_from_node(hive, select_node, 'Current')
control_set = f'ControlSet{current_control_set_number:03}'
computer_name_node = get_node_child_from_path(hive, root, f'{control_set}/Control/ComputerName/ComputerName')
name_value = {'key': 'ComputerName', 't': RegistryType.SZ.value, 'value': byte_name}
hive.node_set_value(computer_name_node, name_value)
parameters_node = get_node_child_from_path(hive, root, f'{control_set}/Services/Tcpip/Parameters')
hostname_value = {'key': 'HostName', 't': RegistryType.SZ.value, 'value': byte_name}
hive.node_set_value(parameters_node, hostname_value)
nvhostname_value = {'key': 'NV Hostname', 't': RegistryType.SZ.value, 'value': byte_name}
hive.node_set_value(parameters_node, nvhostname_value)
hive.commit(hive_path)
except Exception as e:
raise OgError(f'Unable to set Windows hostname: {e}') from e
finally:
umount(mountpoint)
def set_linux_hostname(disk, partition, name):
logging.info(f'Setting Linux hostname to {name}')
if len(name) > 64:
logging.warning(f'Linux does not permit hostnames that exceed 64 characters. Truncating {name}')
name = name[0:64]
device = get_partition_device(disk, partition)
mountpoint = device.replace('dev', 'mnt')
if not mount_mkdir(device, mountpoint):
raise OgError(f'Unable to mount {device} into {mountpoint}')
try:
hostname_path = f'{mountpoint}/etc/hostname'
with open(hostname_path, 'w') as f:
f.write(name)
except OSError as e:
raise OgError(f'Unable to set Linux hostname: {e}') from e
finally:
umount(mountpoint)
def configure_os_custom(disk, partition):
command_path = shutil.which('configureOsCustom')
if not command_path:
raise OgError('configureOsCustom not found')
logging.info(f'Found configureOsCustom script, invoking it...')
cmd_configure = f"{command_path} {disk} {partition}"
proc = subprocess.run(shlex.split(cmd_configure),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
encoding='utf-8',
check=True)
if proc.returncode != 0:
logging.warning(f'{cmd_configure} returned non-zero exit status {proc.returncode}')
def windows_register_c_drive(disk, partition):
device = get_partition_device(disk, partition)
mountpoint = device.replace('dev', 'mnt')
if not mount_mkdir(device, mountpoint):
raise OgError(f'Unable to mount {device} into {mountpoint}')
hive_path = f'{mountpoint}{WINDOWS_HIVE_SYSTEM}'
try:
hive = hive_handler_open(hive_path, write=True)
root = hive.root()
device_node = get_node_child_from_path(hive, root, 'MountedDevices')
if is_uefi_supported():
part_id = get_part_id_bytes(disk, partition)
value = b'DMIO:ID:' + part_id
else:
disk_id = get_disk_id_bytes(disk)
part_offset = get_part_id_bytes(disk, partition)
value = bytes(disk_id + part_offset)
device_value = {'key': '\DosDevices\C:', 't': RegistryType.BINARY.value, 'value': value}
hive.node_set_value(device_node, device_value)
hive.commit(hive_path)
finally:
umount(mountpoint)
def configure_mbr_boot_sector(disk, partition):
cmd_configure = f"ogFixBootSector {disk} {partition}"
proc = subprocess.run(cmd_configure,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
encoding='utf-8',
shell=True,
check=True)
if proc.returncode != 0:
logging.warning(f'{cmd_configure} returned non-zero exit status {proc.returncode}')
def configure_grub_in_mbr(disk, partition):
cmd_configure = f"ogGrubInstallMbr {disk} {partition} TRUE"
proc = subprocess.run(cmd_configure,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
encoding='utf-8',
shell=True,
check=True)
if proc.returncode != 0:
logging.warning(f'{cmd_configure} returned non-zero exit status {proc.returncode}')
def configure_fstab(disk, partition):
logging.info(f'Configuring /etc/fstab')
device = get_partition_device(disk, partition)
mountpoint = device.replace('dev', 'mnt')
if not mount_mkdir(device, mountpoint):
raise OgError(f'Unable to mount {device} into {mountpoint}')
try:
update_fstab(disk, partition, mountpoint)
finally:
umount(mountpoint)
def install_grub(disk, partition):
cmd_configure = f"ogGrubInstallPartition {disk} {partition}"
proc = subprocess.run(cmd_configure,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
encoding='utf-8',
shell=True,
check=True)
if proc.returncode != 0:
logging.warning(f'{cmd_configure} returned non-zero exit status {proc.returncode}')
def configure_os_linux(disk, partition):
hostname = gethostname()
set_linux_hostname(disk, partition, hostname)
configure_fstab(disk, partition)
if is_uefi_supported():
_, _, esp_part_number = get_efi_partition(disk, enforce_gpt=True)
configure_grub_in_mbr(disk, esp_part_number)
install_grub(disk, partition)
def configure_os_windows(disk, partition):
hostname = gethostname()
set_windows_hostname(disk, partition, hostname)
if is_uefi_supported():
restore_windows_efi_bootloader(disk, partition)
_, _, esp_part_number = get_efi_partition(disk, enforce_gpt=True)
configure_grub_in_mbr(disk, esp_part_number)
else:
configure_mbr_boot_sector(disk, partition)
update_bcd(disk, partition)
windows_register_c_drive(disk, partition)
def configure_os_legacy(disk, partition):
cmd_configure = f"configureOs {disk} {partition}"
try:
proc = subprocess.run(cmd_configure,
stdout=subprocess.PIPE,
encoding='utf-8',
shell=True,
check=True)
out = proc.stdout
except OSError as e:
raise OgError(f'Error processing configureOs: {e}') from e
def configure_os(disk, partition):
if CONFIGUREOS_LEGACY_ENABLED:
configure_os_legacy(disk, partition)
if shutil.which('configureOsCustom'):
configure_os_custom(disk, partition)
return
device = get_partition_device(disk, partition)
mountpoint = device.replace('dev', 'mnt')
logging.info(f'Configuring OS at {device}...')
if not mount_mkdir(device, mountpoint):
raise OgError(f'Cannot probe OS family. Unable to mount {device} into {mountpoint}')
os_family = get_os_family(mountpoint)
umount(mountpoint)
if os_family == OSFamily.WINDOWS:
configure_os_windows(disk, partition)
elif os_family == OSFamily.LINUX:
configure_os_linux(disk, partition)
if shutil.which('configureOsCustom'):
configure_os_custom(disk, partition)