Merge pull request 'refs #2227 add oggit scripts and libs' (#61) from add-oggit-scripts-libs into main
ogclient/pipeline/head This commit looks good Details
ogclient/pipeline/tag This commit looks good Details

Reviewed-on: #61
fix-ogboot 0.18.0
Natalia Serrano 2025-06-16 15:59:21 +02:00
commit 6c543660f9
11 changed files with 3018 additions and 0 deletions

View File

@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.18.0] - 2025-06-16
### Added
- Added scripts and libs for oggit
## [0.17.0] - 2025-06-16
### Changed

View File

@ -0,0 +1,38 @@
#!/usr/bin/env python3
import os
import subprocess
import sys
import time
sys.path.insert(0, "/opt/oglive/rootfs/opt/opengnsys/lib/python3/")
sys.path.insert(0, "/opt/opengnsys/interfaceAdm/git/")
sys.path.insert(0, "/opt/opengnsys/ogrepository/oggit/lib/")
import NetLib
import ogGlobals
import SystemLib
from gitlib import OpengnsysGitLibrary, NTFSImplementation
def create_image(disk_num, partition_num, repo, image_name):
ntfs_impl = NTFSImplementation.NTFS3G
og_git = OpengnsysGitLibrary(ntfs_implementation = ntfs_impl)
device = og_git._runBashFunction("ogDiskToDev", [str(disk_num), str(partition_num)])
og_git.initRepo(device, image_name)
def main():
if len(sys.argv) != 6:
sys.exit(SystemLib.ogRaiseError(OG_ERR_FORMAT, "Incorrect number of arguments"))
disk_num, partition_num, image_name, repo, tag = sys.argv[1:6]
retval = create_image(disk_num, partition_num, repo, image_name)
sys.exit(retval)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,32 @@
#!/usr/bin/env python3
import sys
import subprocess
sys.path.insert(0, "/opt/oglive/rootfs/opt/opengnsys/lib/python3/")
sys.path.insert(0, "/opt/opengnsys/interfaceAdm/git/")
sys.path.insert(0, "/opt/opengnsys/ogrepository/oggit/lib/")
import NetLib
import ogGlobals
import SystemLib
from gitlib import OpengnsysGitLibrary, NTFSImplementation
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Usage: python RestaurarImagenGit.py <disk> <partition> <repo> <boot_device>")
sys.exit(1)
disk = sys.argv[1]
partition = sys.argv[2]
repo = sys.argv[3]
boot_device = sys.argv[4]
ntfs_impl = NTFSImplementation.NTFS3G
og_git = OpengnsysGitLibrary(ntfs_implementation = ntfs_impl)
device = og_git._runBashFunction("ogDiskToDev", [str(disk), str(partition)])
og_git.cloneRepo(repo, device, boot_device)

View File

