Refactoring and more pydoc

fixes
Vadim vtroshchinskiy 2024-11-05 23:28:59 +01:00
parent 84cc3c2742
commit aa8f5bc22e
3 changed files with 716 additions and 400 deletions

View File

@ -0,0 +1,527 @@
import logging
import subprocess
import os
import json
import blkid
from ntfs import *
# pylint: disable=locally-disabled, line-too-long, logging-fstring-interpolation, too-many-lines
class FilesystemLibrary:
def __init__(self, ntfs_implementation = NTFSImplementation.KERNEL):
self.logger = logging.getLogger("OpengnsysFilesystemLibrary")
self.logger.setLevel(logging.DEBUG)
self.mounts = {}
self.base_mount_path = "/mnt"
self.ntfs_implementation = ntfs_implementation
self.update_mounts()
def _rmmod(self, module):
self.logger.debug("Trying to unload module {module}...")
subprocess.run(["/usr/sbin/rmmod", module], check=True)
def _modprobe(self, module):
self.logger.debug("Trying to load module {module}...")
subprocess.run(["/usr/sbin/modprobe", module], check=True)
# _parse_mounts
def update_mounts(self):
"""
Update the current mount points by parsing the /proc/mounts file.
This method reads the /proc/mounts file to gather information about
the currently mounted filesystems. It stores this information in a
dictionary where the keys are the mount points and the values are
dictionaries containing details about each filesystem.
The details stored for each filesystem include:
- device: The device file associated with the filesystem.
- mountpoint: The directory where the filesystem is mounted.
- type: The type of the filesystem (e.g., ext4, vfat).
- options: Mount options associated with the filesystem.
- dump_freq: The dump frequency for the filesystem.
- passno: The pass number for filesystem checks.
The method also adds an entry for each mount point with a trailing
slash to ensure consistency in accessing the mount points.
Attributes:
mounts (dict): A dictionary where keys are mount points and values
are dictionaries containing filesystem details.
"""
filesystems = {}
self.logger.debug("Parsing /proc/mounts")
with open("/proc/mounts", 'r', encoding='utf-8') as mounts:
for line in mounts:
parts = line.split()
data = {}
data['device'] = parts[0]
data['mountpoint'] = parts[1]
data['type'] = parts[2]
data['options'] = parts[3]
data['dump_freq'] = parts[4]
data['passno'] = parts[5]
filesystems[data["mountpoint"]] = data
filesystems[data["mountpoint"] + "/"] = data
self.mounts = filesystems
def find_mountpoint(self, device):
"""
Find the mount point for a given device.
This method checks if the specified device is currently mounted and returns
the corresponding mount point if it is found.
Args:
device (str): The path to the device to check.
Returns:
str or None: The mount point of the device if it is mounted, otherwise None.
"""
norm = os.path.normpath(device)
self.logger.debug(f"Checking if {device} is mounted")
for mountpoint, mount in self.mounts.items():
#self.logger.debug(f"Item: {mount}")
#self.logger.debug(f"Checking: " + mount['device'])
if mount['device'] == norm:
return mountpoint
return None
def find_device(self, mountpoint):
"""
Find the device corresponding to a given mount point.
Args:
mountpoint (str): The mount point to search for.
Returns:
str or None: The device corresponding to the mount point if found,
otherwise None.
"""
self.logger.debug("Finding device corresponding to mount point %s", mountpoint)
if mountpoint in self.mounts:
return self.mounts[mountpoint]['device']
else:
self.logger.warning("Failed to find mountpoint %s", mountpoint)
return None
def is_mounted(self, device = None, mountpoint = None):
def is_mounted(self, device=None, mountpoint=None):
"""
Check if a device or mountpoint is currently mounted.
Either checking by device or mountpoint is valid.
Args:
device (str, optional): The device to check if it is mounted.
Defaults to None.
mountpoint (str, optional): The mountpoint to check if it is mounted.
Defaults to None.
Returns:
bool: True if the device is mounted or the mountpoint is in the list
of mounts, False otherwise.
"""
self.update_mounts()
if device:
return not self.find_mountpoint(device) is None
else:
return mountpoint in self.mounts
def unmount(self, device = None, mountpoint = None):
def unmount(self, device=None, mountpoint=None):
"""
Unmounts a filesystem.
This method unmounts a filesystem either by the device name or the mountpoint.
If a device is provided, it finds the corresponding mountpoint and unmounts it.
If a mountpoint is provided directly, it unmounts the filesystem at that mountpoint.
Args:
device (str, optional): The device name to unmount. Defaults to None.
mountpoint (str, optional): The mountpoint to unmount. Defaults to None.
Raises:
subprocess.CalledProcessError: If the unmount command fails.
Logs:
Debug information about the unmounting process.
"""
if device:
self.logger.debug("Finding mountpoint of %s", device)
mountpoint = self.find_mountpoint(device)
if not mountpoint is None:
self.logger.debug(f"Unmounting {mountpoint}")
subprocess.run(["/usr/bin/umount", mountpoint], check=True)
# We've unmounted a new filesystem, update our filesystems list
self.update_mounts()
else:
self.logger.debug(f"{device} is not mounted")
def mount(self, device, mountpoint, filesystem = None):
"""
Mounts a device to a specified mountpoint.
Parameters:
device (str): The device to be mounted (e.g., '/dev/sda1').
mountpoint (str): The directory where the device will be mounted.
filesystem (str, optional): The type of filesystem to be used (e.g., 'ext4', 'ntfs'). Defaults to None.
Raises:
subprocess.CalledProcessError: If the mount command fails.
Logs:
Debug information about the mounting process, including the mount command, return code, stdout, and stderr.
Side Effects:
Creates the mountpoint directory if it does not exist.
Updates the internal list of mounted filesystems.
"""
self.logger.debug(f"Mounting {device} at {mountpoint}")
if not os.path.exists(mountpoint):
self.logger.debug(f"Creating directory {mountpoint}")
os.mkdir(mountpoint)
mount_cmd = ["/usr/bin/mount"]
if not filesystem is None:
mount_cmd = mount_cmd + ["-t", filesystem]
mount_cmd = mount_cmd + [device, mountpoint]
self.logger.debug(f"Mount command: {mount_cmd}")
result = subprocess.run(mount_cmd, check=True, capture_output = True)
self.logger.debug(f"retorno: {result.returncode}")
self.logger.debug(f"stdout: {result.stdout}")
self.logger.debug(f"stderr: {result.stderr}")
# We've mounted a new filesystem, update our filesystems list
self.update_mounts()
def ensure_mounted(self, device):
"""
Ensure that the given device is mounted.
This method attempts to mount the specified device to a path derived from
the base mount path and the device's basename. If the device is of type NTFS,
it uses the NTFSLibrary to handle the mounting process. For other filesystem
types, it uses a generic mount method.
Args:
device (str): The path to the device that needs to be mounted.
Returns:
str: The path where the device is mounted.
Logs:
- Info: When starting the mounting process.
- Debug: Various debug information including the mount path, filesystem type,
and success message.
Raises:
OSError: If there is an error creating the mount directory or mounting the device.
"""
self.logger.info("Mounting %s", device)
self.unmount(device = device)
path = os.path.join(self.base_mount_path, os.path.basename(device))
self.logger.debug(f"Will mount repo at {path}")
if not os.path.exists(path):
os.mkdir(path)
if self.filesystem_type(device) == "ntfs":
self.logger.debug("Handing a NTFS filesystem")
self._modprobe("ntfs3")
self.ntfsfix(device)
ntfs = NTFSLibrary(self.ntfs_implementation)
ntfs.mount_filesystem(device, path)
self.update_mounts()
else:
self.logger.debug("Handling a non-NTFS filesystem")
self.mount(device, path)
self.logger.debug("Successfully mounted at %s", path)
return path
def filesystem_type(self, device = None, mountpoint = None):
"""
Determine the filesystem type of a given device or mountpoint.
Args:
device (str, optional): The device to probe. If not provided, the device
will be determined based on the mountpoint.
mountpoint (str, optional): The mountpoint to find the device for. This
is used only if the device is not provided.
Returns:
str: The filesystem type of the device.
Raises:
KeyError: If the filesystem type cannot be determined from the probe.
Logs:
Debug: Logs the process of finding the device, probing the device, and
the determined filesystem type.
"""
if device is None:
self.logger.debug("Finding device for mountpoint %s", mountpoint)
device = self.find_device(mountpoint)
self.logger.debug(f"Probing {device}")
pr = blkid.Probe()
pr.set_device(device)
pr.enable_superblocks(True)
pr.set_superblocks_flags(blkid.SUBLKS_TYPE | blkid.SUBLKS_USAGE | blkid.SUBLKS_UUID | blkid.SUBLKS_UUIDRAW | blkid.SUBLKS_LABELRAW)
pr.do_safeprobe()
fstype = pr["TYPE"].decode('utf-8')
self.logger.debug(f"FS type is {fstype}")
return fstype
def is_filesystem(self, path):
"""
Check if the given path is a filesystem root.
Args:
path (str): The path to check.
Returns:
bool: True if the path is a filesystem root, False otherwise.
"""
# This is just an alias for better code readability
return self.is_mounted(mountpoint = path)
def create_filesystem(self, fs_type = None, fs_uuid = None, device = None):
"""
Create a filesystem on the specified device.
Parameters:
fs_type (str): The type of filesystem to create (e.g., 'ntfs', 'ext4', 'xfs', 'btrfs').
fs_uuid (str): The UUID to assign to the filesystem.
device (str): The device on which to create the filesystem (e.g., '/dev/sda1').
Raises:
RuntimeError: If the filesystem type is not recognized or if the filesystem creation command fails.
"""
self.logger.info(f"Creating filesystem {fs_type} with UUID {fs_uuid} in {device}")
if fs_type == "ntfs" or fs_type == "ntfs3":
self.logger.debug("Creating NTFS filesystem")
ntfs = NTFSLibrary(self.ntfs_implementation)
ntfs.create_filesystem(device, "NTFS")
ntfs.modify_uuid(device, fs_uuid)
else:
command = [f"/usr/sbin/mkfs.{fs_type}"]
command_args = []
if fs_type == "ext4" or fs_type == "ext3":
command_args = ["-U", fs_uuid, "-F", device]
elif fs_type == "xfs":
command_args = ["-m", f"uuid={fs_uuid}", "-f", device]
elif fs_type == "btrfs":
command_args = ["-U", fs_uuid, "-f", device]
else:
raise RuntimeError(f"Don't know how to create filesystem of type {fs_type}")
command = command + command_args
self.logger.debug(f"Creating Linux filesystem of type {fs_type} on {device}, command {command}")
result = subprocess.run(command, check = True, capture_output=True)
self.logger.debug(f"retorno: {result.returncode}")
self.logger.debug(f"stdout: {result.stdout}")
self.logger.debug(f"stderr: {result.stderr}")
def mklostandfound(self, path):
"""
Recreate the lost+found if necessary.
When cloning at the root of a filesystem, cleaning the contents
removes the lost+found directory. This is a special directory that requires the use of
a tool to recreate it.
It may fail if the filesystem does not need it. We consider this harmless and ignore it.
The command is entirely skipped on NTFS, as mklost+found may malfunction if run on it,
and has no useful purpose.
"""
if self.is_filesystem(path):
if self.filesystem_type(mountpoint=path) == "ntfs":
self.logger.debug("Not running mklost+found on NTFS")
return
curdir = os.getcwd()
result = None
try:
self.logger.debug(f"Re-creating lost+found in {path}")
os.chdir(path)
result = subprocess.run(["/usr/sbin/mklost+found"], check=True, capture_output=True)
except subprocess.SubprocessError as e:
self.logger.warning(f"Error running mklost+found: {e}")
if result:
self.logger.debug(f"retorno: {result.returncode}")
self.logger.debug(f"stdout: {result.stdout}")
self.logger.debug(f"stderr: {result.stderr}")
os.chdir(curdir)
def ntfsfix(self, device):
"""
Run the ntfsfix command on the specified device.
This method uses the ntfsfix utility to fix common NTFS problems on the given device.
This allows mounting an unclean NTFS filesystem.
Args:
device (str): The path to the device to be fixed.
Raises:
subprocess.CalledProcessError: If the ntfsfix command fails.
"""
self.logger.debug(f"Running ntfsfix on {device}")
subprocess.run(["/usr/bin/ntfsfix", "-d", device], check=True)
def unload_ntfs(self):
"""
Unloads the NTFS filesystem module.
This is a function added as a result of NTFS kernel module troubleshooting,
to try to ensure that NTFS code is only active as long as necessary.
The module is internally loaded as needed, so there's no load_ntfs function.
It may be removed in the future.
Raises:
RuntimeError: If the module cannot be removed.
"""
self._rmmod("ntfs3")
def find_boot_device(self):
"""
Searches for the EFI boot partition on the system.
This method scans the system's partitions to locate the EFI boot partition,
which is identified by the GUID "C12A7328-F81F-11D2-BA4B-00A0C93EC93B".
Returns:
str: The device node of the EFI partition if found, otherwise None.
Logs:
- Debug messages indicating the progress of the search.
- A warning message if the EFI partition is not found.
"""
disks = []
self.logger.debug("Looking for EFI partition")
with open("/proc/partitions", "r", encoding='utf-8') as partitions_file:
line_num=0
for line in partitions_file:
if line_num >=2:
data = line.split()
disk = data[3]
disks.append(disk)
self.logger.debug(f"Disk: {disk}")
line_num = line_num + 1
for disk in disks:
self.logger.debug("Loading partitions for disk %s", disk)
#disk_json_data = subprocess.run(["/usr/sbin/sfdisk", "-J", f"/dev/{disk}"], check=False, capture_output=True)
sfdisk_out = subprocess.run(["/usr/sbin/sfdisk", "-J", f"/dev/{disk}"], check=False, capture_output=True)
if sfdisk_out.returncode == 0:
disk_json_data = sfdisk_out.stdout
disk_data = json.loads(disk_json_data)
for part in disk_data["partitiontable"]["partitions"]:
self.logger.debug("Checking partition %s", part)
if part["type"] == "C12A7328-F81F-11D2-BA4B-00A0C93EC93B":
self.logger.debug("EFI partition found at %s", part["node"])
return part["node"]
else:
self.logger.debug("sfdisk returned with code %i, error %s", sfdisk_out.returncode, sfdisk_out.stderr)
self.logger.warning("Failed to find EFI partition!")
def temp_unmount(self, mountpoint):
"""
Temporarily unmounts the filesystem at the given mountpoint.
This method finds the device associated with the specified mountpoint,
and returns the information to remount it with temp_remount.
The purpose of this function is to temporarily unmount a filesystem for
actions like fsck, and to mount it back afterwards.
Args:
mountpoint (str): The mountpoint of the filesystem to unmount.
Returns:
dict: A dictionary containing the information needed to remount the filesystem.
"""
device = self.find_device(mountpoint)
fs = self.filesystem_type(mountpoint = mountpoint)
data = {"mountpoint" : mountpoint, "device" :device, "filesystem" : fs}
self.logger.debug("Temporarily unmounting device %s, mounted on %s, fs type %s", mountpoint, device, fs)
self.unmount(mountpoint = mountpoint)
return data
def temp_remount(self, unmount_data):
"""
Remounts a filesystem unmounted with temp_unmount
This method remounts a filesystem using the data provided by temp_unmount
Args:
unmount_data (dict): A dictionary containing the data needed to remount the filesystem.
Returns:
None
"""
self.logger.debug("Remounting temporarily unmounted device %s on %s, fs type %s", unmount_data["device"], unmount_data["mountpoint"], unmount_data["filesystem"])
self.mount(device = unmount_data["device"], mountpoint=unmount_data["mountpoint"], filesystem=unmount_data["filesystem"])

