From aa8f5bc22ef19336708b2b8b863f333db3707736 Mon Sep 17 00:00:00 2001 From: Vadim Troshchinskiy Date: Tue, 5 Nov 2024 23:28:59 +0100 Subject: [PATCH] Refactoring and more pydoc --- gitlib/filesystem.py | 527 +++++++++++++++++++++++++++++++++++++++++++ gitlib/gitlib.py | 478 +++++++-------------------------------- gitlib/ntfs.py | 111 +++++++++ 3 files changed, 716 insertions(+), 400 deletions(-) create mode 100644 gitlib/filesystem.py create mode 100644 gitlib/ntfs.py diff --git a/gitlib/filesystem.py b/gitlib/filesystem.py new file mode 100644 index 0000000..9fc9cdf --- /dev/null +++ b/gitlib/filesystem.py @@ -0,0 +1,527 @@ + +import logging +import subprocess +import os +import json +import blkid + +from ntfs import * + + + +# pylint: disable=locally-disabled, line-too-long, logging-fstring-interpolation, too-many-lines + + +class FilesystemLibrary: + def __init__(self, ntfs_implementation = NTFSImplementation.KERNEL): + self.logger = logging.getLogger("OpengnsysFilesystemLibrary") + self.logger.setLevel(logging.DEBUG) + + self.mounts = {} + self.base_mount_path = "/mnt" + self.ntfs_implementation = ntfs_implementation + + self.update_mounts() + + def _rmmod(self, module): + self.logger.debug("Trying to unload module {module}...") + subprocess.run(["/usr/sbin/rmmod", module], check=True) + + def _modprobe(self, module): + self.logger.debug("Trying to load module {module}...") + subprocess.run(["/usr/sbin/modprobe", module], check=True) + + + # _parse_mounts + def update_mounts(self): + """ + Update the current mount points by parsing the /proc/mounts file. + + This method reads the /proc/mounts file to gather information about + the currently mounted filesystems. It stores this information in a + dictionary where the keys are the mount points and the values are + dictionaries containing details about each filesystem. + + The details stored for each filesystem include: + - device: The device file associated with the filesystem. + - mountpoint: The directory where the filesystem is mounted. + - type: The type of the filesystem (e.g., ext4, vfat). + - options: Mount options associated with the filesystem. + - dump_freq: The dump frequency for the filesystem. + - passno: The pass number for filesystem checks. + + The method also adds an entry for each mount point with a trailing + slash to ensure consistency in accessing the mount points. + + Attributes: + mounts (dict): A dictionary where keys are mount points and values + are dictionaries containing filesystem details. + """ + filesystems = {} + + self.logger.debug("Parsing /proc/mounts") + + with open("/proc/mounts", 'r', encoding='utf-8') as mounts: + for line in mounts: + parts = line.split() + data = {} + data['device'] = parts[0] + data['mountpoint'] = parts[1] + data['type'] = parts[2] + data['options'] = parts[3] + data['dump_freq'] = parts[4] + data['passno'] = parts[5] + + filesystems[data["mountpoint"]] = data + filesystems[data["mountpoint"] + "/"] = data + + self.mounts = filesystems + + def find_mountpoint(self, device): + """ + Find the mount point for a given device. + + This method checks if the specified device is currently mounted and returns + the corresponding mount point if it is found. + + Args: + device (str): The path to the device to check. + + Returns: + str or None: The mount point of the device if it is mounted, otherwise None. + """ + norm = os.path.normpath(device) + + self.logger.debug(f"Checking if {device} is mounted") + for mountpoint, mount in self.mounts.items(): + #self.logger.debug(f"Item: {mount}") + #self.logger.debug(f"Checking: " + mount['device']) + if mount['device'] == norm: + return mountpoint + + return None + + def find_device(self, mountpoint): + """ + Find the device corresponding to a given mount point. + + Args: + mountpoint (str): The mount point to search for. + + Returns: + str or None: The device corresponding to the mount point if found, + otherwise None. + """ + self.logger.debug("Finding device corresponding to mount point %s", mountpoint) + if mountpoint in self.mounts: + return self.mounts[mountpoint]['device'] + else: + self.logger.warning("Failed to find mountpoint %s", mountpoint) + return None + + def is_mounted(self, device = None, mountpoint = None): + def is_mounted(self, device=None, mountpoint=None): + """ + Check if a device or mountpoint is currently mounted. + + Either checking by device or mountpoint is valid. + + Args: + device (str, optional): The device to check if it is mounted. + Defaults to None. + mountpoint (str, optional): The mountpoint to check if it is mounted. + Defaults to None. + + Returns: + bool: True if the device is mounted or the mountpoint is in the list + of mounts, False otherwise. + """ + self.update_mounts() + if device: + return not self.find_mountpoint(device) is None + else: + return mountpoint in self.mounts + + def unmount(self, device = None, mountpoint = None): + def unmount(self, device=None, mountpoint=None): + """ + Unmounts a filesystem. + + This method unmounts a filesystem either by the device name or the mountpoint. + If a device is provided, it finds the corresponding mountpoint and unmounts it. + If a mountpoint is provided directly, it unmounts the filesystem at that mountpoint. + + Args: + device (str, optional): The device name to unmount. Defaults to None. + mountpoint (str, optional): The mountpoint to unmount. Defaults to None. + + Raises: + subprocess.CalledProcessError: If the unmount command fails. + + Logs: + Debug information about the unmounting process. + """ + if device: + self.logger.debug("Finding mountpoint of %s", device) + mountpoint = self.find_mountpoint(device) + + if not mountpoint is None: + self.logger.debug(f"Unmounting {mountpoint}") + subprocess.run(["/usr/bin/umount", mountpoint], check=True) + + # We've unmounted a new filesystem, update our filesystems list + self.update_mounts() + else: + self.logger.debug(f"{device} is not mounted") + + + def mount(self, device, mountpoint, filesystem = None): + """ + Mounts a device to a specified mountpoint. + + Parameters: + device (str): The device to be mounted (e.g., '/dev/sda1'). + mountpoint (str): The directory where the device will be mounted. + filesystem (str, optional): The type of filesystem to be used (e.g., 'ext4', 'ntfs'). Defaults to None. + + Raises: + subprocess.CalledProcessError: If the mount command fails. + + Logs: + Debug information about the mounting process, including the mount command, return code, stdout, and stderr. + + Side Effects: + Creates the mountpoint directory if it does not exist. + Updates the internal list of mounted filesystems. + """ + self.logger.debug(f"Mounting {device} at {mountpoint}") + + if not os.path.exists(mountpoint): + self.logger.debug(f"Creating directory {mountpoint}") + os.mkdir(mountpoint) + + mount_cmd = ["/usr/bin/mount"] + + if not filesystem is None: + mount_cmd = mount_cmd + ["-t", filesystem] + + mount_cmd = mount_cmd + [device, mountpoint] + + self.logger.debug(f"Mount command: {mount_cmd}") + result = subprocess.run(mount_cmd, check=True, capture_output = True) + + self.logger.debug(f"retorno: {result.returncode}") + self.logger.debug(f"stdout: {result.stdout}") + self.logger.debug(f"stderr: {result.stderr}") + + # We've mounted a new filesystem, update our filesystems list + self.update_mounts() + + def ensure_mounted(self, device): + """ + Ensure that the given device is mounted. + + This method attempts to mount the specified device to a path derived from + the base mount path and the device's basename. If the device is of type NTFS, + it uses the NTFSLibrary to handle the mounting process. For other filesystem + types, it uses a generic mount method. + + Args: + device (str): The path to the device that needs to be mounted. + + Returns: + str: The path where the device is mounted. + + Logs: + - Info: When starting the mounting process. + - Debug: Various debug information including the mount path, filesystem type, + and success message. + + Raises: + OSError: If there is an error creating the mount directory or mounting the device. + """ + + self.logger.info("Mounting %s", device) + + self.unmount(device = device) + path = os.path.join(self.base_mount_path, os.path.basename(device)) + + self.logger.debug(f"Will mount repo at {path}") + if not os.path.exists(path): + os.mkdir(path) + + if self.filesystem_type(device) == "ntfs": + self.logger.debug("Handing a NTFS filesystem") + + self._modprobe("ntfs3") + self.ntfsfix(device) + + ntfs = NTFSLibrary(self.ntfs_implementation) + ntfs.mount_filesystem(device, path) + self.update_mounts() + + else: + self.logger.debug("Handling a non-NTFS filesystem") + self.mount(device, path) + + self.logger.debug("Successfully mounted at %s", path) + return path + + + def filesystem_type(self, device = None, mountpoint = None): + """ + Determine the filesystem type of a given device or mountpoint. + + Args: + device (str, optional): The device to probe. If not provided, the device + will be determined based on the mountpoint. + mountpoint (str, optional): The mountpoint to find the device for. This + is used only if the device is not provided. + + Returns: + str: The filesystem type of the device. + + Raises: + KeyError: If the filesystem type cannot be determined from the probe. + + Logs: + Debug: Logs the process of finding the device, probing the device, and + the determined filesystem type. + """ + + if device is None: + self.logger.debug("Finding device for mountpoint %s", mountpoint) + device = self.find_device(mountpoint) + + self.logger.debug(f"Probing {device}") + + pr = blkid.Probe() + pr.set_device(device) + pr.enable_superblocks(True) + pr.set_superblocks_flags(blkid.SUBLKS_TYPE | blkid.SUBLKS_USAGE | blkid.SUBLKS_UUID | blkid.SUBLKS_UUIDRAW | blkid.SUBLKS_LABELRAW) + pr.do_safeprobe() + + fstype = pr["TYPE"].decode('utf-8') + self.logger.debug(f"FS type is {fstype}") + + return fstype + + def is_filesystem(self, path): + """ + Check if the given path is a filesystem root. + + Args: + path (str): The path to check. + + Returns: + bool: True if the path is a filesystem root, False otherwise. + """ + + # This is just an alias for better code readability + return self.is_mounted(mountpoint = path) + + def create_filesystem(self, fs_type = None, fs_uuid = None, device = None): + """ + Create a filesystem on the specified device. + + Parameters: + fs_type (str): The type of filesystem to create (e.g., 'ntfs', 'ext4', 'xfs', 'btrfs'). + fs_uuid (str): The UUID to assign to the filesystem. + device (str): The device on which to create the filesystem (e.g., '/dev/sda1'). + + Raises: + RuntimeError: If the filesystem type is not recognized or if the filesystem creation command fails. + + """ + + self.logger.info(f"Creating filesystem {fs_type} with UUID {fs_uuid} in {device}") + + if fs_type == "ntfs" or fs_type == "ntfs3": + self.logger.debug("Creating NTFS filesystem") + ntfs = NTFSLibrary(self.ntfs_implementation) + ntfs.create_filesystem(device, "NTFS") + ntfs.modify_uuid(device, fs_uuid) + + else: + command = [f"/usr/sbin/mkfs.{fs_type}"] + command_args = [] + + if fs_type == "ext4" or fs_type == "ext3": + command_args = ["-U", fs_uuid, "-F", device] + elif fs_type == "xfs": + command_args = ["-m", f"uuid={fs_uuid}", "-f", device] + elif fs_type == "btrfs": + command_args = ["-U", fs_uuid, "-f", device] + else: + raise RuntimeError(f"Don't know how to create filesystem of type {fs_type}") + + command = command + command_args + + self.logger.debug(f"Creating Linux filesystem of type {fs_type} on {device}, command {command}") + result = subprocess.run(command, check = True, capture_output=True) + + self.logger.debug(f"retorno: {result.returncode}") + self.logger.debug(f"stdout: {result.stdout}") + self.logger.debug(f"stderr: {result.stderr}") + + + + def mklostandfound(self, path): + """ + Recreate the lost+found if necessary. + + When cloning at the root of a filesystem, cleaning the contents + removes the lost+found directory. This is a special directory that requires the use of + a tool to recreate it. + + It may fail if the filesystem does not need it. We consider this harmless and ignore it. + + The command is entirely skipped on NTFS, as mklost+found may malfunction if run on it, + and has no useful purpose. + """ + if self.is_filesystem(path): + if self.filesystem_type(mountpoint=path) == "ntfs": + self.logger.debug("Not running mklost+found on NTFS") + return + + + curdir = os.getcwd() + result = None + + try: + self.logger.debug(f"Re-creating lost+found in {path}") + os.chdir(path) + result = subprocess.run(["/usr/sbin/mklost+found"], check=True, capture_output=True) + except subprocess.SubprocessError as e: + self.logger.warning(f"Error running mklost+found: {e}") + + if result: + self.logger.debug(f"retorno: {result.returncode}") + self.logger.debug(f"stdout: {result.stdout}") + self.logger.debug(f"stderr: {result.stderr}") + + os.chdir(curdir) + + def ntfsfix(self, device): + """ + Run the ntfsfix command on the specified device. + + This method uses the ntfsfix utility to fix common NTFS problems on the given device. + + This allows mounting an unclean NTFS filesystem. + + Args: + device (str): The path to the device to be fixed. + + Raises: + subprocess.CalledProcessError: If the ntfsfix command fails. + """ + self.logger.debug(f"Running ntfsfix on {device}") + subprocess.run(["/usr/bin/ntfsfix", "-d", device], check=True) + + + def unload_ntfs(self): + """ + Unloads the NTFS filesystem module. + + This is a function added as a result of NTFS kernel module troubleshooting, + to try to ensure that NTFS code is only active as long as necessary. + + The module is internally loaded as needed, so there's no load_ntfs function. + + It may be removed in the future. + + Raises: + RuntimeError: If the module cannot be removed. + """ + self._rmmod("ntfs3") + + def find_boot_device(self): + """ + Searches for the EFI boot partition on the system. + + This method scans the system's partitions to locate the EFI boot partition, + which is identified by the GUID "C12A7328-F81F-11D2-BA4B-00A0C93EC93B". + + Returns: + str: The device node of the EFI partition if found, otherwise None. + + Logs: + - Debug messages indicating the progress of the search. + - A warning message if the EFI partition is not found. + """ + disks = [] + + self.logger.debug("Looking for EFI partition") + with open("/proc/partitions", "r", encoding='utf-8') as partitions_file: + line_num=0 + for line in partitions_file: + if line_num >=2: + data = line.split() + disk = data[3] + disks.append(disk) + self.logger.debug(f"Disk: {disk}") + + line_num = line_num + 1 + + for disk in disks: + self.logger.debug("Loading partitions for disk %s", disk) + #disk_json_data = subprocess.run(["/usr/sbin/sfdisk", "-J", f"/dev/{disk}"], check=False, capture_output=True) + sfdisk_out = subprocess.run(["/usr/sbin/sfdisk", "-J", f"/dev/{disk}"], check=False, capture_output=True) + + if sfdisk_out.returncode == 0: + disk_json_data = sfdisk_out.stdout + disk_data = json.loads(disk_json_data) + + for part in disk_data["partitiontable"]["partitions"]: + self.logger.debug("Checking partition %s", part) + if part["type"] == "C12A7328-F81F-11D2-BA4B-00A0C93EC93B": + self.logger.debug("EFI partition found at %s", part["node"]) + return part["node"] + else: + self.logger.debug("sfdisk returned with code %i, error %s", sfdisk_out.returncode, sfdisk_out.stderr) + + + self.logger.warning("Failed to find EFI partition!") + + def temp_unmount(self, mountpoint): + """ + Temporarily unmounts the filesystem at the given mountpoint. + + This method finds the device associated with the specified mountpoint, + and returns the information to remount it with temp_remount. + + The purpose of this function is to temporarily unmount a filesystem for + actions like fsck, and to mount it back afterwards. + + Args: + mountpoint (str): The mountpoint of the filesystem to unmount. + + Returns: + dict: A dictionary containing the information needed to remount the filesystem. + """ + device = self.find_device(mountpoint) + fs = self.filesystem_type(mountpoint = mountpoint) + + data = {"mountpoint" : mountpoint, "device" :device, "filesystem" : fs} + + self.logger.debug("Temporarily unmounting device %s, mounted on %s, fs type %s", mountpoint, device, fs) + + self.unmount(mountpoint = mountpoint) + return data + + def temp_remount(self, unmount_data): + """ + Remounts a filesystem unmounted with temp_unmount + + This method remounts a filesystem using the data provided by temp_unmount + + Args: + unmount_data (dict): A dictionary containing the data needed to remount the filesystem. + + Returns: + None + """ + + self.logger.debug("Remounting temporarily unmounted device %s on %s, fs type %s", unmount_data["device"], unmount_data["mountpoint"], unmount_data["filesystem"]) + self.mount(device = unmount_data["device"], mountpoint=unmount_data["mountpoint"], filesystem=unmount_data["filesystem"]) diff --git a/gitlib/gitlib.py b/gitlib/gitlib.py index 0d19fe6..e20065d 100755 --- a/gitlib/gitlib.py +++ b/gitlib/gitlib.py @@ -28,13 +28,14 @@ import base64 import stat import time -from enum import Enum import git import libarchive import xattr import posix1e import blkid +from filesystem import * +from ntfs import * class OgProgressPrinter(git.RemoteProgress): """ @@ -108,129 +109,6 @@ class RequirementException(Exception): super().__init__(message) self.message = message - - -class NTFSImplementation(Enum): - KERNEL = 1 - NTFS3G = 2 - - -class NTFSLibrary: - """ - A library for managing NTFS filesystems. - - Attributes: - logger (logging.Logger): Logger for the class. - implementation (NTFSImplementation): The implementation to use for mounting NTFS filesystems. - """ - - def __init__(self, implementation): - """ - Initializes the instance with the given implementation. - - Args: - implementation: The implementation to be used by the instance. - - Attributes: - logger (logging.Logger): Logger instance for the class, set to debug level. - implementation: The implementation provided during initialization. - """ - self.logger = logging.getLogger("NTFSLibrary") - self.logger.setLevel(logging.DEBUG) - self.implementation = implementation - - self.logger.debug("Initializing") - - def create_filesystem(self, device, label): - """ - Creates an NTFS filesystem on the specified device with the given label. - - Args: - device (str): The device path where the NTFS filesystem will be created. - label (str): The label to assign to the NTFS filesystem. - - Returns: - None - - Logs: - Logs the creation process with the device and label information. - """ - self.logger.info(f"Creating NTFS in {device} with label {label}") - - subprocess.run(["/usr/sbin/mkntfs", device, "-Q", "-L", label], check=True) - - - def mount_filesystem(self, device, mountpoint): - """ - Mounts a filesystem on the specified mountpoint using the specified NTFS implementation. - - Args: - device (str): The device path to be mounted (e.g., '/dev/sda1'). - mountpoint (str): The directory where the device will be mounted. - - Raises: - ValueError: If the NTFS implementation is unknown. - - """ - self.logger.info(f"Mounting {device} in {mountpoint} using implementation {self.implementation}") - if self.implementation == NTFSImplementation.KERNEL: - subprocess.run(["/usr/bin/mount", "-t", "ntfs3", device, mountpoint], check = True) - elif self.implementation == NTFSImplementation.NTFS3G: - subprocess.run(["/usr/bin/ntfs-3g", device, mountpoint], check = True) - else: - raise ValueError("Unknown NTFS implementation: {self.implementation}") - - def _hex_to_bin(self, hex_str): - - while len(hex_str) != 16: - hex_str = "0" + hex_str - - hex_int = int(hex_str, 16) - binary = bin(hex_int)[2:].zfill(64) - - return binary - - def _bin_to_hex(self, binary_data): - binary_int = int(binary_data, 256) - hex_str = hex(binary_int)[2:] - hex_str = hex_str.zfill((len(binary_data) + 3) // 4) - return hex_str - - def modify_uuid(self, device, uuid): - """ - Modify the UUID of an NTFS device. - - This function changes the UUID of the specified NTFS device to the given UUID. - It reads the current UUID from the device, logs the change, and writes the new UUID. - - Args: - device (str): The path to the NTFS device file. - uuid (str): The new UUID to be set, in hexadecimal string format. - - Raises: - IOError: If there is an error opening or writing to the device file. - """ - - ntfs_uuid_offset = 0x48 - ntfs_uuid_length = 8 - - binary_uuid = bytearray.fromhex(uuid) - binary_uuid.reverse() - - self.logger.info(f"Changing UUID on {device} to {uuid}") - with open(device, 'r+b') as ntfs_dev: - self.logger.debug("Reading %i bytes from offset %i", ntfs_uuid_length, ntfs_uuid_offset) - - ntfs_dev.seek(ntfs_uuid_offset) - prev_uuid = bytearray(ntfs_dev.read(ntfs_uuid_length)) - prev_uuid.reverse() - prev_uuid_hex = bytearray.hex(prev_uuid) - self.logger.debug(f"Previous UUID: {prev_uuid_hex}") - - self.logger.debug("Writing...") - ntfs_dev.seek(ntfs_uuid_offset) - ntfs_dev.write(binary_uuid) - class OpengnsysGitLibrary: """OpenGnsys Git Library""" @@ -263,8 +141,11 @@ class OpengnsysGitLibrary: self.logger.setLevel(logging.DEBUG) self.logger.debug(f"Initializing. Cache = {require_cache}, ntfs = {ntfs_implementation}") + self.fs = FilesystemLibrary(ntfs_implementation = ntfs_implementation) + #self.ntfs = NTFSLibrary() + + #self.repo_server = "192.168.2.1" - self.mounts = self._parse_mounts() self.repo_user = "oggit" self.repo_image_path = "oggit" self.ntfs_implementation = ntfs_implementation @@ -353,105 +234,6 @@ class OpengnsysGitLibrary: """ return os.path.exists("/sys/firmware/efi") - def _parse_mounts(self): - filesystems = {} - - self.logger.debug("Parsing /proc/mounts") - - with open("/proc/mounts", 'r', encoding='utf-8') as mounts: - for line in mounts: - parts = line.split() - data = {} - data['device'] = parts[0] - data['mountpoint'] = parts[1] - data['type'] = parts[2] - data['options'] = parts[3] - data['dump_freq'] = parts[4] - data['passno'] = parts[5] - - filesystems[data["mountpoint"]] = data - filesystems[data["mountpoint"] + "/"] = data - - return filesystems - - def _find_mountpoint(self, device): - norm = os.path.normpath(device) - - self.logger.debug(f"Checking if {device} is mounted") - for mountpoint, mount in self.mounts.items(): - #self.logger.debug(f"Item: {mount}") - #self.logger.debug(f"Checking: " + mount['device']) - if mount['device'] == norm: - return mountpoint - - return None - - def _is_device_mounted(self, device): - return not self._find_mountpoint(device) is None - - def _unmount_device(self, device): - mountpoint = self._find_mountpoint(device) - - if not mountpoint is None: - self.logger.debug(f"Unmounting {mountpoint}") - subprocess.run(["/usr/bin/umount", mountpoint], check=True) - - # We've unmounted a new filesystem, update our filesystems list - self.mounts = self._parse_mounts() - else: - self.logger.debug(f"{device} is not mounted") - - - def _rmmod(self, module): - self.logger.debug("Trying to unload module {module}...") - subprocess.run(["/usr/sbin/rmmod", module], check=True) - - def _modprobe(self, module): - self.logger.debug("Trying to load module {module}...") - subprocess.run(["/usr/sbin/modprobe", module], check=True) - - def _mount(self, device, mountpoint, filesystem = None): - self.logger.debug(f"Mounting {device} at {mountpoint}") - - if not os.path.exists(mountpoint): - self.logger.debug(f"Creating directory {mountpoint}") - os.mkdir(mountpoint) - - mount_cmd = ["/usr/bin/mount"] - - if not filesystem is None: - mount_cmd = mount_cmd + ["-t", filesystem] - - mount_cmd = mount_cmd + [device, mountpoint] - - self.logger.debug(f"Mount command: {mount_cmd}") - result = subprocess.run(mount_cmd, check=True, capture_output = True) - - self.logger.debug(f"retorno: {result.returncode}") - self.logger.debug(f"stdout: {result.stdout}") - self.logger.debug(f"stderr: {result.stderr}") - - # We've mounted a new filesystem, update our filesystems list - self.mounts = self._parse_mounts() - - - def _ntfsfix(self, device): - self.logger.debug(f"Running ntfsfix on {device}") - subprocess.run(["/usr/bin/ntfsfix", "-d", device], check=True) - - def _filesystem_type(self, device): - self.logger.debug(f"Probing {device}") - - pr = blkid.Probe() - pr.set_device(device) - pr.enable_superblocks(True) - pr.set_superblocks_flags(blkid.SUBLKS_TYPE | blkid.SUBLKS_USAGE | blkid.SUBLKS_UUID | blkid.SUBLKS_UUIDRAW | blkid.SUBLKS_LABELRAW) - pr.do_safeprobe() - - fstype = pr["TYPE"].decode('utf-8') - self.logger.debug(f"FS type is {fstype}") - - return fstype def _write_ignore_list(self, base_path): ignore_file = base_path + "/.gitignore" @@ -484,59 +266,7 @@ class OpengnsysGitLibrary: self.logger.debug("%i parameters found", len(params)) return params - def _is_filesystem(self, path): - """ - Check if the given path is a filesystem root. - This method reads the '/proc/mounts' file to determine if the specified - path is a mount point (i.e., a filesystem root). - - Args: - path (str): The path to check. - - Returns: - bool: True if the path is a filesystem root, False otherwise. - """ - with open('/proc/mounts', 'r', encoding='utf-8') as mounts: - for mount in mounts: - parts = mount.split() - self.logger.debug(f"Parts: {parts}") - mountpoint = parts[1] - - if os.path.normpath(mountpoint) == os.path.normpath(path): - self.logger.debug(f"Directory {path} is a filesystem root") - return True - - self.logger.debug(f"Directory {path} is not a filesystem root") - return False - - - def _mklostandfound(self, path): - """Recreate the lost+found if necessary. - - When cloning at the root of a filesystem, cleaning the contents - removes the lost+found directory. This is a special directory that requires the use of - a tool to recreate it. - - It may fail if the filesystem does not need it. We consider this harmless and ignore it. - """ - if self._is_filesystem(path): - curdir = os.getcwd() - result = None - - try: - self.logger.debug(f"Re-creating lost+found in {path}") - os.chdir(path) - result = subprocess.run(["/usr/sbin/mklost+found"], check=True, capture_output=True) - except subprocess.SubprocessError as e: - self.logger.warning(f"Error running mklost+found: {e}") - - if result: - self.logger.debug(f"retorno: {result.returncode}") - self.logger.debug(f"stdout: {result.stdout}") - self.logger.debug(f"stderr: {result.stderr}") - - os.chdir(curdir) def _ntfs_secaudit(self, data): @@ -562,53 +292,65 @@ class OpengnsysGitLibrary: os.rename(metadata_file + ".new", metadata_file) + + def _ntfs_restore_secaudit(self, path): + self.logger.debug("Restoring NTFS metadata for %s", path) + + if not self.fs.is_filesystem(path): + self.logger.error("Path %s is not a filesystem!") + return + + if self.fs.filesystem_type(mountpoint = path) != "ntfs": + self.logger.error("Path %s is not NTFS!", path) + return + + metadata_file = os.path.join(path,".opengnsys-metadata", "ntfs_secaudit.txt") + secaudit_data = "" + + self.logger.debug("Reading audit metadata from %s...", metadata_file) + with open(metadata_file, "rb") as meta: + secaudit_data = meta.read() + self.logger.debug("Read %i bytes", len(secaudit_data)) + + device = self.fs.find_device(path) + mountdata = self.fs.temp_unmount(path) + + self.logger.info("Restoring secaudit data...") + result = subprocess.run(["/usr/bin/ntfssecaudit", "-se", device], check=False, capture_output=True, input=secaudit_data) + + if result.returncode == 0: + self.logger.debug("Completed, return code %i", result.returncode) + self.logger.debug("STDOUT: %s", result.stdout) + self.logger.debug("STDERR: %s", result.stderr) + else: + # An error return code can be returned for reasons like missing files, so we deal this + # as non-fatal. + self.logger.error("Completed, return code %i", result.returncode) + self.logger.error("STDOUT: %s", result.stdout) + self.logger.error("STDERR: %s", result.stderr) + + + self.fs.temp_remount(mountdata) + + def _create_filesystems(self, fs_data, fs_map): for mountpoint in fs_map: dest_device = fs_map[mountpoint] data = fs_data[mountpoint] - fs_type = data["type"] - fs_uuid = data["uuid"] + self.fs.create_filesystem(device = dest_device, fs_type = data["type"], fs_uuid = data["uuid"]) - self.logger.info(f"Creating filesystem {fs_type} with UUID {fs_uuid} in {dest_device}") - - if fs_type == "ntfs" or fs_type == "ntfs3": - self.logger.debug("Creating NTFS filesystem") - ntfs = NTFSLibrary(self.ntfs_implementation) - ntfs.create_filesystem(dest_device, "NTFS") - #ntfs.modify_uuid(dest_device, fs_uuid) - else: - command = [f"/usr/sbin/mkfs.{fs_type}"] - command_args = [] - - if fs_type == "ext4" or fs_type == "ext3": - command_args = ["-U", fs_uuid, "-F", dest_device] - elif fs_type == "xfs": - command_args = ["-m", f"uuid={fs_uuid}", "-f", dest_device] - elif fs_type == "btrfs": - command_args = ["-U", fs_uuid, "-f", dest_device] - else: - raise RuntimeError(f"Don't know how to create filesystem of type {fs_type}") - - command = command + command_args - - self.logger.debug(f"Creating Linux filesystem of type {fs_type} on {dest_device}, command {command}") - result = subprocess.run(command, check = True, capture_output=True) - - self.logger.debug(f"retorno: {result.returncode}") - self.logger.debug(f"stdout: {result.stdout}") - self.logger.debug(f"stderr: {result.stderr}") def _grub_install(self, root_directory, boot_device): """ - Install GRUB bootloader on the specified boot device. + Install GRUB boot loader on the specified boot device. This method checks for the presence of GRUB 2.x and GRUB 1.x installers and attempts to install the appropriate version. If neither installer is found, a RequirementException is raised. Args: - boot_device (str): The device on which to install the GRUB bootloader (e.g., '/dev/sda'). + boot_device (str): The device on which to install the GRUB boot loader (e.g., '/dev/sda'). root_directory (str): The root directory where GRUB files should be installed. Raises: @@ -653,8 +395,8 @@ class OpengnsysGitLibrary: shutil.Error: If an error occurs during the copying of the EFI data. """ - boot_device = self._find_boot_device() - boot_mount = self._find_mountpoint(boot_device) + boot_device = self.fs.find_boot_device() + boot_mount = self.fs.find_mountpoint(boot_device) self.logger.info(f"Installing EFI files in {boot_mount}") meta_dir = os.path.join(root_directory, ".opengnsys-metadata") @@ -685,8 +427,8 @@ class OpengnsysGitLibrary: def _efi_copy(self, root_directory, system_specific = False, config_name = None): meta_dir = os.path.join(root_directory, ".opengnsys-metadata") - boot_device = self._find_boot_device() - boot_mount = self._find_mountpoint(boot_device) + boot_device = self.fs.find_boot_device() + boot_mount = self.fs.find_mountpoint(boot_device) efi_files_dir = "" @@ -718,53 +460,6 @@ class OpengnsysGitLibrary: - def _find_boot_device(self): - """ - Searches for the EFI boot partition on the system. - - This method scans the system's partitions to locate the EFI boot partition, - which is identified by the GUID "C12A7328-F81F-11D2-BA4B-00A0C93EC93B". - - Returns: - str: The device node of the EFI partition if found, otherwise None. - - Logs: - - Debug messages indicating the progress of the search. - - A warning message if the EFI partition is not found. - """ - disks = [] - - self.logger.debug("Looking for EFI partition") - with open("/proc/partitions", "r", encoding='utf-8') as partitions_file: - line_num=0 - for line in partitions_file: - if line_num >=2: - data = line.split() - disk = data[3] - disks.append(disk) - self.logger.debug(f"Disk: {disk}") - - line_num = line_num + 1 - - for disk in disks: - self.logger.debug("Loading partitions for disk %s", disk) - #disk_json_data = subprocess.run(["/usr/sbin/sfdisk", "-J", f"/dev/{disk}"], check=False, capture_output=True) - sfdisk_out = subprocess.run(["/usr/sbin/sfdisk", "-J", f"/dev/{disk}"], check=False, capture_output=True) - - if sfdisk_out.returncode == 0: - disk_json_data = sfdisk_out.stdout - disk_data = json.loads(disk_json_data) - - for part in disk_data["partitiontable"]["partitions"]: - self.logger.debug("Checking partition %s", part) - if part["type"] == "C12A7328-F81F-11D2-BA4B-00A0C93EC93B": - self.logger.debug("EFI partition found at %s", part["node"]) - return part["node"] - else: - self.logger.debug("sfdisk returned with code %i, error %s", sfdisk_out.returncode, sfdisk_out.stderr) - - - self.logger.warning("Failed to find EFI partition!") def _delete_contents(self, path): @@ -982,8 +677,8 @@ class OpengnsysGitLibrary: mount_found = False - if root in self.mounts: - mount = self.mounts[root] + if root in self.fs.mounts: + mount = self.fs.mounts[root] root_path_rel = root[len(path):None] if len(root_path_rel) == 0 or root_path_rel[0] != "/": root_path_rel = "/" + root_path_rel @@ -1038,7 +733,7 @@ class OpengnsysGitLibrary: errstr = f"""We've failed to find metadata for the root filesystem! Root: {root} FS data: """ - errstr = errstr + json.dumps(self.mounts, indent=4) + errstr = errstr + json.dumps(self.fs.mounts, indent=4) self.logger.error(errstr) raise RuntimeError(errstr) @@ -1235,7 +930,6 @@ class OpengnsysGitLibrary: """ self.logger.debug("Initializing") - self.mounts = self._parse_mounts() self.logger.info(f"Restoring metadata in {path}") meta_dir = os.path.join(path, ".opengnsys-metadata") @@ -1428,34 +1122,6 @@ class OpengnsysGitLibrary: repo.config_writer().add_value("user", "email", "OpenGnsys@opengnsys.com").release() repo.config_writer().add_value("core", "filemode", "false").release() - def _ensure_mounted(self, device): - - self.logger.info("Mounting %s", device) - - self._unmount_device(device) - path = os.path.join("/mnt", os.path.basename(device)) - - self.logger.debug(f"Will mount repo at {path}") - if not os.path.exists(path): - os.mkdir(path) - - if self._filesystem_type(device) == "ntfs": - self.logger.debug("Handing a NTFS filesystem") - - self._modprobe("ntfs3") - self._ntfsfix(device) - - ntfs = NTFSLibrary(self.ntfs_implementation) - ntfs.mount_filesystem(device, path) - self.mounts = self._parse_mounts() - - else: - self.logger.debug("Handling a non-NTFS filesystem") - self._mount(device, path) - - self.logger.debug("Successfully mounted at %s", path) - return path - def initRepo(self, device, repo_name): """ Initialize a Git repository on a specified device. @@ -1477,7 +1143,7 @@ class OpengnsysGitLibrary: - The method sets up a remote origin and pushes the initial commit. """ - path = self._ensure_mounted(device) + path = self.fs.ensure_mounted(device) self.logger.info("Initializing repository: %s", path) git_dir = os.path.join(path, ".git") @@ -1578,7 +1244,9 @@ class OpengnsysGitLibrary: #self.logger.debug("Commit done, will unmount now") #self._umount_device(device) - self._rmmod("ntfs3") + + + self.fs.unload_ntfs() repo_url = self._getOgRepository(repo_name) @@ -1646,7 +1314,7 @@ class OpengnsysGitLibrary: if repo_is_efi != efi: raise RequirementException("Repositorio usa sistema de arranque incompatible con sistema actual") - self._unmount_device(destination) + self.fs.unmount(device = destination) filesystem_map = {"/" : destination} @@ -1654,7 +1322,7 @@ class OpengnsysGitLibrary: destination_dir = "/mnt/repo-" + repo_name - self._mount(destination, destination_dir) + self.fs.mount(destination, destination_dir) self._delete_contents(destination_dir) @@ -1667,9 +1335,14 @@ class OpengnsysGitLibrary: else: self._grub_install(root_directory=destination_dir, boot_device=boot_device) - self._mklostandfound(destination_dir) + self.fs.mklostandfound(destination_dir) self._restore_metadata(destination_dir) + if self.fs.filesystem_type(mountpoint = destination_dir) == "ntfs": + self._ntfs_restore_secaudit(destination_dir) + + self.logger.info("Clone completed.") + def commit(self, path = None, device = None, message = None): """ @@ -1677,7 +1350,7 @@ class OpengnsysGitLibrary: """ if path is None: - path = self._ensure_mounted(device) + path = self.fs.ensure_mounted(device) self.logger.info("Committing changes to repository") repo = git.Repo(path) @@ -1715,7 +1388,7 @@ class OpengnsysGitLibrary: if path is None: - path = self._ensure_mounted(device) + path = self.fs.ensure_mounted(device) repo = git.Repo(path) @@ -1730,7 +1403,7 @@ class OpengnsysGitLibrary: if path is None: - path = self._ensure_mounted(device) + path = self.fs.ensure_mounted(device) repo = git.Repo(path) @@ -1817,6 +1490,7 @@ if __name__ == '__main__': parser.add_argument("--test-efi-install", type=str, metavar="DIR", help = "Install EFI") parser.add_argument("-m", "--message", type=str, metavar="MSG", help="Commit message") parser.add_argument("--test-set-ntfsid", type=str, metavar="ID", help="Set NTFS ID") + parser.add_argument("--test-restore-secaudit",type=str, metavar="DIR", help="Test restoring NTFS secaudit") parser.add_argument("--device", type=str, metavar="DEV", help="Device to set the UUID on") parser.add_argument("-v", "--verbose", action="store_true", help = "Verbose console output") @@ -1885,8 +1559,12 @@ if __name__ == '__main__': ntfs.modify_uuid(args.device, args.test_set_ntfsid) elif args.test_efi_install: og_git._efi_install(root_directory=args.test_efi_install, config_name = args.efi_config) # pylint: disable=protected-access + elif args.test_restore_secaudit: + og_git._ntfs_restore_secaudit(args.test_restore_secaudit) # pylint: disable=protected-access else: - print("Debe especificar una acción") + print("Please specify an action.") + parser.print_help() + sys.exit(1) # # Make sure all filesystem changes are written, just in case the oglive is rebooted without an unmount diff --git a/gitlib/ntfs.py b/gitlib/ntfs.py new file mode 100644 index 0000000..d3c3a57 --- /dev/null +++ b/gitlib/ntfs.py @@ -0,0 +1,111 @@ + +import logging +import subprocess + +from enum import Enum + + +class NTFSImplementation(Enum): + KERNEL = 1 + NTFS3G = 2 + + +class NTFSLibrary: + """ + A library for managing NTFS filesystems. + + Attributes: + logger (logging.Logger): Logger for the class. + implementation (NTFSImplementation): The implementation to use for mounting NTFS filesystems. + """ + + def __init__(self, implementation): + """ + Initializes the instance with the given implementation. + + Args: + implementation: The implementation to be used by the instance. + + Attributes: + logger (logging.Logger): Logger instance for the class, set to debug level. + implementation: The implementation provided during initialization. + """ + self.logger = logging.getLogger("NTFSLibrary") + self.logger.setLevel(logging.DEBUG) + self.implementation = implementation + + self.logger.debug("Initializing") + + def create_filesystem(self, device, label): + """ + Creates an NTFS filesystem on the specified device with the given label. + + Args: + device (str): The device path where the NTFS filesystem will be created. + label (str): The label to assign to the NTFS filesystem. + + Returns: + None + + Logs: + Logs the creation process with the device and label information. + """ + self.logger.info(f"Creating NTFS in {device} with label {label}") + + subprocess.run(["/usr/sbin/mkntfs", device, "-Q", "-L", label], check=True) + + + def mount_filesystem(self, device, mountpoint): + """ + Mounts a filesystem on the specified mountpoint using the specified NTFS implementation. + + Args: + device (str): The device path to be mounted (e.g., '/dev/sda1'). + mountpoint (str): The directory where the device will be mounted. + + Raises: + ValueError: If the NTFS implementation is unknown. + + """ + self.logger.info(f"Mounting {device} in {mountpoint} using implementation {self.implementation}") + if self.implementation == NTFSImplementation.KERNEL: + subprocess.run(["/usr/bin/mount", "-t", "ntfs3", device, mountpoint], check = True) + elif self.implementation == NTFSImplementation.NTFS3G: + subprocess.run(["/usr/bin/ntfs-3g", device, mountpoint], check = True) + else: + raise ValueError("Unknown NTFS implementation: {self.implementation}") + + def modify_uuid(self, device, uuid): + """ + Modify the UUID of an NTFS device. + + This function changes the UUID of the specified NTFS device to the given UUID. + It reads the current UUID from the device, logs the change, and writes the new UUID. + + Args: + device (str): The path to the NTFS device file. + uuid (str): The new UUID to be set, in hexadecimal string format. + + Raises: + IOError: If there is an error opening or writing to the device file. + """ + + ntfs_uuid_offset = 0x48 + ntfs_uuid_length = 8 + + binary_uuid = bytearray.fromhex(uuid) + binary_uuid.reverse() + + self.logger.info(f"Changing UUID on {device} to {uuid}") + with open(device, 'r+b') as ntfs_dev: + self.logger.debug("Reading %i bytes from offset %i", ntfs_uuid_length, ntfs_uuid_offset) + + ntfs_dev.seek(ntfs_uuid_offset) + prev_uuid = bytearray(ntfs_dev.read(ntfs_uuid_length)) + prev_uuid.reverse() + prev_uuid_hex = bytearray.hex(prev_uuid) + self.logger.debug(f"Previous UUID: {prev_uuid_hex}") + + self.logger.debug("Writing...") + ntfs_dev.seek(ntfs_uuid_offset) + ntfs_dev.write(binary_uuid)