545 lines
20 KiB
Python
545 lines
20 KiB
Python
|
|
import logging
|
|
import subprocess
|
|
import os
|
|
import json
|
|
import blkid
|
|
import time
|
|
|
|
from ntfs import *
|
|
|
|
|
|
|
|
# pylint: disable=locally-disabled, line-too-long, logging-fstring-interpolation, too-many-lines
|
|
|
|
|
|
class FilesystemLibrary:
|
|
def __init__(self, ntfs_implementation = NTFSImplementation.KERNEL):
    """
    Prepare the library state and take a first snapshot of the mount table.

    Args:
        ntfs_implementation: Value stored on the instance and later passed to
            NTFSLibrary(...) when an NTFS filesystem is mounted or created.
            Defaults to NTFSImplementation.KERNEL.
    """
    log = logging.getLogger("OpengnsysFilesystemLibrary")
    log.setLevel(logging.DEBUG)
    self.logger = log

    self.ntfs_implementation = ntfs_implementation
    # Directory under which ensure_mounted creates per-device mount points.
    self.base_mount_path = "/mnt"
    # Mount table cache; replaced wholesale by update_mounts().
    self.mounts = {}

    self.update_mounts()
|
|
|
|
def _rmmod(self, module):
|
|
self.logger.debug("Trying to unload module {module}...")
|
|
subprocess.run(["/usr/sbin/rmmod", module], check=False)
|
|
|
|
def _modprobe(self, module):
|
|
self.logger.debug("Trying to load module {module}...")
|
|
subprocess.run(["/usr/sbin/modprobe", module], check=True)
|
|
|
|
|
|
# _parse_mounts
|
|
def update_mounts(self):
    """
    Update the current mount points by parsing the /proc/mounts file.

    Every line of /proc/mounts is split on whitespace and stored as a
    dictionary with these keys:
        - device: The device file associated with the filesystem.
        - mountpoint: The directory where the filesystem is mounted.
        - type: The type of the filesystem (e.g., ext4, vfat).
        - options: Mount options associated with the filesystem.
        - dump_freq: The dump frequency for the filesystem.
        - passno: The pass number for filesystem checks.

    The result is keyed by mount point. Each entry is also registered under
    the mount point with a trailing slash, so both spellings can be looked up.

    Octal escapes in the device and mount point fields (for example "\\040"
    for a space) are decoded, so the stored paths are the real ones. Lines
    with fewer than six fields are skipped.

    Attributes:
        mounts (dict): Replaced with the freshly parsed table.
    """
    import re  # only needed here; kept local so the block is self-contained

    def decode_field(field):
        # Whitespace and backslashes inside paths are written as \NNN (octal).
        return re.sub(r"\\([0-7]{3})", lambda m: chr(int(m.group(1), 8)), field)

    filesystems = {}

    self.logger.debug("Parsing /proc/mounts")

    with open("/proc/mounts", 'r', encoding='utf-8') as mounts:
        for line in mounts:
            parts = line.split()
            if len(parts) < 6:
                # Blank or malformed line: nothing usable to record.
                continue

            data = {
                'device': decode_field(parts[0]),
                'mountpoint': decode_field(parts[1]),
                'type': parts[2],
                'options': parts[3],
                'dump_freq': parts[4],
                'passno': parts[5],
            }

            filesystems[data["mountpoint"]] = data
            filesystems[data["mountpoint"] + "/"] = data

    self.mounts = filesystems
|
|
|
|
def find_mountpoint(self, device):
    """
    Return the mount point at which a device is mounted, if any.

    The lookup uses the cached mount table in self.mounts; it does not
    refresh it. The device path is normalized with os.path.normpath before
    being compared against each entry's 'device' field.

    Args:
        device (str): Path of the device to look for.

    Returns:
        str or None: The first matching mount point key, or None when no
        entry refers to the device.
    """
    wanted = os.path.normpath(device)

    self.logger.debug(f"Checking if {device} is mounted")

    matches = (where for where, entry in self.mounts.items() if entry['device'] == wanted)
    return next(matches, None)