View File

@ -28,13 +28,14 @@ import base64
import stat
import time
from enum import Enum
import git
import libarchive
import xattr
import posix1e
import blkid
from filesystem import *
from ntfs import *
class OgProgressPrinter(git.RemoteProgress):
"""
@ -108,129 +109,6 @@ class RequirementException(Exception):
super().__init__(message)
self.message = message
class NTFSImplementation(Enum):
KERNEL = 1
NTFS3G = 2
class NTFSLibrary:
"""
A library for managing NTFS filesystems.
Attributes:
logger (logging.Logger): Logger for the class.
implementation (NTFSImplementation): The implementation to use for mounting NTFS filesystems.
"""
def __init__(self, implementation):
"""
Initializes the instance with the given implementation.
Args:
implementation: The implementation to be used by the instance.
Attributes:
logger (logging.Logger): Logger instance for the class, set to debug level.
implementation: The implementation provided during initialization.
"""
self.logger = logging.getLogger("NTFSLibrary")
self.logger.setLevel(logging.DEBUG)
self.implementation = implementation
self.logger.debug("Initializing")
def create_filesystem(self, device, label):
"""
Creates an NTFS filesystem on the specified device with the given label.
Args:
device (str): The device path where the NTFS filesystem will be created.
label (str): The label to assign to the NTFS filesystem.
Returns:
None
Logs:
Logs the creation process with the device and label information.
"""
self.logger.info(f"Creating NTFS in {device} with label {label}")
subprocess.run(["/usr/sbin/mkntfs", device, "-Q", "-L", label], check=True)
def mount_filesystem(self, device, mountpoint):
"""
Mounts a filesystem on the specified mountpoint using the specified NTFS implementation.
Args:
device (str): The device path to be mounted (e.g., '/dev/sda1').
mountpoint (str): The directory where the device will be mounted.
Raises:
ValueError: If the NTFS implementation is unknown.
"""
self.logger.info(f"Mounting {device} in {mountpoint} using implementation {self.implementation}")
if self.implementation == NTFSImplementation.KERNEL:
subprocess.run(["/usr/bin/mount", "-t", "ntfs3", device, mountpoint], check = True)
elif self.implementation == NTFSImplementation.NTFS3G:
subprocess.run(["/usr/bin/ntfs-3g", device, mountpoint], check = True)
else:
raise ValueError("Unknown NTFS implementation: {self.implementation}")
def _hex_to_bin(self, hex_str):
while len(hex_str) != 16:
hex_str = "0" + hex_str
hex_int = int(hex_str, 16)
binary = bin(hex_int)[2:].zfill(64)
return binary
def _bin_to_hex(self, binary_data):
binary_int = int(binary_data, 256)
hex_str = hex(binary_int)[2:]
hex_str = hex_str.zfill((len(binary_data) + 3) // 4)
return hex_str
def modify_uuid(self, device, uuid):
"""
Modify the UUID of an NTFS device.
This function changes the UUID of the specified NTFS device to the given UUID.
It reads the current UUID from the device, logs the change, and writes the new UUID.
Args:
device (str): The path to the NTFS device file.
uuid (str): The new UUID to be set, in hexadecimal string format.
Raises:
IOError: If there is an error opening or writing to the device file.
"""
ntfs_uuid_offset = 0x48
ntfs_uuid_length = 8
binary_uuid = bytearray.fromhex(uuid)
binary_uuid.reverse()
self.logger.info(f"Changing UUID on {device} to {uuid}")
with open(device, 'r+b') as ntfs_dev:
self.logger.debug("Reading %i bytes from offset %i", ntfs_uuid_length, ntfs_uuid_offset)
ntfs_dev.seek(ntfs_uuid_offset)
prev_uuid = bytearray(ntfs_dev.read(ntfs_uuid_length))
prev_uuid.reverse()
prev_uuid_hex = bytearray.hex(prev_uuid)
self.logger.debug(f"Previous UUID: {prev_uuid_hex}")
self.logger.debug("Writing...")
ntfs_dev.seek(ntfs_uuid_offset)
ntfs_dev.write(binary_uuid)
class OpengnsysGitLibrary:
"""OpenGnsys Git Library"""
@ -263,8 +141,11 @@ class OpengnsysGitLibrary:
self.logger.setLevel(logging.DEBUG)
self.logger.debug(f"Initializing. Cache = {require_cache}, ntfs = {ntfs_implementation}")
self.fs = FilesystemLibrary(ntfs_implementation = ntfs_implementation)
#self.ntfs = NTFSLibrary()
#self.repo_server = "192.168.2.1"
self.mounts = self._parse_mounts()
self.repo_user = "oggit"
self.repo_image_path = "oggit"
self.ntfs_implementation = ntfs_implementation
@ -353,105 +234,6 @@ class OpengnsysGitLibrary:
"""
return os.path.exists("/sys/firmware/efi")
def _parse_mounts(self):
filesystems = {}
self.logger.debug("Parsing /proc/mounts")
with open("/proc/mounts", 'r', encoding='utf-8') as mounts:
for line in mounts:
parts = line.split()
data = {}
data['device'] = parts[0]
data['mountpoint'] = parts[1]
data['type'] = parts[2]
data['options'] = parts[3]
data['dump_freq'] = parts[4]
data['passno'] = parts[5]
filesystems[data["mountpoint"]] = data
filesystems[data["mountpoint"] + "/"] = data
return filesystems
def _find_mountpoint(self, device):
norm = os.path.normpath(device)
self.logger.debug(f"Checking if {device} is mounted")
for mountpoint, mount in self.mounts.items():
#self.logger.debug(f"Item: {mount}")
#self.logger.debug(f"Checking: " + mount['device'])
if mount['device'] == norm:
return mountpoint
return None
def _is_device_mounted(self, device):
return not self._find_mountpoint(device) is None
def _unmount_device(self, device):
mountpoint = self._find_mountpoint(device)
if not mountpoint is None:
self.logger.debug(f"Unmounting {mountpoint}")
subprocess.run(["/usr/bin/umount", mountpoint], check=True)
# We've unmounted a new filesystem, update our filesystems list
self.mounts = self._parse_mounts()
else:
self.logger.debug(f"{device} is not mounted")
def _rmmod(self, module):
self.logger.debug("Trying to unload module {module}...")
subprocess.run(["/usr/sbin/rmmod", module], check=True)
def _modprobe(self, module):
self.logger.debug("Trying to load module {module}...")
subprocess.run(["/usr/sbin/modprobe", module], check=True)
def _mount(self, device, mountpoint, filesystem = None):
self.logger.debug(f"Mounting {device} at {mountpoint}")
if not os.path.exists(mountpoint):
self.logger.debug(f"Creating directory {mountpoint}")
os.mkdir(mountpoint)
mount_cmd = ["/usr/bin/mount"]
if not filesystem is None:
mount_cmd = mount_cmd + ["-t", filesystem]
mount_cmd = mount_cmd + [device, mountpoint]
self.logger.debug(f"Mount command: {mount_cmd}")
result = subprocess.run(mount_cmd, check=True, capture_output = True)
self.logger.debug(f"retorno: {result.returncode}")
self.logger.debug(f"stdout: {result.stdout}")
self.logger.debug(f"stderr: {result.stderr}")
# We've mounted a new filesystem, update our filesystems list
self.mounts = self._parse_mounts()
def _ntfsfix(self, device):
self.logger.debug(f"Running ntfsfix on {device}")
subprocess.run(["/usr/bin/ntfsfix", "-d", device], check=True)
def _filesystem_type(self, device):
self.logger.debug(f"Probing {device}")
pr = blkid.Probe()
pr.set_device(device)
pr.enable_superblocks(True)
pr.set_superblocks_flags(blkid.SUBLKS_TYPE | blkid.SUBLKS_USAGE | blkid.SUBLKS_UUID | blkid.SUBLKS_UUIDRAW | blkid.SUBLKS_LABELRAW)
pr.do_safeprobe()
fstype = pr["TYPE"].decode('utf-8')
self.logger.debug(f"FS type is {fstype}")
return fstype
def _write_ignore_list(self, base_path):
ignore_file = base_path + "/.gitignore"
@ -484,59 +266,7 @@ class OpengnsysGitLibrary:
self.logger.debug("%i parameters found", len(params))
return params
def _is_filesystem(self, path):
"""
Check if the given path is a filesystem root.
This method reads the '/proc/mounts' file to determine if the specified
path is a mount point (i.e., a filesystem root).
Args:
path (str): The path to check.
Returns:
bool: True if the path is a filesystem root, False otherwise.
"""
with open('/proc/mounts', 'r', encoding='utf-8') as mounts:
for mount in mounts:
parts = mount.split()
self.logger.debug(f"Parts: {parts}")
mountpoint = parts[1]
if os.path.normpath(mountpoint) == os.path.normpath(path):
self.logger.debug(f"Directory {path} is a filesystem root")
return True
self.logger.debug(f"Directory {path} is not a filesystem root")
return False
def _mklostandfound(self, path):
"""Recreate the lost+found if necessary.
When cloning at the root of a filesystem, cleaning the contents
removes the lost+found directory. This is a special directory that requires the use of
a tool to recreate it.
It may fail if the filesystem does not need it. We consider this harmless and ignore it.
"""
if self._is_filesystem(path):
curdir = os.getcwd()
result = None
try:
self.logger.debug(f"Re-creating lost+found in {path}")
os.chdir(path)
result = subprocess.run(["/usr/sbin/mklost+found"], check=True, capture_output=True)
except subprocess.SubprocessError as e:
self.logger.warning(f"Error running mklost+found: {e}")
if result:
self.logger.debug(f"retorno: {result.returncode}")
self.logger.debug(f"stdout: {result.stdout}")
self.logger.debug(f"stderr: {result.stderr}")
os.chdir(curdir)
def _ntfs_secaudit(self, data):
@ -562,42 +292,54 @@ class OpengnsysGitLibrary:
os.rename(metadata_file + ".new", metadata_file)
def _ntfs_restore_secaudit(self, path):
self.logger.debug("Restoring NTFS metadata for %s", path)
if not self.fs.is_filesystem(path):
self.logger.error("Path %s is not a filesystem!")
return
if self.fs.filesystem_type(mountpoint = path) != "ntfs":
self.logger.error("Path %s is not NTFS!", path)
return
metadata_file = os.path.join(path,".opengnsys-metadata", "ntfs_secaudit.txt")
secaudit_data = ""
self.logger.debug("Reading audit metadata from %s...", metadata_file)
with open(metadata_file, "rb") as meta:
secaudit_data = meta.read()
self.logger.debug("Read %i bytes", len(secaudit_data))
device = self.fs.find_device(path)
mountdata = self.fs.temp_unmount(path)
self.logger.info("Restoring secaudit data...")
result = subprocess.run(["/usr/bin/ntfssecaudit", "-se", device], check=False, capture_output=True, input=secaudit_data)
if result.returncode == 0:
self.logger.debug("Completed, return code %i", result.returncode)
self.logger.debug("STDOUT: %s", result.stdout)
self.logger.debug("STDERR: %s", result.stderr)
else:
# An error return code can be returned for reasons like missing files, so we deal this
# as non-fatal.
self.logger.error("Completed, return code %i", result.returncode)
self.logger.error("STDOUT: %s", result.stdout)
self.logger.error("STDERR: %s", result.stderr)
self.fs.temp_remount(mountdata)
def _create_filesystems(self, fs_data, fs_map):
for mountpoint in fs_map:
dest_device = fs_map[mountpoint]
data = fs_data[mountpoint]
fs_type = data["type"]
fs_uuid = data["uuid"]
self.fs.create_filesystem(device = dest_device, fs_type = data["type"], fs_uuid = data["uuid"])
self.logger.info(f"Creating filesystem {fs_type} with UUID {fs_uuid} in {dest_device}")
if fs_type == "ntfs" or fs_type == "ntfs3":
self.logger.debug("Creating NTFS filesystem")
ntfs = NTFSLibrary(self.ntfs_implementation)
ntfs.create_filesystem(dest_device, "NTFS")
#ntfs.modify_uuid(dest_device, fs_uuid)
else:
command = [f"/usr/sbin/mkfs.{fs_type}"]
command_args = []
if fs_type == "ext4" or fs_type == "ext3":
command_args = ["-U", fs_uuid, "-F", dest_device]
elif fs_type == "xfs":
command_args = ["-m", f"uuid={fs_uuid}", "-f", dest_device]
elif fs_type == "btrfs":
command_args = ["-U", fs_uuid, "-f", dest_device]
else:
raise RuntimeError(f"Don't know how to create filesystem of type {fs_type}")
command = command + command_args
self.logger.debug(f"Creating Linux filesystem of type {fs_type} on {dest_device}, command {command}")
result = subprocess.run(command, check = True, capture_output=True)
self.logger.debug(f"retorno: {result.returncode}")
self.logger.debug(f"stdout: {result.stdout}")
self.logger.debug(f"stderr: {result.stderr}")
def _grub_install(self, root_directory, boot_device):
"""
@ -653,8 +395,8 @@ class OpengnsysGitLibrary:
shutil.Error: If an error occurs during the copying of the EFI data.
"""
boot_device = self._find_boot_device()
boot_mount = self._find_mountpoint(boot_device)
boot_device = self.fs.find_boot_device()
boot_mount = self.fs.find_mountpoint(boot_device)
self.logger.info(f"Installing EFI files in {boot_mount}")
meta_dir = os.path.join(root_directory, ".opengnsys-metadata")
@ -685,8 +427,8 @@ class OpengnsysGitLibrary:
def _efi_copy(self, root_directory, system_specific = False, config_name = None):
meta_dir = os.path.join(root_directory, ".opengnsys-metadata")
boot_device = self._find_boot_device()
boot_mount = self._find_mountpoint(boot_device)
boot_device = self.fs.find_boot_device()
boot_mount = self.fs.find_mountpoint(boot_device)
efi_files_dir = ""
@ -718,53 +460,6 @@ class OpengnsysGitLibrary:
def _find_boot_device(self):
"""
Searches for the EFI boot partition on the system.
This method scans the system's partitions to locate the EFI boot partition,
which is identified by the GUID "C12A7328-F81F-11D2-BA4B-00A0C93EC93B".
Returns:
str: The device node of the EFI partition if found, otherwise None.
Logs:
- Debug messages indicating the progress of the search.
- A warning message if the EFI partition is not found.
"""
disks = []
self.logger.debug("Looking for EFI partition")
with open("/proc/partitions", "r", encoding='utf-8') as partitions_file:
line_num=0
for line in partitions_file:
if line_num >=2:
data = line.split()
disk = data[3]
disks.append(disk)
self.logger.debug(f"Disk: {disk}")
line_num = line_num + 1
for disk in disks:
self.logger.debug("Loading partitions for disk %s", disk)
#disk_json_data = subprocess.run(["/usr/sbin/sfdisk", "-J", f"/dev/{disk}"], check=False, capture_output=True)
sfdisk_out = subprocess.run(["/usr/sbin/sfdisk", "-J", f"/dev/{disk}"], check=False, capture_output=True)
if sfdisk_out.returncode == 0:
disk_json_data = sfdisk_out.stdout
disk_data = json.loads(disk_json_data)
for part in disk_data["partitiontable"]["partitions"]:
self.logger.debug("Checking partition %s", part)
if part["type"] == "C12A7328-F81F-11D2-BA4B-00A0C93EC93B":
self.logger.debug("EFI partition found at %s", part["node"])
return part["node"]
else:
self.logger.debug("sfdisk returned with code %i, error %s", sfdisk_out.returncode, sfdisk_out.stderr)
self.logger.warning("Failed to find EFI partition!")
def _delete_contents(self, path):
@ -982,8 +677,8 @@ class OpengnsysGitLibrary:
mount_found = False
if root in self.mounts:
mount = self.mounts[root]
if root in self.fs.mounts:
mount = self.fs.mounts[root]
root_path_rel = root[len(path):None]
if len(root_path_rel) == 0 or root_path_rel[0] != "/":
root_path_rel = "/" + root_path_rel
@ -1038,7 +733,7 @@ class OpengnsysGitLibrary:
errstr = f"""We've failed to find metadata for the root filesystem!
Root: {root}
FS data: """
errstr = errstr + json.dumps(self.mounts, indent=4)
errstr = errstr + json.dumps(self.fs.mounts, indent=4)
self.logger.error(errstr)
raise RuntimeError(errstr)
@ -1235,7 +930,6 @@ class OpengnsysGitLibrary:
"""
self.logger.debug("Initializing")
self.mounts = self._parse_mounts()
self.logger.info(f"Restoring metadata in {path}")
meta_dir = os.path.join(path, ".opengnsys-metadata")
@ -1428,34 +1122,6 @@ class OpengnsysGitLibrary:
repo.config_writer().add_value("user", "email", "OpenGnsys@opengnsys.com").release()
repo.config_writer().add_value("core", "filemode", "false").release()
def _ensure_mounted(self, device):
self.logger.info("Mounting %s", device)
self._unmount_device(device)
path = os.path.join("/mnt", os.path.basename(device))
self.logger.debug(f"Will mount repo at {path}")
if not os.path.exists(path):
os.mkdir(path)
if self._filesystem_type(device) == "ntfs":
self.logger.debug("Handing a NTFS filesystem")
self._modprobe("ntfs3")
self._ntfsfix(device)
ntfs = NTFSLibrary(self.ntfs_implementation)
ntfs.mount_filesystem(device, path)
self.mounts = self._parse_mounts()
else:
self.logger.debug("Handling a non-NTFS filesystem")
self._mount(device, path)
self.logger.debug("Successfully mounted at %s", path)
return path
def initRepo(self, device, repo_name):
"""
Initialize a Git repository on a specified device.
@ -1477,7 +1143,7 @@ class OpengnsysGitLibrary:
- The method sets up a remote origin and pushes the initial commit.
"""
path = self._ensure_mounted(device)
path = self.fs.ensure_mounted(device)
self.logger.info("Initializing repository: %s", path)
git_dir = os.path.join(path, ".git")
@ -1578,7 +1244,9 @@ class OpengnsysGitLibrary:
#self.logger.debug("Commit done, will unmount now")
#self._umount_device(device)
self._rmmod("ntfs3")
self.fs.unload_ntfs()
repo_url = self._getOgRepository(repo_name)
@ -1646,7 +1314,7 @@ class OpengnsysGitLibrary:
if repo_is_efi != efi:
raise RequirementException("Repositorio usa sistema de arranque incompatible con sistema actual")
self._unmount_device(destination)
self.fs.unmount(device = destination)
filesystem_map = {"/" : destination}
@ -1654,7 +1322,7 @@ class OpengnsysGitLibrary:
destination_dir = "/mnt/repo-" + repo_name
self._mount(destination, destination_dir)
self.fs.mount(destination, destination_dir)
self._delete_contents(destination_dir)
@ -1667,9 +1335,14 @@ class OpengnsysGitLibrary:
else:
self._grub_install(root_directory=destination_dir, boot_device=boot_device)
self._mklostandfound(destination_dir)
self.fs.mklostandfound(destination_dir)
self._restore_metadata(destination_dir)
if self.fs.filesystem_type(mountpoint = destination_dir) == "ntfs":
self._ntfs_restore_secaudit(destination_dir)
self.logger.info("Clone completed.")
def commit(self, path = None, device = None, message = None):
"""
@ -1677,7 +1350,7 @@ class OpengnsysGitLibrary:
"""
if path is None:
path = self._ensure_mounted(device)
path = self.fs.ensure_mounted(device)
self.logger.info("Committing changes to repository")
repo = git.Repo(path)
@ -1715,7 +1388,7 @@ class OpengnsysGitLibrary:
if path is None:
path = self._ensure_mounted(device)
path = self.fs.ensure_mounted(device)
repo = git.Repo(path)
@ -1730,7 +1403,7 @@ class OpengnsysGitLibrary:
if path is None:
path = self._ensure_mounted(device)
path = self.fs.ensure_mounted(device)
repo = git.Repo(path)
@ -1817,6 +1490,7 @@ if __name__ == '__main__':
parser.add_argument("--test-efi-install", type=str, metavar="DIR", help = "Install EFI")
parser.add_argument("-m", "--message", type=str, metavar="MSG", help="Commit message")
parser.add_argument("--test-set-ntfsid", type=str, metavar="ID", help="Set NTFS ID")
parser.add_argument("--test-restore-secaudit",type=str, metavar="DIR", help="Test restoring NTFS secaudit")
parser.add_argument("--device", type=str, metavar="DEV", help="Device to set the UUID on")
parser.add_argument("-v", "--verbose", action="store_true", help = "Verbose console output")
@ -1885,8 +1559,12 @@ if __name__ == '__main__':
ntfs.modify_uuid(args.device, args.test_set_ntfsid)
elif args.test_efi_install:
og_git._efi_install(root_directory=args.test_efi_install, config_name = args.efi_config) # pylint: disable=protected-access
elif args.test_restore_secaudit:
og_git._ntfs_restore_secaudit(args.test_restore_secaudit) # pylint: disable=protected-access
else:
print("Debe especificar una acción")
print("Please specify an action.")
parser.print_help()
sys.exit(1)
#
# Make sure all filesystem changes are written, just in case the oglive is rebooted without an unmount

111
gitlib/ntfs.py 100644
View File

@ -0,0 +1,111 @@
import logging
import subprocess
from enum import Enum
class NTFSImplementation(Enum):
KERNEL = 1
NTFS3G = 2
class NTFSLibrary:
"""
A library for managing NTFS filesystems.
Attributes:
logger (logging.Logger): Logger for the class.
implementation (NTFSImplementation): The implementation to use for mounting NTFS filesystems.
"""
def __init__(self, implementation):
"""
Initializes the instance with the given implementation.
Args:
implementation: The implementation to be used by the instance.
Attributes:
logger (logging.Logger): Logger instance for the class, set to debug level.
implementation: The implementation provided during initialization.
"""
self.logger = logging.getLogger("NTFSLibrary")
self.logger.setLevel(logging.DEBUG)
self.implementation = implementation
self.logger.debug("Initializing")
def create_filesystem(self, device, label):
"""
Creates an NTFS filesystem on the specified device with the given label.
Args:
device (str): The device path where the NTFS filesystem will be created.
label (str): The label to assign to the NTFS filesystem.
Returns:
None
Logs:
Logs the creation process with the device and label information.
"""
self.logger.info(f"Creating NTFS in {device} with label {label}")
subprocess.run(["/usr/sbin/mkntfs", device, "-Q", "-L", label], check=True)
def mount_filesystem(self, device, mountpoint):
"""
Mounts a filesystem on the specified mountpoint using the specified NTFS implementation.
Args:
device (str): The device path to be mounted (e.g., '/dev/sda1').
mountpoint (str): The directory where the device will be mounted.
Raises:
ValueError: If the NTFS implementation is unknown.
"""
self.logger.info(f"Mounting {device} in {mountpoint} using implementation {self.implementation}")
if self.implementation == NTFSImplementation.KERNEL:
subprocess.run(["/usr/bin/mount", "-t", "ntfs3", device, mountpoint], check = True)
elif self.implementation == NTFSImplementation.NTFS3G:
subprocess.run(["/usr/bin/ntfs-3g", device, mountpoint], check = True)
else:
raise ValueError("Unknown NTFS implementation: {self.implementation}")
def modify_uuid(self, device, uuid):
"""
Modify the UUID of an NTFS device.
This function changes the UUID of the specified NTFS device to the given UUID.
It reads the current UUID from the device, logs the change, and writes the new UUID.
Args:
device (str): The path to the NTFS device file.
uuid (str): The new UUID to be set, in hexadecimal string format.
Raises:
IOError: If there is an error opening or writing to the device file.
"""
ntfs_uuid_offset = 0x48
ntfs_uuid_length = 8
binary_uuid = bytearray.fromhex(uuid)
binary_uuid.reverse()
self.logger.info(f"Changing UUID on {device} to {uuid}")
with open(device, 'r+b') as ntfs_dev:
self.logger.debug("Reading %i bytes from offset %i", ntfs_uuid_length, ntfs_uuid_offset)
ntfs_dev.seek(ntfs_uuid_offset)
prev_uuid = bytearray(ntfs_dev.read(ntfs_uuid_length))
prev_uuid.reverse()
prev_uuid_hex = bytearray.hex(prev_uuid)
self.logger.debug(f"Previous UUID: {prev_uuid_hex}")
self.logger.debug("Writing...")
ntfs_dev.seek(ntfs_uuid_offset)
ntfs_dev.write(binary_uuid)