@ -0,0 +1,345 @@
#!/usr/bin/env python3
import hivex
import argparse
import struct
from hivex import Hivex
from hivex.hive_types import *
# Docs:
#
# https://www.geoffchappell.com/notes/windows/boot/bcd/objects.htm
# https://learn.microsoft.com/en-us/previous-versions/windows/desktop/bcd/bcdbootmgrelementtypes
#print(f"Root: {root}")
BCD_Enumerations = {
"BcdLibraryDevice_ApplicationDevice" : 0x11000001,
"BcdLibraryString_ApplicationPath" : 0x12000002,
"BcdLibraryString_Description" : 0x12000004,
"BcdLibraryString_PreferredLocale" : 0x12000005,
"BcdLibraryObjectList_InheritedObjects" : 0x14000006,
"BcdLibraryInteger_TruncatePhysicalMemory" : 0x15000007,
"BcdLibraryObjectList_RecoverySequence" : 0x14000008,
"BcdLibraryBoolean_AutoRecoveryEnabled" : 0x16000009,
"BcdLibraryIntegerList_BadMemoryList" : 0x1700000a,
"BcdLibraryBoolean_AllowBadMemoryAccess" : 0x1600000b,
"BcdLibraryInteger_FirstMegabytePolicy" : 0x1500000c,
"BcdLibraryInteger_RelocatePhysicalMemory" : 0x1500000D,
"BcdLibraryInteger_AvoidLowPhysicalMemory" : 0x1500000E,
"BcdLibraryBoolean_DebuggerEnabled" : 0x16000010,
"BcdLibraryInteger_DebuggerType" : 0x15000011,
"BcdLibraryInteger_SerialDebuggerPortAddress" : 0x15000012,
"BcdLibraryInteger_SerialDebuggerPort" : 0x15000013,
"BcdLibraryInteger_SerialDebuggerBaudRate" : 0x15000014,
"BcdLibraryInteger_1394DebuggerChannel" : 0x15000015,
"BcdLibraryString_UsbDebuggerTargetName" : 0x12000016,
"BcdLibraryBoolean_DebuggerIgnoreUsermodeExceptions" : 0x16000017,
"BcdLibraryInteger_DebuggerStartPolicy" : 0x15000018,
"BcdLibraryString_DebuggerBusParameters" : 0x12000019,
"BcdLibraryInteger_DebuggerNetHostIP" : 0x1500001A,
"BcdLibraryInteger_DebuggerNetPort" : 0x1500001B,
"BcdLibraryBoolean_DebuggerNetDhcp" : 0x1600001C,
"BcdLibraryString_DebuggerNetKey" : 0x1200001D,
"BcdLibraryBoolean_EmsEnabled" : 0x16000020,
"BcdLibraryInteger_EmsPort" : 0x15000022,
"BcdLibraryInteger_EmsBaudRate" : 0x15000023,
"BcdLibraryString_LoadOptionsString" : 0x12000030,
"BcdLibraryBoolean_DisplayAdvancedOptions" : 0x16000040,
"BcdLibraryBoolean_DisplayOptionsEdit" : 0x16000041,
"BcdLibraryDevice_BsdLogDevice" : 0x11000043,
"BcdLibraryString_BsdLogPath" : 0x12000044,
"BcdLibraryBoolean_GraphicsModeDisabled" : 0x16000046,
"BcdLibraryInteger_ConfigAccessPolicy" : 0x15000047,
"BcdLibraryBoolean_DisableIntegrityChecks" : 0x16000048,
"BcdLibraryBoolean_AllowPrereleaseSignatures" : 0x16000049,
"BcdLibraryString_FontPath" : 0x1200004A,
"BcdLibraryInteger_SiPolicy" : 0x1500004B,
"BcdLibraryInteger_FveBandId" : 0x1500004C,
"BcdLibraryBoolean_ConsoleExtendedInput" : 0x16000050,
"BcdLibraryInteger_GraphicsResolution" : 0x15000052,
"BcdLibraryBoolean_RestartOnFailure" : 0x16000053,
"BcdLibraryBoolean_GraphicsForceHighestMode" : 0x16000054,
"BcdLibraryBoolean_IsolatedExecutionContext" : 0x16000060,
"BcdLibraryBoolean_BootUxDisable" : 0x1600006C,
"BcdLibraryBoolean_BootShutdownDisabled" : 0x16000074,
"BcdLibraryIntegerList_AllowedInMemorySettings" : 0x17000077,
"BcdLibraryBoolean_ForceFipsCrypto" : 0x16000079,
"BcdBootMgrObjectList_DisplayOrder" : 0x24000001,
"BcdBootMgrObjectList_BootSequence" : 0x24000002,
"BcdBootMgrObject_DefaultObject" : 0x23000003,
"BcdBootMgrInteger_Timeout" : 0x25000004,
"BcdBootMgrBoolean_AttemptResume" : 0x26000005,
"BcdBootMgrObject_ResumeObject" : 0x23000006,
"BcdBootMgrObjectList_ToolsDisplayOrder" : 0x24000010,
"BcdBootMgrBoolean_DisplayBootMenu" : 0x26000020,
"BcdBootMgrBoolean_NoErrorDisplay" : 0x26000021,
"BcdBootMgrDevice_BcdDevice" : 0x21000022,
"BcdBootMgrString_BcdFilePath" : 0x22000023,
"BcdBootMgrBoolean_ProcessCustomActionsFirst" : 0x26000028,
"BcdBootMgrIntegerList_CustomActionsList" : 0x27000030,
"BcdBootMgrBoolean_PersistBootSequence" : 0x26000031,
"BcdDeviceInteger_RamdiskImageOffset" : 0x35000001,
"BcdDeviceInteger_TftpClientPort" : 0x35000002,
"BcdDeviceInteger_SdiDevice" : 0x31000003,
"BcdDeviceInteger_SdiPath" : 0x32000004,
"BcdDeviceInteger_RamdiskImageLength" : 0x35000005,
"BcdDeviceBoolean_RamdiskExportAsCd" : 0x36000006,
"BcdDeviceInteger_RamdiskTftpBlockSize" : 0x36000007,
"BcdDeviceInteger_RamdiskTftpWindowSize" : 0x36000008,
"BcdDeviceBoolean_RamdiskMulticastEnabled" : 0x36000009,
"BcdDeviceBoolean_RamdiskMulticastTftpFallback" : 0x3600000A,
"BcdDeviceBoolean_RamdiskTftpVarWindow" : 0x3600000B,
"BcdMemDiagInteger_PassCount" : 0x25000001,
"BcdMemDiagInteger_FailureCount" : 0x25000003,
"Reserved1" : 0x21000001,
"Reserved2" : 0x22000002,
"BcdResumeBoolean_UseCustomSettings" : 0x26000003,
"BcdResumeDevice_AssociatedOsDevice" : 0x21000005,
"BcdResumeBoolean_DebugOptionEnabled" : 0x26000006,
"BcdResumeInteger_BootMenuPolicy" : 0x25000008,
"BcdOSLoaderDevice_OSDevice" : 0x21000001,
"BcdOSLoaderString_SystemRoot" : 0x22000002,
"BcdOSLoaderObject_AssociatedResumeObject" : 0x23000003,
"BcdOSLoaderBoolean_DetectKernelAndHal" : 0x26000010,
"BcdOSLoaderString_KernelPath" : 0x22000011,
"BcdOSLoaderString_HalPath" : 0x22000012,
"BcdOSLoaderString_DbgTransportPath" : 0x22000013,
"BcdOSLoaderInteger_NxPolicy" : 0x25000020,
"BcdOSLoaderInteger_PAEPolicy" : 0x25000021,
"BcdOSLoaderBoolean_WinPEMode" : 0x26000022,
"BcdOSLoaderBoolean_DisableCrashAutoReboot" : 0x26000024,
"BcdOSLoaderBoolean_UseLastGoodSettings" : 0x26000025,
"BcdOSLoaderBoolean_AllowPrereleaseSignatures" : 0x26000027,
"BcdOSLoaderBoolean_NoLowMemory" : 0x26000030,
"BcdOSLoaderInteger_RemoveMemory" : 0x25000031,
"BcdOSLoaderInteger_IncreaseUserVa" : 0x25000032,
"BcdOSLoaderBoolean_UseVgaDriver" : 0x26000040,
"BcdOSLoaderBoolean_DisableBootDisplay" : 0x26000041,
"BcdOSLoaderBoolean_DisableVesaBios" : 0x26000042,
"BcdOSLoaderBoolean_DisableVgaMode" : 0x26000043,
"BcdOSLoaderInteger_ClusterModeAddressing" : 0x25000050,
"BcdOSLoaderBoolean_UsePhysicalDestination" : 0x26000051,
"BcdOSLoaderInteger_RestrictApicCluster" : 0x25000052,
"BcdOSLoaderBoolean_UseLegacyApicMode" : 0x26000054,
"BcdOSLoaderInteger_X2ApicPolicy" : 0x25000055,
"BcdOSLoaderBoolean_UseBootProcessorOnly" : 0x26000060,
"BcdOSLoaderInteger_NumberOfProcessors" : 0x25000061,
"BcdOSLoaderBoolean_ForceMaximumProcessors" : 0x26000062,
"BcdOSLoaderBoolean_ProcessorConfigurationFlags" : 0x25000063,
"BcdOSLoaderBoolean_MaximizeGroupsCreated" : 0x26000064,
"BcdOSLoaderBoolean_ForceGroupAwareness" : 0x26000065,
"BcdOSLoaderInteger_GroupSize" : 0x25000066,
"BcdOSLoaderInteger_UseFirmwarePciSettings" : 0x26000070,
"BcdOSLoaderInteger_MsiPolicy" : 0x25000071,
"BcdOSLoaderInteger_SafeBoot" : 0x25000080,
"BcdOSLoaderBoolean_SafeBootAlternateShell" : 0x26000081,
"BcdOSLoaderBoolean_BootLogInitialization" : 0x26000090,
"BcdOSLoaderBoolean_VerboseObjectLoadMode" : 0x26000091,
"BcdOSLoaderBoolean_KernelDebuggerEnabled" : 0x260000a0,
"BcdOSLoaderBoolean_DebuggerHalBreakpoint" : 0x260000a1,
"BcdOSLoaderBoolean_UsePlatformClock" : 0x260000A2,
"BcdOSLoaderBoolean_ForceLegacyPlatform" : 0x260000A3,
"BcdOSLoaderInteger_TscSyncPolicy" : 0x250000A6,
"BcdOSLoaderBoolean_EmsEnabled" : 0x260000b0,
"BcdOSLoaderInteger_DriverLoadFailurePolicy" : 0x250000c1,
"BcdOSLoaderInteger_BootMenuPolicy" : 0x250000C2,
"BcdOSLoaderBoolean_AdvancedOptionsOneTime" : 0x260000C3,
"BcdOSLoaderInteger_BootStatusPolicy" : 0x250000E0,
"BcdOSLoaderBoolean_DisableElamDrivers" : 0x260000E1,
"BcdOSLoaderInteger_HypervisorLaunchType" : 0x250000F0,
"BcdOSLoaderBoolean_HypervisorDebuggerEnabled" : 0x260000F2,
"BcdOSLoaderInteger_HypervisorDebuggerType" : 0x250000F3,
"BcdOSLoaderInteger_HypervisorDebuggerPortNumber" : 0x250000F4,
"BcdOSLoaderInteger_HypervisorDebuggerBaudrate" : 0x250000F5,
"BcdOSLoaderInteger_HypervisorDebugger1394Channel" : 0x250000F6,
"BcdOSLoaderInteger_BootUxPolicy" : 0x250000F7,
"BcdOSLoaderString_HypervisorDebuggerBusParams" : 0x220000F9,
"BcdOSLoaderInteger_HypervisorNumProc" : 0x250000FA,
"BcdOSLoaderInteger_HypervisorRootProcPerNode" : 0x250000FB,
"BcdOSLoaderBoolean_HypervisorUseLargeVTlb" : 0x260000FC,
"BcdOSLoaderInteger_HypervisorDebuggerNetHostIp" : 0x250000FD,
"BcdOSLoaderInteger_HypervisorDebuggerNetHostPort" : 0x250000FE,
"BcdOSLoaderInteger_TpmBootEntropyPolicy" : 0x25000100,
"BcdOSLoaderString_HypervisorDebuggerNetKey" : 0x22000110,
"BcdOSLoaderBoolean_HypervisorDebuggerNetDhcp" : 0x26000114,
"BcdOSLoaderInteger_HypervisorIommuPolicy" : 0x25000115,
"BcdOSLoaderInteger_XSaveDisable" : 0x2500012b
}
def format_value(bcd, bcd_value):
name = bcd.value_key(bcd_value)
(type, length) = bcd.value_type(bcd_value)
typename = ""
str_value = ""
if type == REG_SZ:
typename = "SZ"
str_value = bcd.value_string(bcd_value)
elif type == REG_DWORD:
typename = "DWORD"
dval = bcd.value_dword(bcd_value)
str_value = hex(dval) + " (" + str(bcd.value_dword(bcd_value)) + ")"
elif type == REG_BINARY:
typename = "BIN"
(length, value) = bcd.value_value(bcd_value)
str_value = value.hex()
elif type == REG_DWORD_BIG_ENDIAN:
typename = "DWORD_BE"
elif type == REG_EXPAND_SZ:
typename = "EXPAND SZ"
elif type == REG_FULL_RESOURCE_DESCRIPTOR:
typename = "RES DESC"
elif type == REG_LINK:
typename = "LINK"
elif type == REG_MULTI_SZ:
typename = "MULTISZ"
(length, str_value) = bcd.value_value(bcd_value)
str_value = str_value.decode('utf-16le')
str_value = str_value.replace("\0", ";")
#value = ";".join("\0".split(value))
elif type == REG_NONE:
typename = "NONE"
elif type == REG_QWORD:
typename = "QWORD"
elif type == REG_RESOURCE_LIST:
typename = "RES LIST"
elif type == REG_RESOURCE_REQUIREMENTS_LIST:
typename = "REQ LIST"
else:
typename = str(type)
str_value = "???"
return (typename, length, str_value)
def dump_all(root, depth = 0):
padding = "\t" * depth
children = bcd.node_children(root)
if len(children) > 0:
for child in children:
name = bcd.node_name(child)
print(f"{padding}{name}")
dump_all(child, depth + 1)
# print(f"Child: {child}")
#print(f"Values: {num_vals}")
return
values = bcd.node_values(root)
#print(f"Value list: {values}")
for v in values:
(type_name, length, str_value) = format_value(bcd, v)
name = bcd.value_key(v)
print(f"{padding}{name: <16}: [{type_name: <10}]; ({length: < 4}) {str_value}")
class WindowsBCD:
def __init__(self, filename):
self.filename = filename
self.bcd = Hivex(filename)
def dump(self, root=None, depth = 0):
padding = "\t" * depth
if root is None:
root = self.bcd.root()
children = self.bcd.node_children(root)
if len(children) > 0:
for child in children:
name = self.bcd.node_name(child)
print(f"{padding}{name}")
self.dump(child, depth + 1)
return
values = self.bcd.node_values(root)
for v in values:
(type_name, length, str_value) = format_value(self.bcd, v)
name = self.bcd.value_key(v)
print(f"{padding}{name: <16}: [{type_name: <10}]; ({length: < 4}) {str_value}")
def list(self):
root = self.bcd.root()
objects = self.bcd.node_get_child(root, "Objects")
for child in self.bcd.node_children(objects):
entry_id = self.bcd.node_name(child)
elements = self.bcd.node_get_child(child, "Elements")
description_entry = self.bcd.node_get_child(elements, "12000004")
if description_entry:
values = self.bcd.node_values(description_entry)
if values:
(type_name, length, str_value) = format_value(self.bcd, values[0])
print(f"{entry_id}: {str_value}")
else:
print(f"{entry_id}: [no description value!?]")
appdevice_entry = self.bcd.node_get_child(elements, "11000001")
if appdevice_entry:
values = self.bcd.node_values(appdevice_entry)
(length, data) = self.bcd.value_value(values[0])
hex = data.hex()
print(f"LEN: {length}, HEX: {hex}, RAW: {data}")
if len(data) > 10:
etype = struct.unpack_from('<I', data, offset = 16)
print(f"Type: {etype}")
else:
print(f"{entry_id}: [no description entry 12000004]")
parser = argparse.ArgumentParser(
prog="Windows BCD parser",
description="Parses the BCD",
)
parser.add_argument("--db", type=str, metavar='BCD file', help="Database to use")
parser.add_argument("--dump", action='store_true', help="Dumps the specified database")
parser.add_argument("--list", action='store_true', help="Lists boot entries in the specified database")
args = parser.parse_args()
bcdobj = WindowsBCD(args.db)
if args.dump:
# "/home/vadim/opengnsys/winboot/boot-copy/EFI/Microsoft/Boot/BCD"
#bcd = Hivex(args.dump)
#root = bcd.root()
#dump_all(root)
bcdobj.dump()
elif args.list:
bcdobj.list()