|
|
|
|
def find_device(self, mountpoint):
    """
    Return the device mounted at a given mount point.

    The mount table is refreshed with update_mounts() before the lookup.

    Args:
        mountpoint (str): Mount point to look up.

    Returns:
        str or None: The 'device' field of the matching entry, or None
        (after logging a warning) when the mount point is not in the table.
    """
    self.update_mounts()
    self.logger.debug("Finding device corresponding to mount point %s", mountpoint)

    entry = self.mounts.get(mountpoint)
    if entry is None:
        self.logger.warning("Failed to find mountpoint %s", mountpoint)
        return None

    return entry['device']
|
|
|
|
def is_mounted(self, device=None, mountpoint=None):
    """
    Check if a device or mountpoint is currently mounted.

    Either checking by device or mountpoint is valid. When a device is given
    it takes precedence and the mountpoint argument is ignored. The mount
    table is refreshed before the check.

    Args:
        device (str, optional): The device to check if it is mounted.
            Defaults to None.
        mountpoint (str, optional): The mountpoint to check if it is mounted.
            Defaults to None.

    Returns:
        bool: True if the device is mounted or the mountpoint is in the list
        of mounts, False otherwise.
    """
    self.update_mounts()

    if device:
        return self.find_mountpoint(device) is not None

    return mountpoint in self.mounts
|
|
|
|
def unmount(self, device=None, mountpoint=None):
    """
    Unmounts a filesystem.

    This method unmounts a filesystem either by the device name or the mountpoint.
    If a device is provided, it finds the corresponding mountpoint and unmounts it.
    If a mountpoint is provided directly, it unmounts the filesystem at that mountpoint.
    If nothing is mounted, the method only logs that fact and returns.

    While umount reports "target is busy", the call is retried every 0.1
    seconds for up to 60 seconds.

    Args:
        device (str, optional): The device name to unmount. Defaults to None.
        mountpoint (str, optional): The mountpoint to unmount. Defaults to None.

    Raises:
        subprocess.CalledProcessError: If the unmount command fails for a
            reason other than a busy target, or if the target is still busy
            when the timeout expires.

    Logs:
        Debug information about the unmounting process.
    """
    if device:
        self.logger.debug("Finding mountpoint of %s", device)
        mountpoint = self.find_mountpoint(device)

    if mountpoint is None:
        self.logger.debug(f"{device} is not mounted")
        return

    self.logger.debug(f"Unmounting {mountpoint}")

    timeout = 60
    # A monotonic clock keeps the timeout immune to system clock adjustments.
    deadline = time.monotonic() + timeout

    while True:
        ret = subprocess.run(["/usr/bin/umount", mountpoint], check=False, capture_output=True, encoding='utf-8')
        if ret.returncode == 0:
            break

        remaining = deadline - time.monotonic()
        if "target is busy" not in ret.stderr or remaining <= 0:
            # Either a real failure or we ran out of time waiting. Report it
            # instead of pretending the filesystem was unmounted.
            raise subprocess.CalledProcessError(ret.returncode, ret.args, output=ret.stdout, stderr=ret.stderr)

        self.logger.debug("Filesystem busy, waiting. %.1f seconds left", remaining)
        time.sleep(0.1)

    # We've unmounted a filesystem, update our filesystems list
    self.update_mounts()
|
|
|
|
|
|
def mount(self, device, mountpoint, filesystem=None):
    """
    Mounts a device to a specified mountpoint.

    Parameters:
        device (str): The device to be mounted (e.g., '/dev/sda1').
        mountpoint (str): The directory where the device will be mounted.
        filesystem (str, optional): The type of filesystem to be used (e.g., 'ext4', 'ntfs').
            When None, no -t option is passed to mount. Defaults to None.

    Raises:
        subprocess.CalledProcessError: If the mount command fails.
        OSError: If the mountpoint directory cannot be created.

    Logs:
        Debug information about the mounting process, including the mount command, return code, stdout, and stderr.

    Side Effects:
        Creates the mountpoint directory (including missing parents) if it does not exist.
        Updates the internal list of mounted filesystems.
    """
    self.logger.debug(f"Mounting {device} at {mountpoint}")

    if not os.path.exists(mountpoint):
        self.logger.debug(f"Creating directory {mountpoint}")
        # makedirs also creates missing parents, and exist_ok tolerates the
        # directory appearing between the check above and this call.
        os.makedirs(mountpoint, exist_ok=True)

    mount_cmd = ["/usr/bin/mount"]
    if filesystem is not None:
        mount_cmd += ["-t", filesystem]
    mount_cmd += [device, mountpoint]

    self.logger.debug(f"Mount command: {mount_cmd}")
    result = subprocess.run(mount_cmd, check=True, capture_output=True)

    self.logger.debug(f"retorno: {result.returncode}")
    self.logger.debug(f"stdout: {result.stdout}")
    self.logger.debug(f"stderr: {result.stderr}")

    # We've mounted a new filesystem, update our filesystems list
    self.update_mounts()
