Store and restore GPT partition UUIDs

GPT disks and individual partitions have UUIDs that can be used by
the Windows boot process. Store them, so they can be restored.
windows-boot-fixes
Vadim vtroshchinskiy 2024-12-12 09:52:25 +01:00
parent a7f5a87539
commit 274f2b956e
2 changed files with 184 additions and 5 deletions

115
gitlib/disk.py 100644
View File

@ -0,0 +1,115 @@
import logging
import subprocess
import re
# pylint: disable=locally-disabled, line-too-long, logging-fstring-interpolation, too-many-lines
class DiskLibrary:
def __init__(self):
self.logger = logging.getLogger("OpengnsysDiskLibrary")
self.logger.setLevel(logging.DEBUG)
def split_device_partition(self, device):
"""
Parses a device file like /dev/sda3 into the root device (/dev/sda) and partition number (3)
Args:
device (str): Device in /dev
Returns:
[base_device, partno]
"""
r = re.compile("^(.*?)(\\d+)$")
m = r.match(device)
disk = m.group(1)
partno = int(m.group(2))
self.logger.debug(f"{device} parsed into disk device {disk}, partition {partno}")
return (disk, partno)
def get_disk_json_data(self, device):
"""
Returns the partition JSON data dump for the entire disk, even if a partition is passed.
This is specifically in the format used by sfdisk.
Args:
device (str): Block device, eg, /dev/sda3
Returns:
str: JSON dump produced by sfdisk
"""
(disk, partno) = self.split_device_partition(device)
result = subprocess.run(["/usr/sbin/sfdisk", "--json", disk], check=True, capture_output=True, encoding='utf-8')
return result.stdout.strip()
def get_disk_uuid(self, device):
"""
Returns the UUID of the disk itself, if there's a GPT partition table.
Args:
device (str): Block device, eg, /dev/sda3
Returns:
str: UUID
"""
(disk, partno) = self.split_device_partition(device)
result = subprocess.run(["/usr/sbin/sfdisk", "--disk-id", disk], check=True, capture_output=True, encoding='utf-8')
return result.stdout.strip()
def set_disk_uuid(self, device, uuid):
(disk, partno) = self.split_device_partition(device)
subprocess.run(["/usr/sbin/sfdisk", "--disk-id", disk, uuid], check=True, encoding='utf-8')
def get_partition_uuid(self, device):
"""
Returns the UUID of the partition, if there's a GPT partition table.
Args:
device (str): Block device, eg, /dev/sda3
Returns:
str: UUID
"""
(disk, partno) = self.split_device_partition(device)
result = subprocess.run(["/usr/sbin/sfdisk", "--part-uuid", disk, str(partno)], check=True, capture_output=True, encoding='utf-8')
return result.stdout.strip()
def set_partition_uuid(self, device, uuid):
(disk, partno) = self.split_device_partition(device)
subprocess.run(["/usr/sbin/sfdisk", "--part-uuid", disk, str(partno), uuid], check=True, encoding='utf-8')
def get_partition_type(self, device):
"""
Returns the type UUID of the partition, if there's a GPT partition table.
Args:
device (str): Block device, eg, /dev/sda3
Returns:
str: UUID
"""
(disk, partno) = self.split_device_partition(device)
result = subprocess.run(["/usr/sbin/sfdisk", "--part-type", disk, str(partno)], check=True, capture_output=True, encoding='utf-8')
return result.stdout.strip()
def set_partition_type(self, device, uuid):
(disk, partno) = self.split_device_partition(device)
subprocess.run(["/usr/sbin/sfdisk", "--part-type", disk, str(partno), uuid], check=True, encoding='utf-8')

View File