View File

@ -0,0 +1,124 @@
import logging
import subprocess
import re
# pylint: disable=locally-disabled, line-too-long, logging-fstring-interpolation, too-many-lines
class DiskLibrary:
def __init__(self):
self.logger = logging.getLogger("OpengnsysDiskLibrary")
self.logger.setLevel(logging.DEBUG)
def split_device_partition(self, device):
"""
Parses a device file like /dev/sda3 into the root device (/dev/sda) and partition number (3)
Args:
device (str): Device in /dev
Returns:
[base_device, partno]
"""
r = re.compile("^(.*?)(\\d+)$")
m = r.match(device)
disk = m.group(1)
partno = int(m.group(2))
self.logger.debug(f"{device} parsed into disk device {disk}, partition {partno}")
return (disk, partno)
def get_disk_json_data(self, device):
"""
Returns the partition JSON data dump for the entire disk, even if a partition is passed.
This is specifically in the format used by sfdisk.
Args:
device (str): Block device, eg, /dev/sda3
Returns:
str: JSON dump produced by sfdisk
"""
(disk, partno) = self.split_device_partition(device)
result = subprocess.run(["/usr/sbin/sfdisk", "--json", disk], check=True, capture_output=True, encoding='utf-8')
return result.stdout.strip()
def get_disk_uuid(self, device):
"""
Returns the UUID of the disk itself, if there's a GPT partition table.
Args:
device (str): Block device, eg, /dev/sda3
Returns:
str: UUID
"""
(disk, partno) = self.split_device_partition(device)
result = subprocess.run(["/usr/sbin/sfdisk", "--disk-id", disk], check=True, capture_output=True, encoding='utf-8')
return result.stdout.strip()
def set_disk_uuid(self, device, uuid):
(disk, partno) = self.split_device_partition(device)
subprocess.run(["/usr/sbin/sfdisk", "--disk-id", disk, uuid], check=True, encoding='utf-8')
def get_partition_uuid(self, device):
"""
Returns the UUID of the partition, if there's a GPT partition table.
Args:
device (str): Block device, eg, /dev/sda3
Returns:
str: UUID
"""
(disk, partno) = self.split_device_partition(device)
#result = subprocess.run(["/usr/sbin/sfdisk", "--part-uuid", disk, str(partno)], check=True, capture_output=True, encoding='utf-8')
#return result.stdout.strip()
guid = None
result = subprocess.run(["/usr/sbin/sgdisk", "--info", str(partno), disk], check=True, capture_output=True, encoding='utf-8')
for l in result.stdout.splitlines():
if 'Partition unique GUID' not in l: continue
guid = l.replace ('Partition unique GUID: ', '')
if guid is None:
self.logger.error (f'failed to get UUID of disk "{disk}" part "{partno}"')
return None
return guid
def set_partition_uuid(self, device, uuid):
(disk, partno) = self.split_device_partition(device)
subprocess.run(["/usr/sbin/sfdisk", "--part-uuid", disk, str(partno), uuid], check=True, encoding='utf-8')
def get_partition_type(self, device):
"""
Returns the type UUID of the partition, if there's a GPT partition table.
Args:
device (str): Block device, eg, /dev/sda3
Returns:
str: UUID
"""
(disk, partno) = self.split_device_partition(device)
result = subprocess.run(["/usr/sbin/sfdisk", "--part-type", disk, str(partno)], check=True, capture_output=True, encoding='utf-8')
return result.stdout.strip()
def set_partition_type(self, device, uuid):
(disk, partno) = self.split_device_partition(device)
subprocess.run(["/usr/sbin/sfdisk", "--part-type", disk, str(partno), uuid], check=True, encoding='utf-8')