|
|
|
|
def ensure_mounted(self, device):
    """
    Mount a device under the base mount path and return where it ended up.

    Any existing mount of the device is removed first. The target directory
    is base_mount_path joined with the device's basename, and is created if
    missing. NTFS filesystems are handled specially: the ntfs3 module is
    loaded, ntfsfix is run on the device, and the mount is delegated to
    NTFSLibrary. Every other filesystem type goes through self.mount.

    Args:
        device (str): Path of the device to mount.

    Returns:
        str: The directory where the device is now mounted.

    Raises:
        OSError: If the mount directory cannot be created.
        subprocess.CalledProcessError: If one of the invoked commands fails.
    """
    self.logger.info("Mounting %s", device)

    self.unmount(device = device)
    target = os.path.join(self.base_mount_path, os.path.basename(device))

    self.logger.debug(f"Will mount repo at {target}")
    target_missing = not os.path.exists(target)
    if target_missing:
        os.mkdir(target)

    is_ntfs = self.filesystem_type(device) == "ntfs"
    if not is_ntfs:
        self.logger.debug("Handling a non-NTFS filesystem")
        self.mount(device, target)
    else:
        self.logger.debug("Handing a NTFS filesystem")

        self._modprobe("ntfs3")
        self.ntfsfix(device)

        ntfs_lib = NTFSLibrary(self.ntfs_implementation)
        ntfs_lib.mount_filesystem(device, target)
        # The mount was done outside self.mount, so refresh the table here.
        self.update_mounts()

    self.logger.debug("Successfully mounted at %s", target)
    return target
|
|
|
|
|
|
def filesystem_type(self, device = None, mountpoint = None):
    """
    Report the filesystem type found on a device.

    When no device is given, the device is first resolved from the mount
    point via find_device. The device is then probed with a blkid.Probe
    configured for superblock detection, and the probe's "TYPE" value is
    decoded as UTF-8.

    Args:
        device (str, optional): Device to probe.
        mountpoint (str, optional): Mount point used to look up the device;
            only consulted when device is None.

    Returns:
        str: The filesystem type reported by the probe.

    Raises:
        KeyError: If the filesystem type cannot be determined from the probe.
    """
    if device is None:
        self.logger.debug("Finding device for mountpoint %s", mountpoint)
        device = self.find_device(mountpoint)

    self.logger.debug(f"Probing {device}")

    probe = blkid.Probe()
    probe.set_device(device)
    probe.enable_superblocks(True)

    superblock_flags = (
        blkid.SUBLKS_TYPE
        | blkid.SUBLKS_USAGE
        | blkid.SUBLKS_UUID
        | blkid.SUBLKS_UUIDRAW
        | blkid.SUBLKS_LABELRAW
    )
    probe.set_superblocks_flags(superblock_flags)
    probe.do_safeprobe()

    raw_type = probe["TYPE"]
    fstype = raw_type.decode('utf-8')
    self.logger.debug(f"FS type is {fstype}")

    return fstype
|
|
|
|
def is_filesystem(self, path):
    """
    Tell whether a path is the root of a mounted filesystem.

    This is a readability alias for is_mounted(mountpoint=path).

    Args:
        path (str): The path to check.

    Returns:
        bool: True if the path is a mount point, False otherwise.
    """
    mounted_here = self.is_mounted(mountpoint = path)
    return mounted_here
