From b1a2a2eb0768ef512746900ebb4d18d2304e21f5 Mon Sep 17 00:00:00 2001 From: Vadim Troshchinskiy Date: Thu, 17 Jul 2025 10:33:16 +0200 Subject: [PATCH] refs #2466 Remove duplicated libraries --- ogclient/lib/python3/bcd.py | 345 ----- ogclient/lib/python3/disk.py | 124 -- ogclient/lib/python3/gitlib-tests.py | 52 - ogclient/lib/python3/gitlib.py | 1731 -------------------------- ogclient/lib/python3/kernel.py | 22 - ogclient/lib/python3/ntfs.py | 111 -- ogclient/lib/python3/test.py | 13 - 7 files changed, 2398 deletions(-) delete mode 100755 ogclient/lib/python3/bcd.py delete mode 100644 ogclient/lib/python3/disk.py delete mode 100755 ogclient/lib/python3/gitlib-tests.py delete mode 100755 ogclient/lib/python3/gitlib.py delete mode 100644 ogclient/lib/python3/kernel.py delete mode 100644 ogclient/lib/python3/ntfs.py delete mode 100644 ogclient/lib/python3/test.py diff --git a/ogclient/lib/python3/bcd.py b/ogclient/lib/python3/bcd.py deleted file mode 100755 index 40d1280..0000000 --- a/ogclient/lib/python3/bcd.py +++ /dev/null @@ -1,345 +0,0 @@ -#!/usr/bin/env python3 -import hivex -import argparse -import struct - -from hivex import Hivex -from hivex.hive_types import * - - -# Docs: -# -# https://www.geoffchappell.com/notes/windows/boot/bcd/objects.htm -# https://learn.microsoft.com/en-us/previous-versions/windows/desktop/bcd/bcdbootmgrelementtypes - -#print(f"Root: {root}") - - -BCD_Enumerations = { - "BcdLibraryDevice_ApplicationDevice" : 0x11000001, - "BcdLibraryString_ApplicationPath" : 0x12000002, - "BcdLibraryString_Description" : 0x12000004, - "BcdLibraryString_PreferredLocale" : 0x12000005, - "BcdLibraryObjectList_InheritedObjects" : 0x14000006, - "BcdLibraryInteger_TruncatePhysicalMemory" : 0x15000007, - "BcdLibraryObjectList_RecoverySequence" : 0x14000008, - "BcdLibraryBoolean_AutoRecoveryEnabled" : 0x16000009, - "BcdLibraryIntegerList_BadMemoryList" : 0x1700000a, - "BcdLibraryBoolean_AllowBadMemoryAccess" : 0x1600000b, - "BcdLibraryInteger_FirstMegabytePolicy" : 0x1500000c, - "BcdLibraryInteger_RelocatePhysicalMemory" : 0x1500000D, - "BcdLibraryInteger_AvoidLowPhysicalMemory" : 0x1500000E, - "BcdLibraryBoolean_DebuggerEnabled" : 0x16000010, - "BcdLibraryInteger_DebuggerType" : 0x15000011, - "BcdLibraryInteger_SerialDebuggerPortAddress" : 0x15000012, - "BcdLibraryInteger_SerialDebuggerPort" : 0x15000013, - "BcdLibraryInteger_SerialDebuggerBaudRate" : 0x15000014, - "BcdLibraryInteger_1394DebuggerChannel" : 0x15000015, - "BcdLibraryString_UsbDebuggerTargetName" : 0x12000016, - "BcdLibraryBoolean_DebuggerIgnoreUsermodeExceptions" : 0x16000017, - "BcdLibraryInteger_DebuggerStartPolicy" : 0x15000018, - "BcdLibraryString_DebuggerBusParameters" : 0x12000019, - "BcdLibraryInteger_DebuggerNetHostIP" : 0x1500001A, - "BcdLibraryInteger_DebuggerNetPort" : 0x1500001B, - "BcdLibraryBoolean_DebuggerNetDhcp" : 0x1600001C, - "BcdLibraryString_DebuggerNetKey" : 0x1200001D, - "BcdLibraryBoolean_EmsEnabled" : 0x16000020, - "BcdLibraryInteger_EmsPort" : 0x15000022, - "BcdLibraryInteger_EmsBaudRate" : 0x15000023, - "BcdLibraryString_LoadOptionsString" : 0x12000030, - "BcdLibraryBoolean_DisplayAdvancedOptions" : 0x16000040, - "BcdLibraryBoolean_DisplayOptionsEdit" : 0x16000041, - "BcdLibraryDevice_BsdLogDevice" : 0x11000043, - "BcdLibraryString_BsdLogPath" : 0x12000044, - "BcdLibraryBoolean_GraphicsModeDisabled" : 0x16000046, - "BcdLibraryInteger_ConfigAccessPolicy" : 0x15000047, - "BcdLibraryBoolean_DisableIntegrityChecks" : 0x16000048, - "BcdLibraryBoolean_AllowPrereleaseSignatures" : 0x16000049, - "BcdLibraryString_FontPath" : 0x1200004A, - "BcdLibraryInteger_SiPolicy" : 0x1500004B, - "BcdLibraryInteger_FveBandId" : 0x1500004C, - "BcdLibraryBoolean_ConsoleExtendedInput" : 0x16000050, - "BcdLibraryInteger_GraphicsResolution" : 0x15000052, - "BcdLibraryBoolean_RestartOnFailure" : 0x16000053, - "BcdLibraryBoolean_GraphicsForceHighestMode" : 0x16000054, - "BcdLibraryBoolean_IsolatedExecutionContext" : 0x16000060, - "BcdLibraryBoolean_BootUxDisable" : 0x1600006C, - "BcdLibraryBoolean_BootShutdownDisabled" : 0x16000074, - "BcdLibraryIntegerList_AllowedInMemorySettings" : 0x17000077, - "BcdLibraryBoolean_ForceFipsCrypto" : 0x16000079, - - - "BcdBootMgrObjectList_DisplayOrder" : 0x24000001, - "BcdBootMgrObjectList_BootSequence" : 0x24000002, - "BcdBootMgrObject_DefaultObject" : 0x23000003, - "BcdBootMgrInteger_Timeout" : 0x25000004, - "BcdBootMgrBoolean_AttemptResume" : 0x26000005, - "BcdBootMgrObject_ResumeObject" : 0x23000006, - "BcdBootMgrObjectList_ToolsDisplayOrder" : 0x24000010, - "BcdBootMgrBoolean_DisplayBootMenu" : 0x26000020, - "BcdBootMgrBoolean_NoErrorDisplay" : 0x26000021, - "BcdBootMgrDevice_BcdDevice" : 0x21000022, - "BcdBootMgrString_BcdFilePath" : 0x22000023, - "BcdBootMgrBoolean_ProcessCustomActionsFirst" : 0x26000028, - "BcdBootMgrIntegerList_CustomActionsList" : 0x27000030, - "BcdBootMgrBoolean_PersistBootSequence" : 0x26000031, - - "BcdDeviceInteger_RamdiskImageOffset" : 0x35000001, - "BcdDeviceInteger_TftpClientPort" : 0x35000002, - "BcdDeviceInteger_SdiDevice" : 0x31000003, - "BcdDeviceInteger_SdiPath" : 0x32000004, - "BcdDeviceInteger_RamdiskImageLength" : 0x35000005, - "BcdDeviceBoolean_RamdiskExportAsCd" : 0x36000006, - "BcdDeviceInteger_RamdiskTftpBlockSize" : 0x36000007, - "BcdDeviceInteger_RamdiskTftpWindowSize" : 0x36000008, - "BcdDeviceBoolean_RamdiskMulticastEnabled" : 0x36000009, - "BcdDeviceBoolean_RamdiskMulticastTftpFallback" : 0x3600000A, - "BcdDeviceBoolean_RamdiskTftpVarWindow" : 0x3600000B, - - "BcdMemDiagInteger_PassCount" : 0x25000001, - "BcdMemDiagInteger_FailureCount" : 0x25000003, - - "Reserved1" : 0x21000001, - "Reserved2" : 0x22000002, - "BcdResumeBoolean_UseCustomSettings" : 0x26000003, - "BcdResumeDevice_AssociatedOsDevice" : 0x21000005, - "BcdResumeBoolean_DebugOptionEnabled" : 0x26000006, - "BcdResumeInteger_BootMenuPolicy" : 0x25000008, - - "BcdOSLoaderDevice_OSDevice" : 0x21000001, - "BcdOSLoaderString_SystemRoot" : 0x22000002, - "BcdOSLoaderObject_AssociatedResumeObject" : 0x23000003, - "BcdOSLoaderBoolean_DetectKernelAndHal" : 0x26000010, - "BcdOSLoaderString_KernelPath" : 0x22000011, - "BcdOSLoaderString_HalPath" : 0x22000012, - "BcdOSLoaderString_DbgTransportPath" : 0x22000013, - "BcdOSLoaderInteger_NxPolicy" : 0x25000020, - "BcdOSLoaderInteger_PAEPolicy" : 0x25000021, - "BcdOSLoaderBoolean_WinPEMode" : 0x26000022, - "BcdOSLoaderBoolean_DisableCrashAutoReboot" : 0x26000024, - "BcdOSLoaderBoolean_UseLastGoodSettings" : 0x26000025, - "BcdOSLoaderBoolean_AllowPrereleaseSignatures" : 0x26000027, - "BcdOSLoaderBoolean_NoLowMemory" : 0x26000030, - "BcdOSLoaderInteger_RemoveMemory" : 0x25000031, - "BcdOSLoaderInteger_IncreaseUserVa" : 0x25000032, - "BcdOSLoaderBoolean_UseVgaDriver" : 0x26000040, - "BcdOSLoaderBoolean_DisableBootDisplay" : 0x26000041, - "BcdOSLoaderBoolean_DisableVesaBios" : 0x26000042, - "BcdOSLoaderBoolean_DisableVgaMode" : 0x26000043, - "BcdOSLoaderInteger_ClusterModeAddressing" : 0x25000050, - "BcdOSLoaderBoolean_UsePhysicalDestination" : 0x26000051, - "BcdOSLoaderInteger_RestrictApicCluster" : 0x25000052, - "BcdOSLoaderBoolean_UseLegacyApicMode" : 0x26000054, - "BcdOSLoaderInteger_X2ApicPolicy" : 0x25000055, - "BcdOSLoaderBoolean_UseBootProcessorOnly" : 0x26000060, - "BcdOSLoaderInteger_NumberOfProcessors" : 0x25000061, - "BcdOSLoaderBoolean_ForceMaximumProcessors" : 0x26000062, - "BcdOSLoaderBoolean_ProcessorConfigurationFlags" : 0x25000063, - "BcdOSLoaderBoolean_MaximizeGroupsCreated" : 0x26000064, - "BcdOSLoaderBoolean_ForceGroupAwareness" : 0x26000065, - "BcdOSLoaderInteger_GroupSize" : 0x25000066, - "BcdOSLoaderInteger_UseFirmwarePciSettings" : 0x26000070, - "BcdOSLoaderInteger_MsiPolicy" : 0x25000071, - "BcdOSLoaderInteger_SafeBoot" : 0x25000080, - "BcdOSLoaderBoolean_SafeBootAlternateShell" : 0x26000081, - "BcdOSLoaderBoolean_BootLogInitialization" : 0x26000090, - "BcdOSLoaderBoolean_VerboseObjectLoadMode" : 0x26000091, - "BcdOSLoaderBoolean_KernelDebuggerEnabled" : 0x260000a0, - "BcdOSLoaderBoolean_DebuggerHalBreakpoint" : 0x260000a1, - "BcdOSLoaderBoolean_UsePlatformClock" : 0x260000A2, - "BcdOSLoaderBoolean_ForceLegacyPlatform" : 0x260000A3, - "BcdOSLoaderInteger_TscSyncPolicy" : 0x250000A6, - "BcdOSLoaderBoolean_EmsEnabled" : 0x260000b0, - "BcdOSLoaderInteger_DriverLoadFailurePolicy" : 0x250000c1, - "BcdOSLoaderInteger_BootMenuPolicy" : 0x250000C2, - "BcdOSLoaderBoolean_AdvancedOptionsOneTime" : 0x260000C3, - "BcdOSLoaderInteger_BootStatusPolicy" : 0x250000E0, - "BcdOSLoaderBoolean_DisableElamDrivers" : 0x260000E1, - "BcdOSLoaderInteger_HypervisorLaunchType" : 0x250000F0, - "BcdOSLoaderBoolean_HypervisorDebuggerEnabled" : 0x260000F2, - "BcdOSLoaderInteger_HypervisorDebuggerType" : 0x250000F3, - "BcdOSLoaderInteger_HypervisorDebuggerPortNumber" : 0x250000F4, - "BcdOSLoaderInteger_HypervisorDebuggerBaudrate" : 0x250000F5, - "BcdOSLoaderInteger_HypervisorDebugger1394Channel" : 0x250000F6, - "BcdOSLoaderInteger_BootUxPolicy" : 0x250000F7, - "BcdOSLoaderString_HypervisorDebuggerBusParams" : 0x220000F9, - "BcdOSLoaderInteger_HypervisorNumProc" : 0x250000FA, - "BcdOSLoaderInteger_HypervisorRootProcPerNode" : 0x250000FB, - "BcdOSLoaderBoolean_HypervisorUseLargeVTlb" : 0x260000FC, - "BcdOSLoaderInteger_HypervisorDebuggerNetHostIp" : 0x250000FD, - "BcdOSLoaderInteger_HypervisorDebuggerNetHostPort" : 0x250000FE, - "BcdOSLoaderInteger_TpmBootEntropyPolicy" : 0x25000100, - "BcdOSLoaderString_HypervisorDebuggerNetKey" : 0x22000110, - "BcdOSLoaderBoolean_HypervisorDebuggerNetDhcp" : 0x26000114, - "BcdOSLoaderInteger_HypervisorIommuPolicy" : 0x25000115, - "BcdOSLoaderInteger_XSaveDisable" : 0x2500012b -} - - -def format_value(bcd, bcd_value): - - name = bcd.value_key(bcd_value) - (type, length) = bcd.value_type(bcd_value) - - typename = "" - str_value = "" - if type == REG_SZ: - typename = "SZ" - str_value = bcd.value_string(bcd_value) - elif type == REG_DWORD: - typename = "DWORD" - dval = bcd.value_dword(bcd_value) - - str_value = hex(dval) + " (" + str(bcd.value_dword(bcd_value)) + ")" - elif type == REG_BINARY: - typename = "BIN" - (length, value) = bcd.value_value(bcd_value) - str_value = value.hex() - elif type == REG_DWORD_BIG_ENDIAN: - typename = "DWORD_BE" - elif type == REG_EXPAND_SZ: - typename = "EXPAND SZ" - elif type == REG_FULL_RESOURCE_DESCRIPTOR: - typename = "RES DESC" - elif type == REG_LINK: - typename = "LINK" - elif type == REG_MULTI_SZ: - typename = "MULTISZ" - (length, str_value) = bcd.value_value(bcd_value) - str_value = str_value.decode('utf-16le') - str_value = str_value.replace("\0", ";") - #value = ";".join("\0".split(value)) - elif type == REG_NONE: - typename = "NONE" - elif type == REG_QWORD: - typename = "QWORD" - elif type == REG_RESOURCE_LIST: - typename = "RES LIST" - elif type == REG_RESOURCE_REQUIREMENTS_LIST: - typename = "REQ LIST" - else: - typename = str(type) - str_value = "???" - - - return (typename, length, str_value) - -def dump_all(root, depth = 0): - - padding = "\t" * depth - - children = bcd.node_children(root) - - if len(children) > 0: - - for child in children: - name = bcd.node_name(child) - print(f"{padding}{name}") - - dump_all(child, depth + 1) - # print(f"Child: {child}") - - #print(f"Values: {num_vals}") - return - - - - - - values = bcd.node_values(root) - #print(f"Value list: {values}") - - for v in values: - (type_name, length, str_value) = format_value(bcd, v) - name = bcd.value_key(v) - - print(f"{padding}{name: <16}: [{type_name: <10}]; ({length: < 4}) {str_value}") - - -class WindowsBCD: - def __init__(self, filename): - self.filename = filename - self.bcd = Hivex(filename) - - def dump(self, root=None, depth = 0): - padding = "\t" * depth - - if root is None: - root = self.bcd.root() - - children = self.bcd.node_children(root) - - if len(children) > 0: - for child in children: - name = self.bcd.node_name(child) - print(f"{padding}{name}") - - self.dump(child, depth + 1) - return - - values = self.bcd.node_values(root) - - for v in values: - (type_name, length, str_value) = format_value(self.bcd, v) - name = self.bcd.value_key(v) - - print(f"{padding}{name: <16}: [{type_name: <10}]; ({length: < 4}) {str_value}") - - def list(self): - root = self.bcd.root() - objects = self.bcd.node_get_child(root, "Objects") - - for child in self.bcd.node_children(objects): - entry_id = self.bcd.node_name(child) - - elements = self.bcd.node_get_child(child, "Elements") - description_entry = self.bcd.node_get_child(elements, "12000004") - - if description_entry: - values = self.bcd.node_values(description_entry) - if values: - (type_name, length, str_value) = format_value(self.bcd, values[0]) - print(f"{entry_id}: {str_value}") - else: - print(f"{entry_id}: [no description value!?]") - - - appdevice_entry = self.bcd.node_get_child(elements, "11000001") - - if appdevice_entry: - values = self.bcd.node_values(appdevice_entry) - (length, data) = self.bcd.value_value(values[0]) - hex = data.hex() - print(f"LEN: {length}, HEX: {hex}, RAW: {data}") - if len(data) > 10: - etype = struct.unpack_from(' {orig_file_path}") - os.rename(renamed_file_path, orig_file_path) - else: - if os.path.exists(orig_file_path): - self.logger.warning(f"Can't rename {renamed_file_path} => {orig_file_path}: Already renamed") - else: - self.logger.warning(f"Can't rename {renamed_file_path} => {orig_file_path}: Source file not found") - - if not destructive_only: - self.logger.info("Processing empty_directories.jsonl") - with open(os.path.join(meta_dir, "empty_directories.jsonl"), "r", encoding='utf-8') as empties_file: - for line in empties_file: - if line.isspace(): - self.logger.debug("Empty line, skipping") - continue - - empties_data = json.loads(line) - empty_dir = empties_data['dir'] - - # os.path.join no acepta /foo como una ruta relativa para concatenar - if empty_dir.startswith("/"): - empty_dir = empty_dir[1:] - - empty_dir_keep = os.path.join(path, empty_dir, ".opengnsys-keep") - - self.logger.debug(f"Empty directory: {empty_dir}") - full_empty_dir = os.path.join(path, empty_dir) - Path(full_empty_dir).mkdir(parents=True, exist_ok=True) - - if os.path.exists(empty_dir_keep): - self.logger.debug(f"Deleting: {empty_dir_keep}") - os.unlink(empty_dir_keep) - - if not destructive_only: - self.logger.info("Processing unix_permissions.jsonl") - with open(os.path.join(meta_dir, "unix_permissions.jsonl"), "r", encoding='utf-8') as acls_file: - for line in acls_file: - if line.isspace(): - self.logger.debug("Empty line, skipping") - continue - - perms_data = json.loads(line) - #self.logger.debug(f"Data: {acls_data}") - - perms_path = perms_data['path'] - file_perms = perms_data['mode'] - file_uid = perms_data['uid'] - file_gid = perms_data['gid'] - - if perms_path.startswith("/"): - perms_path = perms_path[1:] - - perms_file_path = os.path.join(path, perms_path) - - if os.path.exists(perms_file_path): - self.logger.debug(f"Applying permissions {file_perms}, owner {file_uid}, group {file_gid} to {perms_file_path}") - # chown clears suid bits, must come first - os.chown(perms_file_path, file_uid, file_gid) - os.chmod(perms_file_path, file_perms) - else: - self.logger.warning(f"Can't apply permissions to {perms_file_path}, file doesn't exist.") - - - if not destructive_only: - self.logger.info("Processing acls.jsonl") - with open(os.path.join(meta_dir, "acls.jsonl"), "r", encoding='utf-8') as acls_file: - for line in acls_file: - if line.isspace(): - self.logger.debug("Empty line, skipping") - continue - - # docs: https://pylibacl.k1024.org/module.html#posix1e.ACL.to_any_text - - acls_data = json.loads(line) - #self.logger.debug(f"Data: {acls_data}") - - acl_file = acls_data['file'] - acl_text = base64.b64decode(bytes(acls_data['acl'], 'utf-8')) - - if acl_file.startswith("/"): - acl_file = acl_file[1:] - - acl_file_path = os.path.join(path, acl_file) - #self.logger.debug(f"TXT: {acl_text}" ) - acl = posix1e.ACL(data = acl_text) - #self.logger.debug(f"ACL: {acl_text}" ) - - self.logger.debug(f"Applying ACL to {acl_file_path}") - if os.path.exists(acl_file_path): - acl.applyto(acl_file_path) - - if not destructive_only: - self.logger.info("Processing xattrs.jsonl") - with open(os.path.join(meta_dir, "xattrs.jsonl"), "r", encoding='utf-8') as xattrs_file: - for line in xattrs_file: - if line.isspace(): - self.logger.debug("Empty line, skipping") - continue - - xattrs_data = json.loads(line) - xattrs_file = xattrs_data['file'] - - if xattrs_file.startswith("/"): - xattrs_file = xattrs_file[1:] - - xattrs_file_path = os.path.join(path, xattrs_file) - - #self.logger.debug(f"Line: {line}") - - self.logger.info("Processing special_files.jsonl") - with open(os.path.join(meta_dir, "special_files.jsonl"), "r", encoding='utf-8') as specials_file: - for line in specials_file: - if line.isspace(): - self.logger.debug("Empty line, skipping") - continue - - #self.logger.debug(f"Line: {line}") - data = json.loads(line) - filename = data['file'] - full_path = os.path.join(path, filename) - file_mode = data['mode'] - - try: - if stat.S_ISSOCK(file_mode): - self.logger.debug(f"Restoring socket {filename}") - os.mknod(full_path, mode = file_mode) - elif stat.S_ISFIFO(file_mode): - self.logger.debug(f"Restoring FIFO {filename}") - os.mknod(full_path, mode = file_mode) - elif stat.S_ISBLK(file_mode): - self.logger.debug(f"Restoring block device {filename}") - os.mknod(full_path, mode = file_mode, device = data['rdev']) - elif stat.S_ISCHR(file_mode): - self.logger.debug(f"Restoring character device {filename}") - os.mknod(full_path, mode = file_mode, device = data['rdev']) - else: - self.logger.warning(f"Unknown file type for {filename}: {file_mode}") - - # chown clears suid bit, so even though it's redundant in most cases and already - # done above, set the full perms on the file again anyway. - os.chown(full_path, data['uid'], data['gid']) - os.chmod(full_path, file_mode) - except FileExistsError as exists: - self.logger.debug(f"Exists: {full_path}") - except OSError as oserr: - self.logger.warning(f"Failed to create special file {full_path}: Error {oserr.errno}: {oserr.strerror}") - - - - - - self.logger.info("Metadata restoration completed.") - - def _configure_repo(self, repo): - """ - #ogGitConfig - #@brief Configura usuario y permisos de git. - #@return - """ - - self.logger.debug(f"Configuring repository {repo}") - repo.config_writer().add_value("user", "name", "OpenGnsys").release() - repo.config_writer().add_value("user", "email", "OpenGnsys@opengnsys.com").release() - repo.config_writer().add_value("core", "filemode", "false").release() - repo.config_writer().add_value("push", "autoSetupRemote", "true").release() - repo.config_writer().add_value("maintenance", "autoDetach", "false").release() - - def initRepo(self, device, repo_name): - """ - Initialize a Git repository on a specified device. - - This method mounts the device, initializes a Git repository, configures it, - and sets up a remote origin. It handles both NTFS and other filesystem types. - - Args: - device (str): The device path to initialize the repository on. - repo_name (str): The name of the repository to be created. - - Raises: - RuntimeError: If the .git directory is of an unrecognized file type. - - Notes: - - The method mounts the device to /mnt/{device_basename}. - - The .git directory is created in a cache partition and symlinked to the device. - - The repository is initialized and configured, and an initial commit is made. - - The method sets up a remote origin and pushes the initial commit. - """ - - if not self.check_remote_exists(repo_name): - self.logger.error("Specified repository can't be used, aborting.") - return - - path = self.fs.ensure_mounted(device) - self.logger.info("Initializing repository: %s", path) - - git_dir = os.path.join(path, ".git") - real_git_dir = os.path.join(self.cache_dir, f"git-{repo_name}") - repo_url = self._getOgRepository(repo_name) - - if os.path.exists(real_git_dir): - self.logger.debug(f"Removing existing repository {real_git_dir}") - shutil.rmtree(real_git_dir) - - if os.path.exists(git_dir) or os.path.islink(git_dir): - if os.path.islink(git_dir) or os.path.isfile(git_dir): - self.logger.debug(f"Removing gitdir: {git_dir}") - os.unlink(git_dir) - elif os.path.isdir(git_dir): - # We want to host git in the cache partition, .git shouldn't be a directory under the - # filesystem. - self.logger.warning(f"Removing directory-type gitdir, this should be a link or a file: {git_dir}") - shutil.rmtree(git_dir) - else: - raise RuntimeError("Git dir is of an unrecognized file type!") - -# if not os.path.exists(git_dir): - #self.logger.debug("Creating " + git_dir) - #with open(git_dir, "w") as git_dir: - # git_dir.write(f"gitdir: {real_git_dir}\n") - - self.logger.debug(f"Initializing repo in cache at {real_git_dir}") - #os.mkdir(real_git_dir) - #with git.Repo.init(real_git_dir, bare=True) as temprepo: - # self._configure_repo(temprepo) - - os.symlink(real_git_dir, git_dir) - - - with git.Repo.init(path) as repo: - # On NTFS, we have to unmount the filesystem to do secaudit. - # Gitpython objects existing at that time may mean a dangling git process that prevents - # the required unmounting. - # - # So we make sure we destroy gitpython after this initial stage, to recreate it - # right after _create_metadata. - self._configure_repo(repo) - self._write_ignore_list(path) - - - # Adding the gitignore and doing the manual --force saves us an expensive fetch if - # the repo already had data in it, and allows us to use the gitpython functions with - # progress reports for doing the full push later. - origin = repo.create_remote("origin", repo_url) - repo.index.add(f"{path}/.gitignore") - repo.index.commit("Initial commit") - repo.git.push("--force") # Obliterate whatever might have been there - - self.logger.debug("Fetching origin") - origin.fetch(progress=OgProgressPrinter(self.logger)) - - repo.heads.master.set_tracking_branch(origin.refs.master) - - - metadata_ret = self._create_metadata(path, initial_creation=True) - - repo = git.Repo(path) - - self.logger.debug(f"Building list of files to add from path {path}") - - add_files = [] - - # Nota: repo.index.add(".") agrega archivos pero git después cree que - # no han sido agregados? - for ent in os.listdir(path): - if repo.ignored(ent) or ent == ".git" or ent == "." or ent == "..": - self.logger.debug(f"Ignoring: {ent}") - elif ent in self.fully_ignored_dirs: - # FIXME: repo.index.add tiene un bug que ignora Force=true - #repo.index.add("dev/.opengnsys-keep") - self.logger.debug("Fully ignored dir: {ent}") - add_files.append(f"{ent}/.opengnsys-keep") - else: - self.logger.debug(f"Adding: {ent}") - add_files.append(ent) - #repo.index.add(ent, force=False) - - - for lnk in metadata_ret['symlinks']: - self.logger.debug(f"Adding symlink: {lnk}") - add_files.append(lnk) - - add_files_new = [] - for file in add_files: - if os.path.exists(os.path.join(path, file)): - add_files_new = add_files_new + [file] - else: - self.logger.warning(f"We wanted to add {file} but it wasn't found. Please debug.") - - add_files = add_files_new - - self.logger.info("Adding %d files", len(add_files)) - with OperationTimer(self, "add all files"): - #subprocess.run(["git", "add"] + add_files, check=True, cwd=path) - repo.index.add(items = add_files, force=True ) - - # FIXME: This shouldn't actually happen, we shouldn't have any untracked files - if self.debug_check_for_untracked_files: - self.logger.info("Checking for untracked files...") - - with OperationTimer(self, "add untracked files"): - untracked_list = repo.untracked_files - if untracked_list: - self.logger.warning(f"Untracked files: {untracked_list}") - self.logger.warning("Adding %d untracked files", len(untracked_list)) - #repo.index.add(items = untracked_list, force=True) - subprocess.run(["git", "add"] + untracked_list, check=True, cwd=path) - - - self.logger.info("Committing") - repo.index.commit("Initial commit") - - # Restaurar cosas modificadas para git - self._restore_metadata(path, destructive_only=True, set_device_uuids=False) - - - #self.logger.debug("Commit done, will unmount now") - #self._umount_device(device) - - if self.fs.filesystem_type(mountpoint = path) == "ntfs": - self.fs.unload_ntfs() - - - - - - # repo.create_head - - # repo.heads.master.set_tracking_branch(origin.refs.master) - - self.logger.info("Uploading to ogrepository") - origin.push(progress=OgProgressPrinter(self.logger)) - - #repo.git.push("--set-upstream", "origin", repo.head.ref, "--force") - - def cloneRepo(self, repo_name, destination, boot_device): - """ - Clones a repository to a specified destination and sets up the bootloader. - - Args: - repo_name (str): The name of the repository to clone. - destination (str): The destination directory where the repository will be cloned. - boot_device (str): The boot device to install the bootloader. - - Raises: - RequirementException: If the repository metadata is incorrect or if the repository's - boot system is incompatible with the current system. - - Logs: - Info: Logs the start of the cloning process. - Debug: Logs the repository URL, EFI compatibility of the repository and the system. - """ - self.logger.info(f"Cloning repo: {repo_name} => {destination}") - - - repo_url = self._getOgRepository(repo_name) - real_git_dir = os.path.join(self.cache_dir, f"git-{repo_name}") - - if os.path.exists(real_git_dir): - self.logger.debug(f"Removing existing repository {real_git_dir}") - shutil.rmtree(real_git_dir) - - self.logger.debug(f"URL: {repo_url}") - - all_metadata = self._get_repo_metadata(repo_name) - metadata = all_metadata["metadata.json"] - fs_data = all_metadata["filesystems.json"] - - - if len(fs_data.keys()) == 0: - raise RequirementException("El repositorio contiene metadatos incorrectos. Falta información sobre sistemas de archivos en filesystems.json.") - - if not "efi_boot" in metadata: - raise RequirementException("El repositorio contiene metadatos incorrectos. Falta información de metadatos en metadata.json") - - repo_is_efi = metadata["efi_boot"] - efi = self._is_efi() - - self.logger.debug(f"Repository made for EFI: {repo_is_efi}") - self.logger.debug(f"Our system using EFI : {efi}") - - if repo_is_efi != efi: - raise RequirementException("Repositorio usa sistema de arranque incompatible con sistema actual") - - self.fs.unmount(device = destination) - - filesystem_map = {"/" : destination} - - self._create_filesystems(fs_data, filesystem_map) - - destination_dir = "/mnt/repo-" + repo_name - - self.fs.mount(destination, destination_dir) - - self._delete_contents(destination_dir) - - self.logger.info("Cloning repository from %s", repo_url) - - repo = git.Repo.clone_from(repo_url, destination_dir, multi_options = [f"--separate-git-dir={real_git_dir}"], progress=OgProgressPrinter(self.logger)) - - if repo_is_efi: - self._efi_install(root_directory=destination_dir) - else: - self._grub_install(root_directory=destination_dir, boot_device=boot_device) - - self.fs.mklostandfound(destination_dir) - self._restore_metadata(destination_dir, set_device_uuids=True) - - if self.fs.filesystem_type(mountpoint = destination_dir) == "ntfs": - self._ntfs_restore_secaudit(destination_dir) - - self.logger.info("Clone completed.") - - - def commit(self, path = None, device = None, message = None): - """ - Commit all current changes to the local data - """ - - if path is None: - path = self.fs.ensure_mounted(device) - - self.logger.info("Committing changes to repository") - repo = git.Repo(path) - - self._create_metadata(path, initial_creation=False) - - self.logger.info("Adding files") - repo.index.add("*") - - self.logger.info("Creating commit") - repo.index.commit(message) - - # Restaurar cosas modificadas para git - self._restore_metadata(path, destructive_only=True) - - - def restoreRepo(self, path): - """ - Restore the repository to the state it had before the non-committed modifications - """ - self.logger.info("Undoing any user changes to the filesystem") - repo = git.Repo(path) - - repo.head.reset(index=True, working_tree=True) - - # Restaurar cosas modificadas para git - self._restore_metadata(path, destructive_only=True) - - def push(self, path = None, device = None): - """ - Push local changes to ogrepository - - Use commit() first to save local changes. - """ - - - if path is None: - path = self.fs.ensure_mounted(device) - - repo = git.Repo(path) - - self.logger.info("Uploading to ogrepository") - if not "origin" in repo.remotes: - self.logger.critical("'origin' remote not found!") - return - - origin = repo.remotes["origin"] - repo.heads.master.set_tracking_branch(origin.refs.master) - origin.push(progress=OgProgressPrinter(self.logger)) - - #repo.git.push("--set-upstream", "origin", repo.head.ref, "--force") # force = True) - - - def fetch(self, path = None, device = None): - """ - Fetch updates from ogrepository. Doesn't change the filesystem. - """ - - - if path is None: - path = self.fs.ensure_mounted(device) - - repo = git.Repo(path) - - self.logger.info("Fetching from ogrepository") - origin = repo.remotes.origin - - if origin: - self.logger.debug("Fetching from origin") - origin.fetch(progress=OgProgressPrinter(self.logger)) - else: - self.logger.error("Origin not found, can't fetch") - - def pull(self, path = None, device = None): - """ - Pull changes from ogrepository - - This unconditionally overwrites remote changes. There is no conflict resolution. - """ - - - if path is None: - path = self.fs.ensure_mounted(device) - - repo = git.Repo(path) - - self.logger.debug("Downloading from ogrepository") - repo.git.fetch() - repo.head.reset(index=True, working_tree=True) - - # Restaurar cosas modificadas para git - self._restore_metadata(path, destructive_only=True) - - def check_remote_exists(self, repo_name): - repo_url = self._getOgRepository(repo_name) - - self.logger.info("Checking whether %s exists and is accessible", repo_url) - - ret = subprocess.run(["/usr/bin/git", "ls-remote", repo_url], encoding='utf-8', capture_output=True, check=False) - if ret.returncode == 0: - return True - else: - self.logger.warning("Remote can't be accessed, git said: %s", ret.stderr) - return False - - - - - - -if __name__ == '__main__': - # python no cree que nuestra consola usa utf-8. - # esto arregla las tildes y las eñes - sys.stdout.reconfigure(encoding='utf-8') - - kernel_args = parse_kernel_cmdline() - - - opengnsys_log_dir = "/opt/opengnsys/log" - - logger = logging.getLogger(__package__) - logger.setLevel(logging.DEBUG) - - streamLog = logging.StreamHandler() - streamLog.setLevel(logging.INFO) - - if not os.path.exists(opengnsys_log_dir): - os.mkdir(opengnsys_log_dir) - - ip_address = "unknown" - if "ip" in kernel_args: - ip_address = kernel_args["ip"].split(":")[0] - - - logFilePath = f"{opengnsys_log_dir}/{ip_address}.gitlib.log" - - fileLog = logging.FileHandler(logFilePath) - fileLog.setLevel(logging.DEBUG) - - formatter = logging.Formatter('%(asctime)s - %(name)24s - [%(levelname)5s] - %(message)s') - - streamLog.setFormatter(formatter) - fileLog.setFormatter(formatter) - - logger.addHandler(streamLog) - logger.addHandler(fileLog) - - - logger.info("Program start, logging details to %s", logFilePath) - - parser = argparse.ArgumentParser( - prog="OpenGnsys Git Library", - description="Funciones de Git", - ) - - #parser.add_argument("--init-repo", type=str, metavar='DIR', help="Inicializar repositorio desde DIR") - parser.add_argument("--init-repo-from", type=str, metavar='DEV', help="Inicializar repositorio desde DEV") - parser.add_argument("--clone-repo-to", type=str, metavar='DEV', help="Clonar repositorio a DIR. Elimina todos los datos en ruta destino!") - parser.add_argument("--repo", type=str, help="Repositorio en ogrepository (linux, windows, mac)") - parser.add_argument("--boot-device", type=str, help="Dispositivo de arranque") - parser.add_argument("--commit", type=str, metavar='DEV', help="Commit de cambios en el directorio") - parser.add_argument("--restore", type=str, metavar='DEV', help="Eliminar cambios en el directorio") - parser.add_argument("--push", type=str, metavar='DEV', help="Subir cambios a ogrepository") - parser.add_argument("--pull", type=str, metavar='DEV', help="Bajar cambios de ogrepository") - parser.add_argument("--fetch", type=str, metavar='DEV', help="Fetch changes from ogrepository") - parser.add_argument("--efi-config", type=str, metavar="NAME", help="Name of the custom EFI configuration to deploy") - parser.add_argument("--verify-repo", action='store_true', help="Verify whether the indicated repository exists and can be used") - - - parser.add_argument("--ntfs-type", type=str, metavar="FS", help="Tipo de NTFS, 'kernel' o 'fuse'") - parser.add_argument("--test-create-metadata", type=str, metavar="DIR", help="Test metadata generation") - parser.add_argument("--test-restore-metadata", type=str, metavar="DIR", help="Test metadata restoration") - parser.add_argument("--test-restore-metadata-destructive", type=str, metavar="DIR", help="Test metadata restoration, destructive parts only") - parser.add_argument("--test-clone-metadata", type=str, metavar="REPO", help="Test metadata cloning") - parser.add_argument("--test-efi-install", type=str, metavar="DIR", help = "Install EFI") - parser.add_argument("-m", "--message", type=str, metavar="MSG", help="Commit message") - parser.add_argument("--test-set-ntfsid", type=str, metavar="ID", help="Set NTFS ID") - parser.add_argument("--test-restore-secaudit",type=str, metavar="DIR", help="Test restoring NTFS secaudit") - parser.add_argument("--test-get-part-uuid", type=str, metavar="PART", help="Get partition UUID") - - parser.add_argument("--device", type=str, metavar="DEV", help="Device to set the UUID on") - - parser.add_argument("-v", "--verbose", action="store_true", help = "Verbose console output") - - args = parser.parse_args() - - if args.verbose: - streamLog.setLevel(logging.DEBUG) - - - logger.debug("Starting") - - ntfs_impl = NTFSImplementation.NTFS3G - - if not args.ntfs_type is None: - if args.ntfs_type == "kernel": - ntfs_impl = NTFSImplementation.KERNEL - elif args.ntfs_type == "fuse": - ntfs_impl = NTFSImplementation.NTFS3G - else: - raise ValueError(f"Unknown NTFS implementation: {args.ntfs_type}") - - - og_git = OpengnsysGitLibrary(ntfs_implementation = ntfs_impl) - # og_git._runBashFunction("ogMountCache", []) - - -# if args.init_repo: - # #og_git.initRepo("/mnt/sda1", "linux") - # with OperationTimer(og_git, "git init"): - # og_git.initRepo(args.init_repo, args.repo) - if args.init_repo_from: - with OperationTimer(og_git, "git init"): - og_git.initRepo(args.init_repo_from, args.repo) - elif args.clone_repo_to: - #og_git.cloneRepo("linux", "/opt/opengnsys/cache/cloned") - with OperationTimer(og_git, "git clone"): - og_git.cloneRepo(args.repo, args.clone_repo_to, args.boot_device) - #og_git._restore_metadata("/opt/opengnsys/cache/cloned") - #og_git._restore_metadata(args.clone_repo_to) - elif args.commit: - with OperationTimer(og_git, "git commit"): - og_git.commit(device = args.commit, message = args.message) - elif args.restore: - with OperationTimer(og_git, "git restore"): - og_git.restoreRepo(args.restore) - elif args.push: - with OperationTimer(og_git, "git push"): - og_git.push(device = args.push) - elif args.fetch: - with OperationTimer(og_git, "git fetch"): - og_git.fetch(device = args.fetch) - elif args.pull: - with OperationTimer(og_git, "git pull"): - og_git.pull(device = args.pull) - elif args.verify_repo: - if og_git.check_remote_exists(args.repo): - print("Remote checks OK") - else: - print("Check failed") - - elif args.test_create_metadata: - og_git._create_metadata(args.test_create_metadata, initial_creation=False) # pylint: disable=protected-access - elif args.test_restore_metadata: - og_git._restore_metadata(args.test_restore_metadata, set_device_uuids=True) # pylint: disable=protected-access - elif args.test_restore_metadata_destructive: - og_git._restore_metadata(path = args.test_restore_metadata_destructive, destructive_only=True) # pylint: disable=protected-access - elif args.test_clone_metadata: - og_git._get_repo_metadata(args.test_clone_metadata) # pylint: disable=protected-access - elif args.test_set_ntfsid: - ntfs = NTFSLibrary(ntfs_impl) - ntfs.modify_uuid(args.device, args.test_set_ntfsid) - elif args.test_efi_install: - og_git._efi_install(root_directory=args.test_efi_install, config_name = args.efi_config) # pylint: disable=protected-access - elif args.test_restore_secaudit: - og_git._ntfs_restore_secaudit(args.test_restore_secaudit) # pylint: disable=protected-access - else: - print("Please specify an action.") - parser.print_help() - sys.exit(1) - # - - # Make sure all filesystem changes are written, just in case the oglive is rebooted without an unmount - os.sync() diff --git a/ogclient/lib/python3/kernel.py b/ogclient/lib/python3/kernel.py deleted file mode 100644 index f0a7eb2..0000000 --- a/ogclient/lib/python3/kernel.py +++ /dev/null @@ -1,22 +0,0 @@ - - -def parse_kernel_cmdline(): - """Parse the kernel arguments to obtain configuration parameters in Oglive - - OpenGnsys passes data in the kernel arguments, for example: - [...] group=Aula_virtual ogrepo=192.168.2.1 oglive=192.168.2.1 [...] - - Returns: - dict: Dict of configuration parameters and their values. - """ - params = {} - - with open("/proc/cmdline", encoding='utf-8') as cmdline: - line = cmdline.readline() - parts = line.split() - for part in parts: - if "=" in part: - key, value = part.split("=") - params[key] = value - - return params \ No newline at end of file diff --git a/ogclient/lib/python3/ntfs.py b/ogclient/lib/python3/ntfs.py deleted file mode 100644 index d3c3a57..0000000 --- a/ogclient/lib/python3/ntfs.py +++ /dev/null @@ -1,111 +0,0 @@ - -import logging -import subprocess - -from enum import Enum - - -class NTFSImplementation(Enum): - KERNEL = 1 - NTFS3G = 2 - - -class NTFSLibrary: - """ - A library for managing NTFS filesystems. - - Attributes: - logger (logging.Logger): Logger for the class. - implementation (NTFSImplementation): The implementation to use for mounting NTFS filesystems. - """ - - def __init__(self, implementation): - """ - Initializes the instance with the given implementation. - - Args: - implementation: The implementation to be used by the instance. - - Attributes: - logger (logging.Logger): Logger instance for the class, set to debug level. - implementation: The implementation provided during initialization. - """ - self.logger = logging.getLogger("NTFSLibrary") - self.logger.setLevel(logging.DEBUG) - self.implementation = implementation - - self.logger.debug("Initializing") - - def create_filesystem(self, device, label): - """ - Creates an NTFS filesystem on the specified device with the given label. - - Args: - device (str): The device path where the NTFS filesystem will be created. - label (str): The label to assign to the NTFS filesystem. - - Returns: - None - - Logs: - Logs the creation process with the device and label information. - """ - self.logger.info(f"Creating NTFS in {device} with label {label}") - - subprocess.run(["/usr/sbin/mkntfs", device, "-Q", "-L", label], check=True) - - - def mount_filesystem(self, device, mountpoint): - """ - Mounts a filesystem on the specified mountpoint using the specified NTFS implementation. - - Args: - device (str): The device path to be mounted (e.g., '/dev/sda1'). - mountpoint (str): The directory where the device will be mounted. - - Raises: - ValueError: If the NTFS implementation is unknown. - - """ - self.logger.info(f"Mounting {device} in {mountpoint} using implementation {self.implementation}") - if self.implementation == NTFSImplementation.KERNEL: - subprocess.run(["/usr/bin/mount", "-t", "ntfs3", device, mountpoint], check = True) - elif self.implementation == NTFSImplementation.NTFS3G: - subprocess.run(["/usr/bin/ntfs-3g", device, mountpoint], check = True) - else: - raise ValueError("Unknown NTFS implementation: {self.implementation}") - - def modify_uuid(self, device, uuid): - """ - Modify the UUID of an NTFS device. - - This function changes the UUID of the specified NTFS device to the given UUID. - It reads the current UUID from the device, logs the change, and writes the new UUID. - - Args: - device (str): The path to the NTFS device file. - uuid (str): The new UUID to be set, in hexadecimal string format. - - Raises: - IOError: If there is an error opening or writing to the device file. - """ - - ntfs_uuid_offset = 0x48 - ntfs_uuid_length = 8 - - binary_uuid = bytearray.fromhex(uuid) - binary_uuid.reverse() - - self.logger.info(f"Changing UUID on {device} to {uuid}") - with open(device, 'r+b') as ntfs_dev: - self.logger.debug("Reading %i bytes from offset %i", ntfs_uuid_length, ntfs_uuid_offset) - - ntfs_dev.seek(ntfs_uuid_offset) - prev_uuid = bytearray(ntfs_dev.read(ntfs_uuid_length)) - prev_uuid.reverse() - prev_uuid_hex = bytearray.hex(prev_uuid) - self.logger.debug(f"Previous UUID: {prev_uuid_hex}") - - self.logger.debug("Writing...") - ntfs_dev.seek(ntfs_uuid_offset) - ntfs_dev.write(binary_uuid) diff --git a/ogclient/lib/python3/test.py b/ogclient/lib/python3/test.py deleted file mode 100644 index 37e14a0..0000000 --- a/ogclient/lib/python3/test.py +++ /dev/null @@ -1,13 +0,0 @@ -#!/usr/bin/env python3 -def _hex_to_bin( hex_str): - - while len(hex_str) != 16: - hex_str = "0" + hex_str - - hex_int = int(hex_str, 16) - binary = bin(hex_int)[2:].zfill(64) - - return binary - - -print( _hex_to_bin("0ACA")) \ No newline at end of file