View File

@ -0,0 +1,544 @@
import logging
import subprocess
import os
import json
import blkid
import time
from ntfs import *
# pylint: disable=locally-disabled, line-too-long, logging-fstring-interpolation, too-many-lines
class FilesystemLibrary:
def __init__(self, ntfs_implementation = NTFSImplementation.KERNEL):
self.logger = logging.getLogger("OpengnsysFilesystemLibrary")
self.logger.setLevel(logging.DEBUG)
self.mounts = {}
self.base_mount_path = "/mnt"
self.ntfs_implementation = ntfs_implementation
self.update_mounts()
def _rmmod(self, module):
self.logger.debug("Trying to unload module {module}...")
subprocess.run(["/usr/sbin/rmmod", module], check=False)
def _modprobe(self, module):
self.logger.debug("Trying to load module {module}...")
subprocess.run(["/usr/sbin/modprobe", module], check=True)
# _parse_mounts
def update_mounts(self):
"""
Update the current mount points by parsing the /proc/mounts file.
This method reads the /proc/mounts file to gather information about
the currently mounted filesystems. It stores this information in a
dictionary where the keys are the mount points and the values are
dictionaries containing details about each filesystem.
The details stored for each filesystem include:
- device: The device file associated with the filesystem.
- mountpoint: The directory where the filesystem is mounted.
- type: The type of the filesystem (e.g., ext4, vfat).
- options: Mount options associated with the filesystem.
- dump_freq: The dump frequency for the filesystem.
- passno: The pass number for filesystem checks.
The method also adds an entry for each mount point with a trailing
slash to ensure consistency in accessing the mount points.
Attributes:
mounts (dict): A dictionary where keys are mount points and values
are dictionaries containing filesystem details.
"""
filesystems = {}
self.logger.debug("Parsing /proc/mounts")
with open("/proc/mounts", 'r', encoding='utf-8') as mounts:
for line in mounts:
parts = line.split()
data = {}
data['device'] = parts[0]
data['mountpoint'] = parts[1]
data['type'] = parts[2]
data['options'] = parts[3]
data['dump_freq'] = parts[4]
data['passno'] = parts[5]
filesystems[data["mountpoint"]] = data
filesystems[data["mountpoint"] + "/"] = data
self.mounts = filesystems
def find_mountpoint(self, device):
"""
Find the mount point for a given device.
This method checks if the specified device is currently mounted and returns
the corresponding mount point if it is found.
Args:
device (str): The path to the device to check.
Returns:
str or None: The mount point of the device if it is mounted, otherwise None.
"""
norm = os.path.normpath(device)
self.logger.debug(f"Checking if {device} is mounted")
for mountpoint, mount in self.mounts.items():
#self.logger.debug(f"Item: {mount}")
#self.logger.debug(f"Checking: " + mount['device'])
if mount['device'] == norm:
return mountpoint
return None
def find_device(self, mountpoint):
"""
Find the device corresponding to a given mount point.
Args:
mountpoint (str): The mount point to search for.
Returns:
str or None: The device corresponding to the mount point if found,
otherwise None.
"""
self.update_mounts()
self.logger.debug("Finding device corresponding to mount point %s", mountpoint)
if mountpoint in self.mounts:
return self.mounts[mountpoint]['device']
else:
self.logger.warning("Failed to find mountpoint %s", mountpoint)
return None
def is_mounted(self, device = None, mountpoint = None):
def is_mounted(self, device=None, mountpoint=None):
"""
Check if a device or mountpoint is currently mounted.
Either checking by device or mountpoint is valid.
Args:
device (str, optional): The device to check if it is mounted.
Defaults to None.
mountpoint (str, optional): The mountpoint to check if it is mounted.
Defaults to None.
Returns:
bool: True if the device is mounted or the mountpoint is in the list
of mounts, False otherwise.
"""
self.update_mounts()
if device:
return not self.find_mountpoint(device) is None
else:
return mountpoint in self.mounts
def unmount(self, device = None, mountpoint = None):
def unmount(self, device=None, mountpoint=None):
"""
Unmounts a filesystem.
This method unmounts a filesystem either by the device name or the mountpoint.
If a device is provided, it finds the corresponding mountpoint and unmounts it.
If a mountpoint is provided directly, it unmounts the filesystem at that mountpoint.
Args:
device (str, optional): The device name to unmount. Defaults to None.
mountpoint (str, optional): The mountpoint to unmount. Defaults to None.
Raises:
subprocess.CalledProcessError: If the unmount command fails.
Logs:
Debug information about the unmounting process.
"""
if device:
self.logger.debug("Finding mountpoint of %s", device)
mountpoint = self.find_mountpoint(device)
if not mountpoint is None:
self.logger.debug(f"Unmounting {mountpoint}")
done = False
start_time = time.time()
timeout = 60
while not done and (time.time() - start_time) < timeout:
ret = subprocess.run(["/usr/bin/umount", mountpoint], check=False, capture_output=True, encoding='utf-8')
if ret.returncode == 0:
done=True
else:
if "target is busy" in ret.stderr:
self.logger.debug("Filesystem busy, waiting. %.1f seconds left", timeout - (time.time() - start_time))
time.sleep(0.1)
else:
raise subprocess.CalledProcessError(ret.returncode, ret.args, output=ret.stdout, stderr=ret.stderr)
# We've unmounted a new filesystem, update our filesystems list
self.update_mounts()
else:
self.logger.debug(f"{device} is not mounted")
def mount(self, device, mountpoint, filesystem = None):
"""
Mounts a device to a specified mountpoint.
Parameters:
device (str): The device to be mounted (e.g., '/dev/sda1').
mountpoint (str): The directory where the device will be mounted.
filesystem (str, optional): The type of filesystem to be used (e.g., 'ext4', 'ntfs'). Defaults to None.
Raises:
subprocess.CalledProcessError: If the mount command fails.
Logs:
Debug information about the mounting process, including the mount command, return code, stdout, and stderr.
Side Effects:
Creates the mountpoint directory if it does not exist.
Updates the internal list of mounted filesystems.
"""
self.logger.debug(f"Mounting {device} at {mountpoint}")
if not os.path.exists(mountpoint):
self.logger.debug(f"Creating directory {mountpoint}")
os.mkdir(mountpoint)
mount_cmd = ["/usr/bin/mount"]
if not filesystem is None:
mount_cmd = mount_cmd + ["-t", filesystem]
mount_cmd = mount_cmd + [device, mountpoint]
self.logger.debug(f"Mount command: {mount_cmd}")
result = subprocess.run(mount_cmd, check=True, capture_output = True)
self.logger.debug(f"retorno: {result.returncode}")
self.logger.debug(f"stdout: {result.stdout}")
self.logger.debug(f"stderr: {result.stderr}")
# We've mounted a new filesystem, update our filesystems list
self.update_mounts()
def ensure_mounted(self, device):
"""
Ensure that the given device is mounted.
This method attempts to mount the specified device to a path derived from
the base mount path and the device's basename. If the device is of type NTFS,
it uses the NTFSLibrary to handle the mounting process. For other filesystem
types, it uses a generic mount method.
Args:
device (str): The path to the device that needs to be mounted.
Returns:
str: The path where the device is mounted.
Logs:
- Info: When starting the mounting process.
- Debug: Various debug information including the mount path, filesystem type,
and success message.
Raises:
OSError: If there is an error creating the mount directory or mounting the device.
"""
self.logger.info("Mounting %s", device)
self.unmount(device = device)
path = os.path.join(self.base_mount_path, os.path.basename(device))
self.logger.debug(f"Will mount repo at {path}")
if not os.path.exists(path):
os.mkdir(path)
if self.filesystem_type(device) == "ntfs":
self.logger.debug("Handing a NTFS filesystem")
self._modprobe("ntfs3")
self.ntfsfix(device)
ntfs = NTFSLibrary(self.ntfs_implementation)
ntfs.mount_filesystem(device, path)
self.update_mounts()
else:
self.logger.debug("Handling a non-NTFS filesystem")
self.mount(device, path)
self.logger.debug("Successfully mounted at %s", path)
return path
def filesystem_type(self, device = None, mountpoint = None):
"""
Determine the filesystem type of a given device or mountpoint.
Args:
device (str, optional): The device to probe. If not provided, the device
will be determined based on the mountpoint.
mountpoint (str, optional): The mountpoint to find the device for. This
is used only if the device is not provided.
Returns:
str: The filesystem type of the device.
Raises:
KeyError: If the filesystem type cannot be determined from the probe.
Logs:
Debug: Logs the process of finding the device, probing the device, and
the determined filesystem type.
"""
if device is None:
self.logger.debug("Finding device for mountpoint %s", mountpoint)
device = self.find_device(mountpoint)
self.logger.debug(f"Probing {device}")
pr = blkid.Probe()
pr.set_device(device)
pr.enable_superblocks(True)
pr.set_superblocks_flags(blkid.SUBLKS_TYPE | blkid.SUBLKS_USAGE | blkid.SUBLKS_UUID | blkid.SUBLKS_UUIDRAW | blkid.SUBLKS_LABELRAW)
pr.do_safeprobe()
fstype = pr["TYPE"].decode('utf-8')
self.logger.debug(f"FS type is {fstype}")
return fstype
def is_filesystem(self, path):
"""
Check if the given path is a filesystem root.
Args:
path (str): The path to check.
Returns:
bool: True if the path is a filesystem root, False otherwise.
"""
# This is just an alias for better code readability
return self.is_mounted(mountpoint = path)
def create_filesystem(self, fs_type = None, fs_uuid = None, device = None):
"""
Create a filesystem on the specified device.
Parameters:
fs_type (str): The type of filesystem to create (e.g., 'ntfs', 'ext4', 'xfs', 'btrfs').
fs_uuid (str): The UUID to assign to the filesystem.
device (str): The device on which to create the filesystem (e.g., '/dev/sda1').
Raises:
RuntimeError: If the filesystem type is not recognized or if the filesystem creation command fails.
"""
self.logger.info(f"Creating filesystem {fs_type} with UUID {fs_uuid} in {device}")
if fs_type == "ntfs" or fs_type == "ntfs3":
self.logger.debug("Creating NTFS filesystem")
ntfs = NTFSLibrary(self.ntfs_implementation)
ntfs.create_filesystem(device, "NTFS")
ntfs.modify_uuid(device, fs_uuid)
else:
command = [f"/usr/sbin/mkfs.{fs_type}"]
command_args = []
if fs_type == "ext4" or fs_type == "ext3":
command_args = ["-U", fs_uuid, "-F", device]
elif fs_type == "xfs":
command_args = ["-m", f"uuid={fs_uuid}", "-f", device]
elif fs_type == "btrfs":
command_args = ["-U", fs_uuid, "-f", device]
else:
raise RuntimeError(f"Don't know how to create filesystem of type {fs_type}")
command = command + command_args
self.logger.debug(f"Creating Linux filesystem of type {fs_type} on {device}, command {command}")
result = subprocess.run(command, check = True, capture_output=True)
self.logger.debug(f"retorno: {result.returncode}")
self.logger.debug(f"stdout: {result.stdout}")
self.logger.debug(f"stderr: {result.stderr}")
def mklostandfound(self, path):
"""
Recreate the lost+found if necessary.
When cloning at the root of a filesystem, cleaning the contents
removes the lost+found directory. This is a special directory that requires the use of
a tool to recreate it.
It may fail if the filesystem does not need it. We consider this harmless and ignore it.
The command is entirely skipped on NTFS, as mklost+found may malfunction if run on it,
and has no useful purpose.
"""
if self.is_filesystem(path):
if self.filesystem_type(mountpoint=path) == "ntfs":
self.logger.debug("Not running mklost+found on NTFS")
return
curdir = os.getcwd()
result = None
try:
self.logger.debug(f"Re-creating lost+found in {path}")
os.chdir(path)
result = subprocess.run(["/usr/sbin/mklost+found"], check=True, capture_output=True)
except subprocess.SubprocessError as e:
self.logger.warning(f"Error running mklost+found: {e}")
if result:
self.logger.debug(f"retorno: {result.returncode}")
self.logger.debug(f"stdout: {result.stdout}")
self.logger.debug(f"stderr: {result.stderr}")
os.chdir(curdir)
def ntfsfix(self, device):
"""
Run the ntfsfix command on the specified device.
This method uses the ntfsfix utility to fix common NTFS problems on the given device.
This allows mounting an unclean NTFS filesystem.
Args:
device (str): The path to the device to be fixed.
Raises:
subprocess.CalledProcessError: If the ntfsfix command fails.
"""
self.logger.debug(f"Running ntfsfix on {device}")
subprocess.run(["/usr/bin/ntfsfix", "-d", device], check=True)
def unload_ntfs(self):
"""
Unloads the NTFS filesystem module.
This is a function added as a result of NTFS kernel module troubleshooting,
to try to ensure that NTFS code is only active as long as necessary.
The module is internally loaded as needed, so there's no load_ntfs function.
It may be removed in the future.
Raises:
RuntimeError: If the module cannot be removed.
"""
self._rmmod("ntfs3")
def find_boot_device(self):
"""
Searches for the EFI boot partition on the system.
This method scans the system's partitions to locate the EFI boot partition,
which is identified by the GUID "C12A7328-F81F-11D2-BA4B-00A0C93EC93B".
Returns:
str: The device node of the EFI partition if found, otherwise None.
Logs:
- Debug messages indicating the progress of the search.
- A warning message if the EFI partition is not found.
"""
disks = []
self.logger.debug("Looking for EFI partition")
with open("/proc/partitions", "r", encoding='utf-8') as partitions_file:
line_num=0
for line in partitions_file:
if line_num >=2:
data = line.split()
disk = data[3]
disks.append(disk)
self.logger.debug(f"Disk: {disk}")
line_num = line_num + 1
for disk in disks:
self.logger.debug("Loading partitions for disk %s", disk)
#disk_json_data = subprocess.run(["/usr/sbin/sfdisk", "-J", f"/dev/{disk}"], check=False, capture_output=True)
sfdisk_out = subprocess.run(["/usr/sbin/sfdisk", "-J", f"/dev/{disk}"], check=False, capture_output=True)
if sfdisk_out.returncode == 0:
disk_json_data = sfdisk_out.stdout
disk_data = json.loads(disk_json_data)
for part in disk_data["partitiontable"]["partitions"]:
self.logger.debug("Checking partition %s", part)
if part["type"] == "C12A7328-F81F-11D2-BA4B-00A0C93EC93B":
self.logger.debug("EFI partition found at %s", part["node"])
return part["node"]
else:
self.logger.debug("sfdisk returned with code %i, error %s", sfdisk_out.returncode, sfdisk_out.stderr)
self.logger.warning("Failed to find EFI partition!")
def temp_unmount(self, mountpoint):
"""
Temporarily unmounts the filesystem at the given mountpoint.
This method finds the device associated with the specified mountpoint,
and returns the information to remount it with temp_remount.
The purpose of this function is to temporarily unmount a filesystem for
actions like fsck, and to mount it back afterwards.
Args:
mountpoint (str): The mountpoint of the filesystem to unmount.
Returns:
dict: A dictionary containing the information needed to remount the filesystem.
"""
device = self.find_device(mountpoint)
fs = self.filesystem_type(mountpoint = mountpoint)
data = {"mountpoint" : mountpoint, "device" :device, "filesystem" : fs}
self.logger.debug("Temporarily unmounting device %s, mounted on %s, fs type %s", mountpoint, device, fs)
self.unmount(mountpoint = mountpoint)
return data
def temp_remount(self, unmount_data):
"""
Remounts a filesystem unmounted with temp_unmount
This method remounts a filesystem using the data provided by temp_unmount
Args:
unmount_data (dict): A dictionary containing the data needed to remount the filesystem.
Returns:
None
"""
self.logger.debug("Remounting temporarily unmounted device %s on %s, fs type %s", unmount_data["device"], unmount_data["mountpoint"], unmount_data["filesystem"])
self.mount(device = unmount_data["device"], mountpoint=unmount_data["mountpoint"], filesystem=unmount_data["filesystem"])

View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3
import unittest
import logging
import os
import sys
import urllib.request
import tarfile
import subprocess
from shutil import rmtree
from pathlib import Path
parent_dir = str(Path(__file__).parent.parent.absolute())
sys.path.append(parent_dir)
sys.path.append("/opengnsys/installer")
print(parent_dir)
from gitlib import OpengnsysGitLibrary
class GitTests(unittest.TestCase):
def setUp(self):
self.logger = logging.getLogger("OpengnsysTest")
self.oggit = OpengnsysGitLibrary()
self.logger.info("setUp()")
if not hasattr(self, 'init_complete'):
self.init_complete = True
def test_init(self):
self.assertIsNotNone(self.oggit)
def test_acls(self):
self.oggit.ogCreateAcl()
def test_sync_local(self):
# self.oggit.ogSyncLocalGitImage()
None
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)20s - [%(levelname)5s] - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.info("Inicio del programa")
unittest.main()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,22 @@
def parse_kernel_cmdline():
"""Parse the kernel arguments to obtain configuration parameters in Oglive
OpenGnsys passes data in the kernel arguments, for example:
[...] group=Aula_virtual ogrepo=192.168.2.1 oglive=192.168.2.1 [...]
Returns:
dict: Dict of configuration parameters and their values.
"""
params = {}
with open("/proc/cmdline", encoding='utf-8') as cmdline:
line = cmdline.readline()
parts = line.split()
for part in parts:
if "=" in part:
key, value = part.split("=")
params[key] = value
return params

View File

@ -0,0 +1,111 @@
import logging
import subprocess
from enum import Enum
class NTFSImplementation(Enum):
KERNEL = 1
NTFS3G = 2
class NTFSLibrary:
"""
A library for managing NTFS filesystems.
Attributes:
logger (logging.Logger): Logger for the class.
implementation (NTFSImplementation): The implementation to use for mounting NTFS filesystems.
"""
def __init__(self, implementation):
"""
Initializes the instance with the given implementation.
Args:
implementation: The implementation to be used by the instance.
Attributes:
logger (logging.Logger): Logger instance for the class, set to debug level.
implementation: The implementation provided during initialization.
"""
self.logger = logging.getLogger("NTFSLibrary")
self.logger.setLevel(logging.DEBUG)
self.implementation = implementation
self.logger.debug("Initializing")
def create_filesystem(self, device, label):
"""
Creates an NTFS filesystem on the specified device with the given label.
Args:
device (str): The device path where the NTFS filesystem will be created.
label (str): The label to assign to the NTFS filesystem.
Returns:
None
Logs:
Logs the creation process with the device and label information.
"""
self.logger.info(f"Creating NTFS in {device} with label {label}")
subprocess.run(["/usr/sbin/mkntfs", device, "-Q", "-L", label], check=True)
def mount_filesystem(self, device, mountpoint):
"""
Mounts a filesystem on the specified mountpoint using the specified NTFS implementation.
Args:
device (str): The device path to be mounted (e.g., '/dev/sda1').
mountpoint (str): The directory where the device will be mounted.
Raises:
ValueError: If the NTFS implementation is unknown.
"""
self.logger.info(f"Mounting {device} in {mountpoint} using implementation {self.implementation}")
if self.implementation == NTFSImplementation.KERNEL:
subprocess.run(["/usr/bin/mount", "-t", "ntfs3", device, mountpoint], check = True)
elif self.implementation == NTFSImplementation.NTFS3G:
subprocess.run(["/usr/bin/ntfs-3g", device, mountpoint], check = True)
else:
raise ValueError("Unknown NTFS implementation: {self.implementation}")
def modify_uuid(self, device, uuid):
"""
Modify the UUID of an NTFS device.
This function changes the UUID of the specified NTFS device to the given UUID.
It reads the current UUID from the device, logs the change, and writes the new UUID.
Args:
device (str): The path to the NTFS device file.
uuid (str): The new UUID to be set, in hexadecimal string format.
Raises:
IOError: If there is an error opening or writing to the device file.
"""
ntfs_uuid_offset = 0x48
ntfs_uuid_length = 8
binary_uuid = bytearray.fromhex(uuid)
binary_uuid.reverse()
self.logger.info(f"Changing UUID on {device} to {uuid}")
with open(device, 'r+b') as ntfs_dev:
self.logger.debug("Reading %i bytes from offset %i", ntfs_uuid_length, ntfs_uuid_offset)
ntfs_dev.seek(ntfs_uuid_offset)
prev_uuid = bytearray(ntfs_dev.read(ntfs_uuid_length))
prev_uuid.reverse()
prev_uuid_hex = bytearray.hex(prev_uuid)
self.logger.debug(f"Previous UUID: {prev_uuid_hex}")
self.logger.debug("Writing...")
ntfs_dev.seek(ntfs_uuid_offset)
ntfs_dev.write(binary_uuid)

View File

@ -0,0 +1,13 @@
#!/usr/bin/env python3
def _hex_to_bin( hex_str):
while len(hex_str) != 16:
hex_str = "0" + hex_str
hex_int = int(hex_str, 16)
binary = bin(hex_int)[2:].zfill(64)
return binary
print( _hex_to_bin("0ACA"))