|
|
|
|
def create_filesystem(self, fs_type = None, fs_uuid = None, device = None):
    """
    Create a filesystem on the specified device.

    NTFS ('ntfs' or 'ntfs3') is delegated to NTFSLibrary. The Linux
    filesystems 'ext3', 'ext4', 'xfs' and 'btrfs' are created by running the
    matching /usr/sbin/mkfs.<type> tool with its "force" flag.

    Parameters:
        fs_type (str): The type of filesystem to create (e.g., 'ntfs', 'ext4', 'xfs', 'btrfs').
        fs_uuid (str, optional): The UUID to assign to the filesystem. When
            None, no UUID option is passed and the UUID is left to the tool.
        device (str): The device on which to create the filesystem (e.g., '/dev/sda1').

    Raises:
        RuntimeError: If the filesystem type is not recognized.
        subprocess.CalledProcessError: If the mkfs command fails.
    """
    self.logger.info(f"Creating filesystem {fs_type} with UUID {fs_uuid} in {device}")

    if fs_type in ("ntfs", "ntfs3"):
        self.logger.debug("Creating NTFS filesystem")
        ntfs = NTFSLibrary(self.ntfs_implementation)
        ntfs.create_filesystem(device, "NTFS")
        if fs_uuid is not None:
            ntfs.modify_uuid(device, fs_uuid)
        return

    # Each tool spells the UUID option and the "force" flag differently.
    if fs_type in ("ext4", "ext3"):
        uuid_args, force_flag = ["-U", fs_uuid], "-F"
    elif fs_type == "xfs":
        uuid_args, force_flag = ["-m", f"uuid={fs_uuid}"], "-f"
    elif fs_type == "btrfs":
        uuid_args, force_flag = ["-U", fs_uuid], "-f"
    else:
        raise RuntimeError(f"Don't know how to create filesystem of type {fs_type}")

    command = [f"/usr/sbin/mkfs.{fs_type}"]
    if fs_uuid is not None:
        # Passing None in the argument list would make subprocess raise
        # TypeError, so the UUID option is only added when a UUID was given.
        command += uuid_args
    command += [force_flag, device]

    self.logger.debug(f"Creating Linux filesystem of type {fs_type} on {device}, command {command}")
    result = subprocess.run(command, check=True, capture_output=True)

    self.logger.debug(f"retorno: {result.returncode}")
    self.logger.debug(f"stdout: {result.stdout}")
    self.logger.debug(f"stderr: {result.stderr}")
|
|
|
|
|
|
|
|
def mklostandfound(self, path):
    """
    Recreate the lost+found if necessary.

    When cloning at the root of a filesystem, cleaning the contents
    removes the lost+found directory. This is a special directory that requires the use of
    a tool to recreate it.

    It may fail if the filesystem does not need it. We consider this harmless and ignore it:
    a failing mklost+found run is only logged as a warning.

    The command is entirely skipped on NTFS, as mklost+found may malfunction if run on it,
    and has no useful purpose.

    The tool is started with `path` as its working directory; the working
    directory of the calling process is never changed.

    Args:
        path (str): Directory in which lost+found should be recreated.

    Raises:
        OSError: If `path` cannot be used as a working directory or the
            mklost+found binary cannot be executed.
    """
    if self.is_filesystem(path):
        if self.filesystem_type(mountpoint=path) == "ntfs":
            self.logger.debug("Not running mklost+found on NTFS")
            return

    result = None

    try:
        self.logger.debug(f"Re-creating lost+found in {path}")
        # cwd= replaces the old os.chdir()/os.chdir() pair, which left the
        # process in `path` whenever subprocess.run raised an OSError.
        result = subprocess.run(["/usr/sbin/mklost+found"], check=True, capture_output=True, cwd=path)
    except subprocess.SubprocessError as e:
        self.logger.warning(f"Error running mklost+found: {e}")

    if result is not None:
        self.logger.debug(f"retorno: {result.returncode}")
        self.logger.debug(f"stdout: {result.stdout}")
        self.logger.debug(f"stderr: {result.stderr}")
|
|
|
|
def ntfsfix(self, device):
    """
    Run /usr/bin/ntfsfix with the -d option on a device.

    ensure_mounted calls this before mounting an NTFS filesystem, so that a
    filesystem that was not cleanly unmounted can still be mounted.

    Args:
        device (str): The path to the device to be fixed.

    Raises:
        subprocess.CalledProcessError: If the ntfsfix command fails.
    """
    self.logger.debug(f"Running ntfsfix on {device}")

    fix_cmd = ["/usr/bin/ntfsfix", "-d", device]
    subprocess.run(fix_cmd, check=True)
|
|
|
|
|
|
def unload_ntfs(self):
    """
    Ask the kernel to unload the ntfs3 module.

    This was added while troubleshooting the NTFS kernel module, to try to
    ensure that NTFS code is only active as long as necessary. The module is
    loaded on demand by ensure_mounted, so there is no load_ntfs counterpart.

    It may be removed in the future.

    The work is delegated to self._rmmod("ntfs3").
    """
    module_name = "ntfs3"
    self._rmmod(module_name)
|
|
|
|
def find_boot_device(self):
    """
    Searches for the EFI boot partition on the system.

    Every block device listed in /proc/partitions is inspected with
    `sfdisk -J`. The first partition whose type is the EFI System Partition
    GUID "C12A7328-F81F-11D2-BA4B-00A0C93EC93B" is returned. Devices on which
    sfdisk fails are logged and skipped.

    Returns:
        str: The device node of the EFI partition if found, otherwise None.

    Logs:
        - Debug messages indicating the progress of the search.
        - A warning message if the EFI partition is not found.
    """
    efi_type = "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"
    disks = []

    self.logger.debug("Looking for EFI partition")
    with open("/proc/partitions", "r", encoding='utf-8') as partitions_file:
        for line_num, line in enumerate(partitions_file):
            if line_num < 2:
                # The first two lines are the column header and a blank line.
                continue

            data = line.split()
            if len(data) < 4:
                # Not a "major minor #blocks name" row; nothing to record.
                continue

            disk = data[3]
            disks.append(disk)
            self.logger.debug(f"Disk: {disk}")

    for disk in disks:
        self.logger.debug("Loading partitions for disk %s", disk)
        sfdisk_out = subprocess.run(["/usr/sbin/sfdisk", "-J", f"/dev/{disk}"], check=False, capture_output=True)

        if sfdisk_out.returncode != 0:
            self.logger.debug("sfdisk returned with code %i, error %s", sfdisk_out.returncode, sfdisk_out.stderr)
            continue

        disk_data = json.loads(sfdisk_out.stdout)

        # A partition table without entries may lack the "partitions" key,
        # so missing keys are treated as "no partitions" rather than an error.
        partitions = disk_data.get("partitiontable", {}).get("partitions", [])

        for part in partitions:
            self.logger.debug("Checking partition %s", part)
            # Compare case-insensitively; GUIDs may be printed in either case.
            if str(part.get("type", "")).upper() == efi_type:
                self.logger.debug("EFI partition found at %s", part["node"])
                return part["node"]

    self.logger.warning("Failed to find EFI partition!")
    return None
|
|
|
|
def temp_unmount(self, mountpoint):
    """
    Temporarily unmounts the filesystem at the given mountpoint.

    This method finds the device associated with the specified mountpoint,
    and returns the information to remount it with temp_remount.

    The purpose of this function is to temporarily unmount a filesystem for
    actions like fsck, and to mount it back afterwards.

    Args:
        mountpoint (str): The mountpoint of the filesystem to unmount.

    Returns:
        dict: A dictionary containing the information needed to remount the
        filesystem, with the keys "mountpoint", "device" and "filesystem".
    """
    device = self.find_device(mountpoint)
    fs = self.filesystem_type(mountpoint = mountpoint)

    data = {"mountpoint": mountpoint, "device": device, "filesystem": fs}

    # Arguments follow the placeholders: device first, then mount point.
    # They were previously swapped, producing a misleading log line.
    self.logger.debug("Temporarily unmounting device %s, mounted on %s, fs type %s", device, mountpoint, fs)

    self.unmount(mountpoint = mountpoint)
    return data
|
|
|
|
def temp_remount(self, unmount_data):
    """
    Mount again a filesystem that was unmounted with temp_unmount.

    Args:
        unmount_data (dict): The dictionary returned by temp_unmount; its
            "device", "mountpoint" and "filesystem" entries are used.

    Returns:
        None
    """
    device = unmount_data["device"]
    where = unmount_data["mountpoint"]
    fs_kind = unmount_data["filesystem"]

    self.logger.debug("Remounting temporarily unmounted device %s on %s, fs type %s", device, where, fs_kind)
    self.mount(device=device, mountpoint=where, filesystem=fs_kind)
|