@ -39,7 +39,10 @@ import xattr
import posix1e
import blkid
from filesystem import *
from disk import *
from ntfs import *
import re
import uuid
class OgProgressPrinter(git.RemoteProgress):
"""
@ -146,6 +149,7 @@ class OpengnsysGitLibrary:
self.logger.debug(f"Initializing. Cache = {require_cache}, ntfs = {ntfs_implementation}")
self.fs = FilesystemLibrary(ntfs_implementation = ntfs_implementation)
self.disk = DiskLibrary()
#self.ntfs = NTFSLibrary()
@ -546,7 +550,6 @@ class OpengnsysGitLibrary:
def _ogGetOsType(self):
return "Linux"
def _get_repo_metadata(self, repo):
"""Obtiene metadatos de un repositorio remoto sin clonar el repo entero.
@ -655,6 +658,10 @@ class OpengnsysGitLibrary:
xattrs_file = open(os.path.join(meta_dir, "xattrs.jsonl.new"), "w", encoding='utf-8')
renamed_file = open(os.path.join(meta_dir, "renamed.jsonl.new"), "w", encoding='utf-8')
filesystems_file = open(os.path.join(meta_dir, "filesystems.json.new"), "w", encoding='utf-8')
partitions_file = open(os.path.join(meta_dir, "partitions.json.new"), "w", encoding='utf-8')
partitions_file.write(self.disk.get_disk_json_data(self.fs.find_device(mountpoint=path)))
ntfs = False
@ -713,6 +720,11 @@ class OpengnsysGitLibrary:
filesystems_data[root_path_rel]['sector_size'] = pr.sector_size
filesystems_data[root_path_rel]['uuid'] = str(pr["UUID"], 'utf-8')
filesystems_data[root_path_rel]['uuid_raw'] = str(base64.b64encode(pr["UUID_RAW"]), 'utf-8')
filesystems_data[root_path_rel]["part_uuid"] = self.disk.get_partition_uuid(mount['device'])
filesystems_data[root_path_rel]["part_type"] = self.disk.get_partition_type(mount['device'])
filesystems_data[root_path_rel]["disk_uuid"] = self.disk.get_disk_uuid(mount['device'])
# TODO: Esto de momento no funciona -- LABEL no se encuentra
#filesystems_data[root_path_rel]['label'] = pr["LABEL"]
@ -898,6 +910,7 @@ class OpengnsysGitLibrary:
renamed_file.close()
filesystems_file.close()
metadata_file.close()
partitions_file.close()
os.rename(os.path.join(meta_dir, "empty_directories.jsonl.new"), os.path.join(meta_dir, "empty_directories.jsonl"))
os.rename(os.path.join(meta_dir, "special_files.jsonl.new"), os.path.join(meta_dir, "special_files.jsonl"))
@ -907,6 +920,8 @@ class OpengnsysGitLibrary:
os.rename(os.path.join(meta_dir, "renamed.jsonl.new"), os.path.join(meta_dir, "renamed.jsonl"))
os.rename(os.path.join(meta_dir, "filesystems.json.new"), os.path.join(meta_dir, "filesystems.json"))
os.rename(os.path.join(meta_dir, "metadata.json.new"), os.path.join(meta_dir, "metadata.json"))
os.rename(os.path.join(meta_dir, "partitions.json.new"), os.path.join(meta_dir, "partitions.json"))
self.logger.debug("Processing pending NTFS secaudits...")
for audit in ntfs_secaudit_list:
@ -916,7 +931,7 @@ class OpengnsysGitLibrary:
self.logger.debug("Metadata updated")
return return_data
def _restore_metadata(self, path, destructive_only=False):
def _restore_metadata(self, path, destructive_only=False, set_device_uuids=False):
"""Restore the metadata created by _create_metadata
Args:
@ -942,6 +957,50 @@ class OpengnsysGitLibrary:
self.logger.error(f"Metadata directory not found: {meta_dir}")
return
if set_device_uuids:
# Windows boot manager uses partition UUIDs in at least some cases. One option to make booting work
# is to store all such UUIDs and restore them on the destination machine.
self.logger.info("Processing filesystems.json")
with open(os.path.join(meta_dir, "filesystems.json"), "r", encoding='utf-8') as filesystems_file:
filesystems = json.loads(filesystems_file.read())
disk_device = self.fs.find_device(path)
if "disk_uuid" in filesystems["/"]:
self.logger.info("Restoring device and partition UUIDs on %s", disk_device)
prev_uuid = self.disk.get_disk_uuid(disk_device)
new_uuid = filesystems["/"]["disk_uuid"]
if new_uuid != prev_uuid:
self.logger.info("Setting disk UUID to %s (was %s)", new_uuid, prev_uuid)
self.disk.set_disk_uuid(disk_device, new_uuid)
else:
self.logger.info("Not setting disk UUID, already was correct")
prev_uuid = self.disk.get_partition_uuid(disk_device)
new_uuid = filesystems["/"]["part_uuid"]
if new_uuid != prev_uuid:
self.logger.info("Setting partition UUID to %s (was %s)", new_uuid, prev_uuid)
self.disk.set_partition_uuid(disk_device, new_uuid)
else:
self.logger.info("Not setting partition UUID, already was correct")
prev_uuid = self.disk.get_partition_type(disk_device)
new_uuid = filesystems["/"]["part_type"]
if new_uuid != prev_uuid:
self.logger.info("Setting partition type to %s (was %s)", new_uuid, prev_uuid)
self.disk.set_partition_type(disk_device, new_uuid)
else:
self.logger.info("Not setting partition type, already was correct")
self.logger.info("Done setting disk UUIDs")
else:
self.logger.warning("Partition UUID data not present in metadata, skipping")
# Process renames first so that all the filenames are as they should be
# for the following steps.
self.logger.info("Processing renamed.jsonl")
@ -1112,6 +1171,9 @@ class OpengnsysGitLibrary:
self.logger.warning(f"Failed to create special file {full_path}: Error {oserr.errno}: {oserr.strerror}")
self.logger.info("Metadata restoration completed.")
def _configure_repo(self, repo):
@ -1243,7 +1305,7 @@ class OpengnsysGitLibrary:
repo.index.commit("Initial commit")
# Restaurar cosas modificadas para git
self._restore_metadata(path, destructive_only=True)
self._restore_metadata(path, destructive_only=True, set_device_uuids=False)
#self.logger.debug("Commit done, will unmount now")
@ -1340,7 +1402,7 @@ class OpengnsysGitLibrary:
self._grub_install(root_directory=destination_dir, boot_device=boot_device)
self.fs.mklostandfound(destination_dir)
self._restore_metadata(destination_dir)
self._restore_metadata(destination_dir, set_device_uuids=True)
if self.fs.filesystem_type(mountpoint = destination_dir) == "ntfs":
self._ntfs_restore_secaudit(destination_dir)
@ -1495,6 +1557,8 @@ if __name__ == '__main__':
parser.add_argument("-m", "--message", type=str, metavar="MSG", help="Commit message")
parser.add_argument("--test-set-ntfsid", type=str, metavar="ID", help="Set NTFS ID")
parser.add_argument("--test-restore-secaudit",type=str, metavar="DIR", help="Test restoring NTFS secaudit")
parser.add_argument("--test-get-part-uuid", type=str, metavar="PART", help="Get partition UUID")
parser.add_argument("--device", type=str, metavar="DEV", help="Device to set the UUID on")
parser.add_argument("-v", "--verbose", action="store_true", help = "Verbose console output")
@ -1553,7 +1617,7 @@ if __name__ == '__main__':
elif args.test_create_metadata:
og_git._create_metadata(args.test_create_metadata, initial_creation=False) # pylint: disable=protected-access
elif args.test_restore_metadata:
og_git._restore_metadata(args.test_restore_metadata) # pylint: disable=protected-access
og_git._restore_metadata(args.test_restore_metadata, set_device_uuids=True) # pylint: disable=protected-access
elif args.test_restore_metadata_destructive:
og_git._restore_metadata(path = args.test_restore_metadata_destructive, destructive_only=True) # pylint: disable=protected-access
elif args.test_clone_metadata: