oggit/gitlib/gitlib.py

1641 lines
66 KiB
Python

#!/usr/bin/env python3
# pylint: disable=locally-disabled, line-too-long, logging-fstring-interpolation, too-many-lines
### NOTES:
# Install:
# python3-git
# python3.8
# Must have working locales, or unicode strings will fail. Install 'locales', configure /etc/locale.gen, run locale-gen.
#
import os
import sys
sys.path.insert(0, "/usr/share/opengnsys-modules/python3/dist-packages")
import shutil
import argparse
import tempfile
import logging
import subprocess
import json
from pathlib import Path
import base64
import stat
import time
import git
import libarchive
import xattr
import posix1e
import blkid
from filesystem import *
from disk import *
from ntfs import *
import re
import uuid
from tqdm import tqdm
class OgProgressPrinter(git.RemoteProgress):
"""
A class to print progress updates for Git operations.
This class extends `git.RemoteProgress` to provide custom logging and
printing of progress updates to the standard error stream.
Attributes:
logger (Logger): The logger instance used to log debug messages.
prev_len (int): The length of the previous status string printed.
Methods:
__init__(parentLogger):
Initializes the OgProgressPrinter with a logger instance.
update(op_code, cur_count, max_count=None, message=""):
Updates the progress status and prints it to the standard error stream.
__del__():
Ensures a newline is printed when the instance is deleted.
"""
def __init__(self, parentLogger):
super().__init__()
self.logger = parentLogger
if sys.stdin.isatty():
self.progress = tqdm()
def update(self, op_code, cur_count, max_count=None, message=""):
self.logger.debug(f"Progress: {op_code} {cur_count}/{max_count}: {message}")
if self.progress:
self.progress.total = max_count
self.progress.n = cur_count
self.progress.desc = message
self.progress.refresh()
def __del__(self):
print("\n", file=sys.stderr)
class OperationTimer:
def __init__(self, parent, operation_name):
self.operation_name = operation_name
self.parent = parent
self.start = time.time()
def __enter__(self):
self.start = time.time()
def __exit__(self, *args):
elapsed = round(time.time() - self.start, 3)
self.parent.logger.info(f"{self.operation_name} took {elapsed}s")
class RequirementException(Exception):
"""Excepción que indica que nos falta algún requisito
Duplicado de git_installer.
Attributes:
message (str): Mensaje de error mostrado al usuario
"""
def __init__(self, message):
"""Inicializar RequirementException.
Args:
message (str): Mensaje de error mostrado al usuario
"""
super().__init__(message)
self.message = message
class OpengnsysGitLibrary:
"""OpenGnsys Git Library"""
def __init__(self, require_cache = True, ntfs_implementation = NTFSImplementation.KERNEL):
"""
Initializes the Git library for OpenGnsys.
Args:
require_cache (bool): Indicates whether a cache partition is required. Defaults to True.
ntfs_implementation (NTFSImplementation): Specifies the NTFS implementation to use. Defaults to NTFSImplementation.KERNEL.
Raises:
RequirementException: If the cache partition is required but cannot be mounted.
Attributes:
logger (logging.Logger): Logger instance for the Git library.
mounts (list): List of mounted filesystems.
repo_user (str): Username for the repository.
repo_image_path (str): Path to the repository images.
ntfs_implementation (NTFSImplementation): NTFS implementation being used.
cache_dir (str): Directory for the cache.
default_ignore_list (list): List of default paths to ignore.
fully_ignored_dirs (list): List of directories to fully ignore.
kernel_args (dict): Parsed kernel command line arguments.
repo_server (str): Server address for the repository.
debug_check_for_untracked_files (bool): Flag to check for untracked files for debugging purposes.
"""
self.logger = logging.getLogger("OpengnsysGitLibrary")
self.logger.setLevel(logging.DEBUG)
self.logger.debug(f"Initializing. Cache = {require_cache}, ntfs = {ntfs_implementation}")
self.fs = FilesystemLibrary(ntfs_implementation = ntfs_implementation)
self.disk = DiskLibrary()
#self.ntfs = NTFSLibrary()
#self.repo_server = "192.168.2.1"
self.repo_user = "oggit"
self.repo_image_path = "oggit"
self.ntfs_implementation = ntfs_implementation
self.cache_dir = self._runBashFunction("ogMountCache", [])
# Si no hay cache, se va a crear el .git en el FS directamente
if (not self.cache_dir) and require_cache:
raise RequirementException("Failed to mount cache partition. Cache partition may be missing.")
self.default_ignore_list = [
'/proc/*', # filesystem virtual
'!/proc/.opengnsys-keep',
'/sys/*', # filesystem virtual
'!/sys/.opengnsys-keep',
'/dev/*', # dispositivos -- no soportados por git
'!/dev/.opengnsys-keep',
'/run/*', # info temporal
'!/run/.opengnsys-keep',
'/var/run/*', # info temporal
'!/var/run/.opengnsys-keep',
'/tmp/*', # archivos temporales
'!/tmp/.opengnsys-keep',
'/var/tmp/*', # archivos temporales
'!/var/tmp/.opengnsys-keep',
'/mnt/*', # otros sistemas de archivos
'!/mnt/.opengnsys-keep',
"/lost+found", # lost+found es un directorio especial. Si es necesario lo recreamos
"/$Recycle.Bin",
"/$WinREAgent",
'/PerfLogs'
"/DumpStack.log.tmp",
"/pagefile.sys",
"/swapfile.sys",
"/Recovery",
"/System Volume Information"
]
self.fully_ignored_dirs=[
'proc',
'sys',
'dev',
'run',
'var/run',
'tmp',
'var/tmp',
'mnt',
'$Recycle.Bin',
'$WinREAgent',
'PerfLogs',
'Recovery',
'System Volume Information'
]
"""List of files to rename before commit. This is done for files that may interfere with Git, such as inner git repositories."""
self.rename_list = [
'.git',
'.gitignore',
'.gitattributes'
]
self.kernel_args = self._parse_kernel_cmdline()
self.repo_server = self.kernel_args["ogrepo"]
if not self.repo_server:
self.logger.warning("ogrepo kernel argument wasn't passed, or was empty. Defaulting to oglive.")
self.repo_server = self.kernel_args["oglive"]
"""Add any untracked files the code might have missed.
This is a workaround for a bug and it comes with a significant
performance penalty.
"""
self.debug_check_for_untracked_files = True
if not self.repo_server:
self.logger.error("Git repository not known, we may not have booted correctly? Check ogrepo and oglive kernel arguments.")
else:
self.logger.debug(f"Git repository: {self.repo_server}")
def _is_efi(self):
"""Determina si hemos arrancado con EFI
Returns:
Bool: Si hemos arrancado con EFI
"""
return os.path.exists("/sys/firmware/efi")
def _write_ignore_list(self, base_path):
ignore_file = base_path + "/.gitignore"
self.logger.debug("Creating ignore list: %s", ignore_file)
with open(ignore_file, 'w', encoding='utf-8') as f:
f.write("\n".join(self.default_ignore_list))
f.write("\n")
def _parse_kernel_cmdline(self):
"""Parse the kernel arguments to obtain configuration parameters in Oglive
OpenGnsys passes data in the kernel arguments, for example:
[...] group=Aula_virtual ogrepo=192.168.2.1 oglive=192.168.2.1 [...]
Returns:
dict: Dict of configuration parameters and their values.
"""
params = {}
self.logger.debug("Parsing kernel parameters")
with open("/proc/cmdline", encoding='utf-8') as cmdline:
line = cmdline.readline()
parts = line.split()
for part in parts:
if "=" in part:
key, value = part.split("=")
params[key] = value
self.logger.debug("%i parameters found", len(params))
return params
def _ntfs_secaudit(self, data):
self.logger.debug(f"Saving NTFS metadata for {data['device']}")
metadata_file = os.path.join(data["metadata_dir"], "ntfs_secaudit.txt")
self.logger.debug(f"Unmounting {data['mountpoint']}...")
subprocess.run(["/usr/bin/umount", data["mountpoint"]], check = True)
result = subprocess.run(["/usr/bin/ntfssecaudit", "-b", data["device"]], check=True, capture_output=True)
self.logger.debug(f"Remounting {data['device']} on {data['mountpoint']}...")
if data["mount_fs"] == "fuseblk":
self.logger.debug("Mount was FUSE")
subprocess.run(["/usr/bin/mount", data["device"], data["mountpoint"]], check=True)
else:
self.logger.debug(f"Mount was {data['mount_fs']}")
subprocess.run(["/usr/bin/mount", data["device"], "-t", data["mount_fs"], data["mountpoint"]], check=True)
self.logger.debug("Writing NTFS audit metadata...")
with open(metadata_file + ".new", "w", encoding='utf-8') as meta:
meta.write(result.stdout.decode('utf-8'))
os.rename(metadata_file + ".new", metadata_file)
def _ntfs_restore_secaudit(self, path):
self.logger.debug("Restoring NTFS metadata for %s", path)
if not self.fs.is_filesystem(path):
self.logger.error("Path %s is not a filesystem!")
return
if self.fs.filesystem_type(mountpoint = path) != "ntfs":
self.logger.error("Path %s is not NTFS!", path)
return
metadata_file = os.path.join(path,".opengnsys-metadata", "ntfs_secaudit.txt")
secaudit_data = ""
self.logger.debug("Reading audit metadata from %s...", metadata_file)
with open(metadata_file, "rb") as meta:
secaudit_data = meta.read()
self.logger.debug("Read %i bytes", len(secaudit_data))
device = self.fs.find_device(path)
mountdata = self.fs.temp_unmount(path)
self.logger.info("Restoring secaudit data...")
result = subprocess.run(["/usr/bin/ntfssecaudit", "-se", device], check=False, capture_output=True, input=secaudit_data)
if result.returncode == 0:
self.logger.debug("Completed, return code %i", result.returncode)
self.logger.debug("STDOUT: %s", result.stdout)
self.logger.debug("STDERR: %s", result.stderr)
else:
# An error return code can be returned for reasons like missing files, so we deal this
# as non-fatal.
self.logger.error("Completed, return code %i", result.returncode)
self.logger.error("STDOUT: %s", result.stdout)
self.logger.error("STDERR: %s", result.stderr)
self.fs.temp_remount(mountdata)
def _create_filesystems(self, fs_data, fs_map):
for mountpoint in fs_map:
dest_device = fs_map[mountpoint]
data = fs_data[mountpoint]
self.fs.create_filesystem(device = dest_device, fs_type = data["type"], fs_uuid = data["uuid"])
def _grub_install(self, root_directory, boot_device):
"""
Install GRUB boot loader on the specified boot device.
This method checks for the presence of GRUB 2.x and GRUB 1.x installers
and attempts to install the appropriate version. If neither installer is
found, a RequirementException is raised.
Args:
boot_device (str): The device on which to install the GRUB boot loader (e.g., '/dev/sda').
root_directory (str): The root directory where GRUB files should be installed.
Raises:
RequirementException: If neither GRUB 2.x nor GRUB 1.x installer is found.
subprocess.CalledProcessError: If the GRUB installation command fails.
Logs:
Debug information about the installation process, including the return code,
stdout, and stderr of the GRUB installation command.
"""
if os.path.exists("/usr/sbin/grub2-install"):
self.logger.debug("Installing Grub 2.x (NOT IMPLEMENTED)")
elif os.path.exists("/usr/sbin/grub-install"):
self.logger.debug("Installing Grub 1.x")
result = subprocess.run(["/usr/sbin/grub-install", "--force", "--root-directory", root_directory, boot_device], check = True, capture_output=True)
self.logger.debug(f"retorno: {result.returncode}")
self.logger.debug(f"stdout: {result.stdout}")
self.logger.debug(f"stderr: {result.stderr}")
else:
raise RequirementException("Couldn't find /usr/sbin/grub2-install or /usr/sbin/grub-install")
def _get_system_uuid(self):
self.logger.debug("Obtaining system UUID...")
result = subprocess.run(["/usr/sbin/dmidecode", "-s", "system-uuid"], check=True, capture_output=True, encoding='utf-8')
uuid = result.stdout.strip()
self.logger.debug("UUID is %s", uuid)
return uuid
def _efi_install(self, root_directory, config_name = None):
"""
Install EFI data on the specified boot device.
Copies EFI data from a metadata directory within the root directory
to the specified boot device. It logs the process of installing the EFI data.
Boot device is detected automatically
Args:
root_directory (str): The root directory containing the metadata and EFI data.
Raises:
shutil.Error: If an error occurs during the copying of the EFI data.
"""
boot_device = self.fs.find_boot_device()
boot_mount = self.fs.find_mountpoint(boot_device)
self.logger.info(f"Installing EFI files in {boot_mount}")
meta_dir = os.path.join(root_directory, ".opengnsys-metadata")
efi_files_dir = os.path.join(meta_dir, "efi_data")
if os.path.exists(efi_files_dir):
self.logger.debug("Copying EFI files")
shutil.copytree(efi_files_dir, boot_mount, dirs_exist_ok=True)
else:
self.logger.error("No general EFI files found")
sys_efi_files_dir = ""
if config_name:
self.logger.debug("Custom EFI config %s specified...", config_name)
sys_efi_files_dir = os.path.join(meta_dir, f"efi_data.{config_name}")
else:
uuid = self._get_system_uuid()
self.logger.debug("Checking if we have system-specific EFI data for system id %s...", uuid)
sys_efi_files_dir = os.path.join(meta_dir, f"efi_data.{uuid}")
if os.path.exists(sys_efi_files_dir):
self.logger.info("This system has specific EFI data, overriding default...")
shutil.copytree(sys_efi_files_dir, boot_mount, dirs_exist_ok=True)
else:
self.logger.debug("No system-specific EFI data.")
def _efi_copy(self, root_directory, system_specific = False, config_name = None):
meta_dir = os.path.join(root_directory, ".opengnsys-metadata")
boot_device = self.fs.find_boot_device()
boot_mount = self.fs.find_mountpoint(boot_device)
efi_files_dir = ""
if not system_specific:
self.logger.debug("Copying default EFI data")
efi_files_dir = os.path.join(meta_dir, "efi_data")
if os.path.exists(efi_files_dir):
shutil.rmtree(efi_files_dir)
shutil.copytree(boot_mount, efi_files_dir)
else:
if config_name:
self.logger.debug("Copying EFI data for preset %s", config_name)
efi_files_dir = os.path.join(meta_dir, f"efi_data.{config_name}")
else:
uuid = self._get_system_uuid()
self.logger.debug("Copying EFI data for system %s", uuid)
efi_files_dir = os.path.join(meta_dir, f"efi_data.{uuid}")
# TODO: On Windows we can probably get away with just copying:
# EFI/Microsoft/Boot/BCD*
if os.path.exists(efi_files_dir):
shutil.rmtree(efi_files_dir)
shutil.copytree(boot_mount, efi_files_dir)
def _delete_contents(self, path):
self.logger.info(f"Deleting contents of {path}")
for filename in os.listdir(path):
file_path = os.path.join(path, filename)
try:
self.logger.debug(f"Deleting {file_path}")
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except OSError as e:
self.logger.warning('Failed to delete %s. Error: %s', file_path, e)
def _runBashFunction(self, function, arguments):
"""
Executes an OpenGnsys bash function with given arguments.
This method creates a temporary bash script that sources all `.lib` files in a specific directory,
writes the specified bash function and its arguments to the script, makes the script executable,
and then runs the script. The output and errors from the script execution are captured and logged.
This is a temporary migration convenience function, it won't be present once the rest of the
code is migrated to Python.
Args:
function (str): The name of the bash function to execute.
arguments (list): A list of arguments to pass to the bash function.
Returns:
str: The standard output from the executed bash function.
Logs:
- Debug information about the bash function and arguments being run.
- The path of the temporary file created.
- The command being executed.
- The standard output and standard error from the script execution.
"""
# Create a temporary file
self.logger.debug(f"Running bash function: {function} {arguments}")
with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file:
temp_file.write("#!/bin/bash\n")
temp_file.write("for lib in /opt/opengnsys/client/lib/engine/bin/*.lib ; do\n")
temp_file.write(" source $lib\n")
temp_file.write("done\n")
#temp_file.write("source /opt/oglive/rootfs/opt/opengnsys/lib/engine/bin/Cache.lib")
#temp_file.write("source /opt/oglive/rootfs/opt/opengnsys/lib/engine/bin/Git.lib")
temp_file.write(f"{function} \"$@\"\n")
# Make the temporary file executable
os.chmod(temp_file.name, 0o755)
self.logger.debug(f"File: {temp_file.name}")
# Run the temporary file
command = [temp_file.name] + arguments
self.logger.debug(f"Running: {command} {arguments}")
result = subprocess.run(command, shell=False, capture_output=True, text=True, check=True)
output = result.stdout.strip()
self.logger.debug(f"STDOUT: {output}")
self.logger.debug(f"STDERR: {result.stderr}")
return output
def _getOgRepository(self, name):
return f"{self.repo_user}@{self.repo_server}:{self.repo_image_path}/{name}.git"
def _ogGetOsType(self):
return "Linux"
def _get_repo_metadata(self, repo):
"""Obtiene metadatos de un repositorio remoto sin clonar el repo entero.
Esto resuelve el problema de tener metadatos del sistema de archivos en el propio repo,
sobre los que necesitamos actuar antes de poder clonarlo.
Args:
repo (str): Nombre del repositorio (linux, windows, mac)
Returns:
dict: Clave/valor con contenidos de filesystems.json y metadata.json
"""
results = {}
tempdir = tempfile.TemporaryDirectory()
repo_url = self._getOgRepository(repo)
wanted_files = ["filesystems.json", "metadata.json"]
self.logger.debug(f"Cloning metadata for repository {repo_url}")
result = subprocess.run(["git", "archive", "--remote", repo_url, "HEAD:.opengnsys-metadata/"], capture_output=True, check=True)
tar_data = result.stdout
with libarchive.memory_reader(tar_data) as metadata:
self.logger.debug(f"Archive: {metadata}")
for entry in metadata:
self.logger.debug(entry)
if entry.pathname in wanted_files:
self.logger.debug(f"Extracting {entry}")
data = bytearray()
for block in entry.get_blocks():
data = data + block
text = data.decode('utf-8')
results[entry.pathname] = json.loads(text)
self.logger.debug(f"Contents: {text}")
return results
def _create_metadata(self, path, initial_creation=False):
"""Calculate metadata for a filesystem
Here we traverse the entire filesystem to:
1. Find empty directories and fill them so that git preserves them.
2. Obtain all ACLs.
3. Obtain all extended attributes.
4. Rename .gitignore files.
5. Find mount points and obtain information about them.
6. Additional metadata, such as the boot type.
7. NTFS secaudit, which must be performed at the end of the process because the filesystem needs to be unmounted.
For empty files, we generate a list that we can use later to delete the
.opengnsys-keep files. This is done because there are cases where an unexpected
file can cause problems. For example, sshfs by default refuses to mount things
in a directory that contains files.
We rename the .gitignore files in subdirectories because git would apply them
to our process.
We write all data in JSON to ensure there are no issues with spaces, line endings,
or other special characters. This also ensures one entry per line, which we can use
to speed up performance by using git to get the difference between a previous state
and the current one.
Args:
path (str): Base path of the filesystem
initial_creation (bool): This is being called from the initial repository creation
"""
self.logger.info(f"Creating metadata for {path}")
return_data = { 'symlinks': [] }
seen_roots = {}
filesystems_data = {}
ntfs_secaudit_list = []
path_norm = os.path.normpath(path)
git_dir = os.path.normpath(os.path.join(path, ".git"))
meta_dir = os.path.join(path, ".opengnsys-metadata")
if not os.path.exists(meta_dir):
os.mkdir(meta_dir, mode=0o700)
# https://jsonlines.org/
metadata_file = open(os.path.join(meta_dir, "metadata.json.new"), "w", encoding='utf-8')
metadata = {}
metadata["efi_boot"] = self._is_efi()
if self._is_efi():
# If we're doing the initial repository creation, then we're creating the initial,
# non-system-specific EFI data.
self._efi_copy(root_directory=path, system_specific=not initial_creation)
empties_file = open(os.path.join(meta_dir, "empty_directories.jsonl.new"), "w", encoding='utf-8')
specials_file = open(os.path.join(meta_dir, "special_files.jsonl.new"), "w", encoding='utf-8')
acls_file = open(os.path.join(meta_dir, "acls.jsonl.new"), "w", encoding='utf-8')
perms_file = open(os.path.join(meta_dir, "unix_permissions.jsonl.new"), "w", encoding='utf-8')
xattrs_file = open(os.path.join(meta_dir, "xattrs.jsonl.new"), "w", encoding='utf-8')
renamed_file = open(os.path.join(meta_dir, "renamed.jsonl.new"), "w", encoding='utf-8')
filesystems_file = open(os.path.join(meta_dir, "filesystems.json.new"), "w", encoding='utf-8')
partitions_file = open(os.path.join(meta_dir, "partitions.json.new"), "w", encoding='utf-8')
partitions_file.write(self.disk.get_disk_json_data(self.fs.find_device(mountpoint=path)))
ntfs = False
for root, subdirs, files in os.walk(path):
#print(f"ROOT: {root}, subdirs: {subdirs}, files: {files}")
root_norm = os.path.normpath(root)
root_rel = root[len(path):None]
#print(f"A: {root}")
#print(f"B: {git_dir}")
if root_norm.startswith(git_dir):
self.logger.debug(f"Ignoring git directory: {root_norm}")
# No examinamos el .git del raíz
continue
if root_rel.startswith("/.opengnsys-metadata"):
self.logger.debug("Ignoring metadata directory: %s", root_norm)
continue
if not root in seen_roots:
seen_roots[root]=1
mount_found = False
if root in self.fs.mounts:
mount = self.fs.mounts[root]
root_path_rel = root[len(path):None]
if len(root_path_rel) == 0 or root_path_rel[0] != "/":
root_path_rel = "/" + root_path_rel
pr = blkid.Probe()
pr.set_device(mount['device'])
pr.enable_superblocks(True)
pr.set_superblocks_flags(blkid.SUBLKS_TYPE | blkid.SUBLKS_USAGE | blkid.SUBLKS_UUID | blkid.SUBLKS_UUIDRAW | blkid.SUBLKS_LABELRAW)
pr.do_safeprobe()
fs = mount['type']
orig_fs = fs
if mount['type'] == 'fuseblk':
fs = pr["TYPE"].decode('utf-8')
self.logger.warning(f"FUSE mount detected, replacing type with blkid detected type: {fs}")
self.logger.warning("FUSE has lower performance, a native mount is recommended.")
filesystems_data[root_path_rel] = {}
filesystems_data[root_path_rel]['type'] = fs
filesystems_data[root_path_rel]['size'] = pr.size
filesystems_data[root_path_rel]['sectors'] = pr.sectors
filesystems_data[root_path_rel]['sector_size'] = pr.sector_size
filesystems_data[root_path_rel]['uuid'] = str(pr["UUID"], 'utf-8')
filesystems_data[root_path_rel]['uuid_raw'] = str(base64.b64encode(pr["UUID_RAW"]), 'utf-8')
filesystems_data[root_path_rel]["part_uuid"] = self.disk.get_partition_uuid(mount['device'])
filesystems_data[root_path_rel]["part_type"] = self.disk.get_partition_type(mount['device'])
filesystems_data[root_path_rel]["disk_uuid"] = self.disk.get_disk_uuid(mount['device'])
# TODO: Esto de momento no funciona -- LABEL no se encuentra
#filesystems_data[root_path_rel]['label'] = pr["LABEL"]
#filesystems_data[root_path_rel]['label_raw'] = pr["LABEL_RAW"]
self.logger.debug("Filesystem: {0}, relative {1}. Type {2}, size {3}, UUID {4}".format(root, root_path_rel, mount['type'], pr.size, str(pr["UUID"], 'utf-8')))
if fs == 'ntfs' or fs == 'ntfs3':
self.logger.info("NTFS detected, will do a secaudit")
ntfs_secaudit_list.append({
'device' : mount['device'],
'mountpoint' : root,
'relative_path' : root_path_rel,
'mount_fs' : orig_fs,
'metadata_dir' : meta_dir
})
#self._ntfs_secaudit(root, os.path.join(meta_dir, "ntfs_secaudit.txt"))
ntfs = True
else:
if root_norm == path_norm:
errstr = f"""We've failed to find metadata for the root filesystem!
Root: {root}
FS data: """
errstr = errstr + json.dumps(self.fs.mounts, indent=4)
self.logger.error(errstr)
raise RuntimeError(errstr)
#self.logger.debug(f"Root rel: {root_rel}")
if len(files) == 1 and files[0] == ".opengnsys-keep":
# Si ya tenemos un .opengnsys-keep, lo ignoramos
files.pop(0)
# Ignoramos todo el contenido en directorios como /dev, pero el directorio
# debe existir.
if (len(subdirs) == 0 and len(files) == 0) or (root_rel in self.fully_ignored_dirs):
keep_file = os.path.join(root, ".opengnsys-keep")
Path(keep_file).touch(mode=0o644, exist_ok = True)
self.logger.debug(f"Empty directory: {root}")
#root_rel = root[len(path):None]
empties_file.write(json.dumps({"dir" : root_rel}) + "\n")
for file in files:
full_path = os.path.join(root,file)
full_path_rel = full_path[len(path):None]
# Relative path can't start with a /, git will take it as an
# absolute path pointing to /, and not a file within the repo.
while full_path_rel[0] == '/':
full_path_rel = full_path_rel[1:None]
#self.logger.debug(f"Checking {full_path}:")
if not ntfs and os.path.isfile(full_path) and not os.path.islink(full_path):
# docs: https://pylibacl.k1024.org/module.html#posix1e.ACL.to_any_text
# Git doesn't preserve setuid, we've got to store it separately
stat_data = os.stat(full_path)
perms_json = json.dumps({
"path" : full_path_rel,
"mode" : stat_data.st_mode,
"uid" : stat_data.st_uid,
"gid" : stat_data.st_gid
})
xattrs = str(xattr.get_all(full_path))
acls = posix1e.ACL(file=full_path)
xattrs_json = json.dumps({"file": full_path_rel, "xattrs" : xattrs})
#acls_json = json.dumps({"file": full_path_rel, "acl" : str(acls.to_any_text())})
# __getstate__ nos permite exportar el estado entero de la ACL
# TODO: posiblemente formato de texto mejor?
acl_data = str(base64.b64encode(acls.__getstate__()), 'utf-8')
acls_json = json.dumps({"file": full_path_rel, "acl" : acl_data })
perms_file.write(perms_json + "\n")
xattrs_file.write(xattrs_json + "\n")
acls_file.write(acls_json + "\n")
#self.logger.debug(f"Checking if {file} is ignored")
if os.path.exists(full_path):
if not os.path.islink(full_path):
stat_data = os.stat(full_path)
stat_mode = stat_data.st_mode
if not (stat.S_ISDIR(stat_mode) or stat.S_ISREG(stat_mode) or stat.S_ISLNK(stat_mode)):
# Si no es un directorio o un archivo, conservamos los datos necesarios para recrearlo
stat_json_data = {
"file" : full_path_rel,
"mode" : stat_mode,
"uid" : stat_data.st_uid,
"gid" : stat_data.st_gid,
"rdev" : stat_data.st_rdev
}
stat_json = json.dumps(stat_json_data)
specials_file.write(stat_json + "\n")
# Git falla al encontrarse con archivos especiales como dispositivos. De momento debemos
# eliminarlos.
os.unlink(full_path)
if os.path.islink(full_path):
self.logger.debug(f"Symlink: {full_path_rel}")
return_data['symlinks'].append(full_path_rel)
if os.path.isfile(full_path) and file in self.rename_list and root != path:
# Process this last so that all the metadata references the real names.
self.logger.debug(f"Found file to rename: {full_path}")
renamed_file_path = full_path + "-opengnsys-renamed"
renamed_file_path_rel = full_path_rel + "-opengnsys-renamed"
renamed_json = json.dumps({"path": full_path_rel, "renamed" : renamed_file_path_rel})
renamed_file.write(renamed_json + "\n")
os.rename(full_path, renamed_file_path)
for subdir in subdirs:
full_path = os.path.join(root, subdir)
full_path_rel = full_path[len(path):None]
# Relative path can't start with a /, git will take it as an
# absolute path pointing to /, and not a file within the repo.
while full_path_rel[0] == '/':
full_path_rel = full_path_rel[1:None]
if os.path.isdir(full_path) and subdir in self.rename_list and root != path:
self.logger.debug(f"Found directory to rename: {full_path}")
renamed_dir_path = full_path + "-opengnsys-renamed"
renamed_dir_path_rel = full_path_rel + "-opengnsys-renamed"
renamed_json = json.dumps({"path": full_path_rel, "renamed" : renamed_dir_path_rel})
renamed_file.write(renamed_json + "\n")
os.rename(full_path, renamed_dir_path)
if not ntfs and os.path.isdir(full_path) and not os.path.islink(full_path):
stat_data = os.stat(full_path)
perms_json = json.dumps({
"path" : full_path_rel,
"mode" : stat_data.st_mode,
"uid" : stat_data.st_uid,
"gid" : stat_data.st_gid
})
xattrs = str(xattr.get_all(full_path))
acls = posix1e.ACL(file=full_path)
xattrs_json = json.dumps({"file": full_path_rel, "xattrs" : xattrs})
# __getstate__ nos permite exportar el estado entero de la ACL
acl_data = str(base64.b64encode(acls.__getstate__()), 'utf-8')
acls_json = json.dumps({"file": full_path_rel, "acl" : acl_data })
perms_file.write(perms_json + "\n")
xattrs_file.write(xattrs_json + "\n")
acls_file.write(acls_json + "\n")
self.logger.debug("Finishing...")
if len(filesystems_data) < 1:
self.logger.error("Filesystems data doesn't contain anything, this is probably a bug!")
filesystems_file.write(json.dumps(filesystems_data, indent=4) + "\n")
metadata_file.write(json.dumps(metadata, indent=4) + "\n")
empties_file.close()
specials_file.close()
xattrs_file.close()
acls_file.close()
perms_file.close()
renamed_file.close()
filesystems_file.close()
metadata_file.close()
partitions_file.close()
os.rename(os.path.join(meta_dir, "empty_directories.jsonl.new"), os.path.join(meta_dir, "empty_directories.jsonl"))
os.rename(os.path.join(meta_dir, "special_files.jsonl.new"), os.path.join(meta_dir, "special_files.jsonl"))
os.rename(os.path.join(meta_dir, "acls.jsonl.new"), os.path.join(meta_dir, "acls.jsonl"))
os.rename(os.path.join(meta_dir, "unix_permissions.jsonl.new"), os.path.join(meta_dir, "unix_permissions.jsonl"))
os.rename(os.path.join(meta_dir, "xattrs.jsonl.new"), os.path.join(meta_dir, "xattrs.jsonl"))
os.rename(os.path.join(meta_dir, "renamed.jsonl.new"), os.path.join(meta_dir, "renamed.jsonl"))
os.rename(os.path.join(meta_dir, "filesystems.json.new"), os.path.join(meta_dir, "filesystems.json"))
os.rename(os.path.join(meta_dir, "metadata.json.new"), os.path.join(meta_dir, "metadata.json"))
os.rename(os.path.join(meta_dir, "partitions.json.new"), os.path.join(meta_dir, "partitions.json"))
self.logger.debug("Processing pending NTFS secaudits...")
for audit in ntfs_secaudit_list:
self._ntfs_secaudit(audit)
self.logger.debug("Metadata updated")
return return_data
def _restore_metadata(self, path, destructive_only=False, set_device_uuids=False):
"""Restore the metadata created by _create_metadata
Args:
path (str): Destination path
destructive_only (bool): Only restore what is modified during a commit
Notes:
Git does not handle device or socket type files correctly. Therefore,
we must save data about them before the commit, and delete them before
git can see them and get confused.
destructive_only=True only restores the metadata that we modify
in the real file system before the commit. This is done to leave the
file system in the same state it had before the commit.
"""
self.logger.debug("Initializing")
self.logger.info(f"Restoring metadata in {path}")
meta_dir = os.path.join(path, ".opengnsys-metadata")
if not os.path.exists(meta_dir):
self.logger.error(f"Metadata directory not found: {meta_dir}")
return
if set_device_uuids:
# Windows boot manager uses partition UUIDs in at least some cases. One option to make booting work
# is to store all such UUIDs and restore them on the destination machine.
self.logger.info("Processing filesystems.json")
with open(os.path.join(meta_dir, "filesystems.json"), "r", encoding='utf-8') as filesystems_file:
filesystems = json.loads(filesystems_file.read())
disk_device = self.fs.find_device(path)
if "disk_uuid" in filesystems["/"]:
self.logger.info("Restoring device and partition UUIDs on %s", disk_device)
prev_uuid = self.disk.get_disk_uuid(disk_device)
new_uuid = filesystems["/"]["disk_uuid"]
if new_uuid != prev_uuid:
self.logger.info("Setting disk UUID to %s (was %s)", new_uuid, prev_uuid)
self.disk.set_disk_uuid(disk_device, new_uuid)
else:
self.logger.info("Not setting disk UUID, already was correct")
prev_uuid = self.disk.get_partition_uuid(disk_device)
new_uuid = filesystems["/"]["part_uuid"]
if new_uuid != prev_uuid:
self.logger.info("Setting partition UUID to %s (was %s)", new_uuid, prev_uuid)
self.disk.set_partition_uuid(disk_device, new_uuid)
else:
self.logger.info("Not setting partition UUID, already was correct")
prev_uuid = self.disk.get_partition_type(disk_device)
new_uuid = filesystems["/"]["part_type"]
if new_uuid != prev_uuid:
self.logger.info("Setting partition type to %s (was %s)", new_uuid, prev_uuid)
self.disk.set_partition_type(disk_device, new_uuid)
else:
self.logger.info("Not setting partition type, already was correct")
self.logger.info("Done setting disk UUIDs")
else:
self.logger.warning("Partition UUID data not present in metadata, skipping")
# Process renames first so that all the filenames are as they should be
# for the following steps.
self.logger.info("Processing renamed.jsonl")
with open(os.path.join(meta_dir, "renamed.jsonl"), "r", encoding='utf-8') as gitignores_file:
for line in gitignores_file:
if line.isspace():
self.logger.debug("Empty line, skipping")
continue
#self.logger.debug(f"Line: {line}")
renamed_data = json.loads(line)
orig_file = renamed_data['path']
renamed_file = renamed_data['renamed']
if renamed_file.startswith("/"):
renamed_file = renamed_file[1:]
orig_file_path = os.path.join(path, orig_file)
renamed_file_path = os.path.join(path, renamed_file)
#self.logger.debug(f"Checking: {renamed_file_path}")
if os.path.exists(renamed_file_path):
self.logger.debug(f"Renaming {renamed_file_path} => {orig_file_path}")
os.rename(renamed_file_path, orig_file_path)
else:
if os.path.exists(orig_file_path):
self.logger.warning(f"Can't rename {renamed_file_path} => {orig_file_path}: Already renamed")
else:
self.logger.warning(f"Can't rename {renamed_file_path} => {orig_file_path}: Source file not found")
if not destructive_only:
self.logger.info("Processing empty_directories.jsonl")
with open(os.path.join(meta_dir, "empty_directories.jsonl"), "r", encoding='utf-8') as empties_file:
for line in empties_file:
if line.isspace():
self.logger.debug("Empty line, skipping")
continue
empties_data = json.loads(line)
empty_dir = empties_data['dir']
# os.path.join no acepta /foo como una ruta relativa para concatenar
if empty_dir.startswith("/"):
empty_dir = empty_dir[1:]
empty_dir_keep = os.path.join(path, empty_dir, ".opengnsys-keep")
self.logger.debug(f"Empty directory: {empty_dir}")
full_empty_dir = os.path.join(path, empty_dir)
Path(full_empty_dir).mkdir(parents=True, exist_ok=True)
if os.path.exists(empty_dir_keep):
self.logger.debug(f"Deleting: {empty_dir_keep}")
os.unlink(empty_dir_keep)
if not destructive_only:
self.logger.info("Processing unix_permissions.jsonl")
with open(os.path.join(meta_dir, "unix_permissions.jsonl"), "r", encoding='utf-8') as acls_file:
for line in acls_file:
if line.isspace():
self.logger.debug("Empty line, skipping")
continue
perms_data = json.loads(line)
#self.logger.debug(f"Data: {acls_data}")
perms_path = perms_data['path']
file_perms = perms_data['mode']
file_uid = perms_data['uid']
file_gid = perms_data['gid']
if perms_path.startswith("/"):
perms_path = perms_path[1:]
perms_file_path = os.path.join(path, perms_path)
if os.path.exists(perms_file_path):
self.logger.debug(f"Applying permissions {file_perms}, owner {file_uid}, group {file_gid} to {perms_file_path}")
# chown clears suid bits, must come first
os.chown(perms_file_path, file_uid, file_gid)
os.chmod(perms_file_path, file_perms)
else:
self.logger.warning(f"Can't apply permissions to {perms_file_path}, file doesn't exist.")
if not destructive_only:
self.logger.info("Processing acls.jsonl")
with open(os.path.join(meta_dir, "acls.jsonl"), "r", encoding='utf-8') as acls_file:
for line in acls_file:
if line.isspace():
self.logger.debug("Empty line, skipping")
continue
# docs: https://pylibacl.k1024.org/module.html#posix1e.ACL.to_any_text
acls_data = json.loads(line)
#self.logger.debug(f"Data: {acls_data}")
acl_file = acls_data['file']
acl_text = base64.b64decode(bytes(acls_data['acl'], 'utf-8'))
if acl_file.startswith("/"):
acl_file = acl_file[1:]
acl_file_path = os.path.join(path, acl_file)
#self.logger.debug(f"TXT: {acl_text}" )
acl = posix1e.ACL(data = acl_text)
#self.logger.debug(f"ACL: {acl_text}" )
self.logger.debug(f"Applying ACL to {acl_file_path}")
if os.path.exists(acl_file_path):
acl.applyto(acl_file_path)
if not destructive_only:
self.logger.info("Processing xattrs.jsonl")
with open(os.path.join(meta_dir, "xattrs.jsonl"), "r", encoding='utf-8') as xattrs_file:
for line in xattrs_file:
if line.isspace():
self.logger.debug("Empty line, skipping")
continue
xattrs_data = json.loads(line)
xattrs_file = xattrs_data['file']
if xattrs_file.startswith("/"):
xattrs_file = xattrs_file[1:]
xattrs_file_path = os.path.join(path, xattrs_file)
#self.logger.debug(f"Line: {line}")
self.logger.info("Processing special_files.jsonl")
with open(os.path.join(meta_dir, "special_files.jsonl"), "r", encoding='utf-8') as specials_file:
for line in specials_file:
if line.isspace():
self.logger.debug("Empty line, skipping")
continue
#self.logger.debug(f"Line: {line}")
data = json.loads(line)
filename = data['file']
full_path = os.path.join(path, filename)
file_mode = data['mode']
try:
if stat.S_ISSOCK(file_mode):
self.logger.debug(f"Restoring socket {filename}")
os.mknod(full_path, mode = file_mode)
elif stat.S_ISFIFO(file_mode):
self.logger.debug(f"Restoring FIFO {filename}")
os.mknod(full_path, mode = file_mode)
elif stat.S_ISBLK(file_mode):
self.logger.debug(f"Restoring block device {filename}")
os.mknod(full_path, mode = file_mode, device = data['rdev'])
elif stat.S_ISCHR(file_mode):
self.logger.debug(f"Restoring character device {filename}")
os.mknod(full_path, mode = file_mode, device = data['rdev'])
else:
self.logger.warning(f"Unknown file type for {filename}: {file_mode}")
# chown clears suid bit, so even though it's redundant in most cases and already
# done above, set the full perms on the file again anyway.
os.chown(full_path, data['uid'], data['gid'])
os.chmod(full_path, file_mode)
except FileExistsError as exists:
self.logger.debug(f"Exists: {full_path}")
except OSError as oserr:
self.logger.warning(f"Failed to create special file {full_path}: Error {oserr.errno}: {oserr.strerror}")
self.logger.info("Metadata restoration completed.")
def _configure_repo(self, repo):
"""
#ogGitConfig
#@brief Configura usuario y permisos de git.
#@return
"""
self.logger.debug(f"Configuring repository {repo}")
repo.config_writer().add_value("user", "name", "OpenGnsys").release()
repo.config_writer().add_value("user", "email", "OpenGnsys@opengnsys.com").release()
repo.config_writer().add_value("core", "filemode", "false").release()
def initRepo(self, device, repo_name):
"""
Initialize a Git repository on a specified device.
This method mounts the device, initializes a Git repository, configures it,
and sets up a remote origin. It handles both NTFS and other filesystem types.
Args:
device (str): The device path to initialize the repository on.
repo_name (str): The name of the repository to be created.
Raises:
RuntimeError: If the .git directory is of an unrecognized file type.
Notes:
- The method mounts the device to /mnt/{device_basename}.
- The .git directory is created in a cache partition and symlinked to the device.
- The repository is initialized and configured, and an initial commit is made.
- The method sets up a remote origin and pushes the initial commit.
"""
path = self.fs.ensure_mounted(device)
self.logger.info("Initializing repository: %s", path)
git_dir = os.path.join(path, ".git")
real_git_dir = os.path.join(self.cache_dir, f"git-{repo_name}")
if os.path.exists(real_git_dir):
self.logger.debug(f"Removing existing repository {real_git_dir}")
shutil.rmtree(real_git_dir)
if os.path.exists(git_dir) or os.path.islink(git_dir):
if os.path.islink(git_dir) or os.path.isfile(git_dir):
self.logger.debug(f"Removing gitdir: {git_dir}")
os.unlink(git_dir)
elif os.path.isdir(git_dir):
# We want to host git in the cache partition, .git shouldn't be a directory under the
# filesystem.
self.logger.warning(f"Removing directory-type gitdir, this should be a link or a file: {git_dir}")
shutil.rmtree(git_dir)
else:
raise RuntimeError("Git dir is of an unrecognized file type!")
# if not os.path.exists(git_dir):
#self.logger.debug("Creating " + git_dir)
#with open(git_dir, "w") as git_dir:
# git_dir.write(f"gitdir: {real_git_dir}\n")
self.logger.debug(f"Initializing repo in cache at {real_git_dir}")
#os.mkdir(real_git_dir)
#with git.Repo.init(real_git_dir, bare=True) as temprepo:
# self._configure_repo(temprepo)
os.symlink(real_git_dir, git_dir)
repo = git.Repo.init(path)
self._configure_repo(repo)
self._write_ignore_list(path)
metadata_ret = self._create_metadata(path, initial_creation=True)
self.logger.debug(f"Building list of files to add from path {path}")
add_files = []
# Nota: repo.index.add(".") agrega archivos pero git después cree que
# no han sido agregados?
for ent in os.listdir(path):
if repo.ignored(ent) or ent == ".git" or ent == "." or ent == "..":
self.logger.debug(f"Ignoring: {ent}")
elif ent in self.fully_ignored_dirs:
# FIXME: repo.index.add tiene un bug que ignora Force=true
#repo.index.add("dev/.opengnsys-keep")
self.logger.debug("Fully ignored dir: {ent}")
add_files.append(f"{ent}/.opengnsys-keep")
else:
self.logger.debug(f"Adding: {ent}")
add_files.append(ent)
#repo.index.add(ent, force=False)
for lnk in metadata_ret['symlinks']:
self.logger.debug(f"Adding symlink: {lnk}")
add_files.append(lnk)
add_files_new = []
for file in add_files:
if os.path.exists(os.path.join(path, file)):
add_files_new = add_files_new + [file]
else:
self.logger.warning(f"We wanted to add {file} but it wasn't found. Please debug.")
add_files = add_files_new
self.logger.info("Adding %d files", len(add_files))
with OperationTimer(self, "add all files"):
#subprocess.run(["git", "add"] + add_files, check=True, cwd=path)
repo.index.add(items = add_files, force=True )
# FIXME: This shouldn't actually happen, we shouldn't have any untracked files
if self.debug_check_for_untracked_files:
self.logger.info("Checking for untracked files...")
with OperationTimer(self, "add untracked files"):
untracked_list = repo.untracked_files
if untracked_list:
self.logger.warning(f"Untracked files: {untracked_list}")
self.logger.warning("Adding %d untracked files", len(untracked_list))
#repo.index.add(items = untracked_list, force=True)
subprocess.run(["git", "add"] + untracked_list, check=True, cwd=path)
self.logger.info("Committing")
repo.index.commit("Initial commit")
# Restaurar cosas modificadas para git
self._restore_metadata(path, destructive_only=True, set_device_uuids=False)
#self.logger.debug("Commit done, will unmount now")
#self._umount_device(device)
if self.fs.filesystem_type(mountpoint = path) == "ntfs":
self.fs.unload_ntfs()
repo_url = self._getOgRepository(repo_name)
self.logger.debug(f"Creating remote origin: {repo_url}")
if "origin" in repo.remotes:
repo.delete_remote("origin")
origin = repo.create_remote("origin", repo_url)
self.logger.debug("Fetching origin")
origin.fetch()
# repo.create_head
# repo.heads.master.set_tracking_branch(origin.refs.master)
self.logger.info("Uploading to ogrepository")
repo.git.push("--set-upstream", "origin", repo.head.ref, "--force")
def cloneRepo(self, repo_name, destination, boot_device):
"""
Clones a repository to a specified destination and sets up the bootloader.
Args:
repo_name (str): The name of the repository to clone.
destination (str): The destination directory where the repository will be cloned.
boot_device (str): The boot device to install the bootloader.
Raises:
RequirementException: If the repository metadata is incorrect or if the repository's
boot system is incompatible with the current system.
Logs:
Info: Logs the start of the cloning process.
Debug: Logs the repository URL, EFI compatibility of the repository and the system.
"""
self.logger.info(f"Cloning repo: {repo_name} => {destination}")
repo_url = self._getOgRepository(repo_name)
real_git_dir = os.path.join(self.cache_dir, f"git-{repo_name}")
if os.path.exists(real_git_dir):
self.logger.debug(f"Removing existing repository {real_git_dir}")
shutil.rmtree(real_git_dir)
self.logger.debug(f"URL: {repo_url}")
all_metadata = self._get_repo_metadata(repo_name)
metadata = all_metadata["metadata.json"]
fs_data = all_metadata["filesystems.json"]
if len(fs_data.keys()) == 0:
raise RequirementException("El repositorio contiene metadatos incorrectos. Falta información sobre sistemas de archivos en filesystems.json.")
if not "efi_boot" in metadata:
raise RequirementException("El repositorio contiene metadatos incorrectos. Falta información de metadatos en metadata.json")
repo_is_efi = metadata["efi_boot"]
efi = self._is_efi()
self.logger.debug(f"Repository made for EFI: {repo_is_efi}")
self.logger.debug(f"Our system using EFI : {efi}")
if repo_is_efi != efi:
raise RequirementException("Repositorio usa sistema de arranque incompatible con sistema actual")
self.fs.unmount(device = destination)
filesystem_map = {"/" : destination}
self._create_filesystems(fs_data, filesystem_map)
destination_dir = "/mnt/repo-" + repo_name
self.fs.mount(destination, destination_dir)
self._delete_contents(destination_dir)
self.logger.info("Cloning repository from %s", repo_url)
repo = git.Repo.clone_from(repo_url, destination_dir, multi_options = [f"--separate-git-dir={real_git_dir}"], progress=OgProgressPrinter(self.logger))
if repo_is_efi:
self._efi_install(root_directory=destination_dir)
else:
self._grub_install(root_directory=destination_dir, boot_device=boot_device)
self.fs.mklostandfound(destination_dir)
self._restore_metadata(destination_dir, set_device_uuids=True)
if self.fs.filesystem_type(mountpoint = destination_dir) == "ntfs":
self._ntfs_restore_secaudit(destination_dir)
self.logger.info("Clone completed.")
def commit(self, path = None, device = None, message = None):
"""
Commit all current changes to the local data
"""
if path is None:
path = self.fs.ensure_mounted(device)
self.logger.info("Committing changes to repository")
repo = git.Repo(path)
self._create_metadata(path, initial_creation=False)
self.logger.info("Adding files")
repo.index.add("*")
self.logger.info("Creating commit")
repo.index.commit(message)
# Restaurar cosas modificadas para git
self._restore_metadata(path, destructive_only=True)
def restoreRepo(self, path):
"""
Restore the repository to the state it had before the non-committed modifications
"""
self.logger.info("Undoing any user changes to the filesystem")
repo = git.Repo(path)
repo.head.reset(index=True, working_tree=True)
# Restaurar cosas modificadas para git
self._restore_metadata(path, destructive_only=True)
def push(self, path = None, device = None):
"""
Push local changes to ogrepository
Use commit() first to save local changes.
"""
if path is None:
path = self.fs.ensure_mounted(device)
repo = git.Repo(path)
self.logger.info("Uploading to ogrepository")
repo.git.push("--set-upstream", "origin", repo.head.ref, "--force") # force = True)
def fetch(self, path = None, device = None):
"""
Fetch updates from ogrepository. Doesn't change the filesystem.
"""
if path is None:
path = self.fs.ensure_mounted(device)
repo = git.Repo(path)
self.logger.info("Fetching from ogrepository")
origin = repo.remotes.origin
if origin:
self.logger.debug("Fetching from origin")
origin.fetch()
else:
self.logger.error("Origin not found, can't fetch")
def pullRepo(self, path):
"""
Pull changes from ogrepository
This unconditionally overwrites remote changes. There is no conflict resolution.
"""
repo = git.Repo(path)
self.logger.debug("Downloading from ogrepository")
repo.git.fetch()
repo.head.reset(index=True, working_tree=True)
# Restaurar cosas modificadas para git
self._restore_metadata(path, destructive_only=True)
if __name__ == '__main__':
# python no cree que nuestra consola usa utf-8.
# esto arregla las tildes y las eñes
sys.stdout.reconfigure(encoding='utf-8')
opengnsys_log_dir = "/opt/opengnsys/log"
logger = logging.getLogger(__package__)
logger.setLevel(logging.DEBUG)
streamLog = logging.StreamHandler()
streamLog.setLevel(logging.INFO)
if not os.path.exists(opengnsys_log_dir):
os.mkdir(opengnsys_log_dir)
logFilePath = f"{opengnsys_log_dir}/gitlib.log"
fileLog = logging.FileHandler(logFilePath)
fileLog.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)24s - [%(levelname)5s] - %(message)s')
streamLog.setFormatter(formatter)
fileLog.setFormatter(formatter)
logger.addHandler(streamLog)
logger.addHandler(fileLog)
logger.info("Program start, logging details to %s", logFilePath)
parser = argparse.ArgumentParser(
prog="OpenGnsys Git Library",
description="Funciones de Git",
)
#parser.add_argument("--init-repo", type=str, metavar='DIR', help="Inicializar repositorio desde DIR")
parser.add_argument("--init-repo-from", type=str, metavar='DEV', help="Inicializar repositorio desde DEV")
parser.add_argument("--clone-repo-to", type=str, metavar='DEV', help="Clonar repositorio a DIR. Elimina todos los datos en ruta destino!")
parser.add_argument("--repo", type=str, help="Repositorio en ogrepository (linux, windows, mac)")
parser.add_argument("--boot-device", type=str, help="Dispositivo de arranque")
parser.add_argument("--commit", type=str, metavar='DEV', help="Commit de cambios en el directorio")
parser.add_argument("--restore", type=str, metavar='DEV', help="Eliminar cambios en el directorio")
parser.add_argument("--push", type=str, metavar='DEV', help="Subir cambios a ogrepository")
parser.add_argument("--pull", type=str, metavar='DEV', help="Bajar cambios de ogrepository")
parser.add_argument("--fetch", type=str, metavar='DEV', help="Fetch changes from ogrepository")
parser.add_argument("--efi-config", type=str, metavar="NAME", help="Name of the custom EFI configuration to deploy")
parser.add_argument("--ntfs-type", type=str, metavar="FS", help="Tipo de NTFS, 'kernel' o 'fuse'")
parser.add_argument("--test-create-metadata", type=str, metavar="DIR", help="Test metadata generation")
parser.add_argument("--test-restore-metadata", type=str, metavar="DIR", help="Test metadata restoration")
parser.add_argument("--test-restore-metadata-destructive", type=str, metavar="DIR", help="Test metadata restoration, destructive parts only")
parser.add_argument("--test-clone-metadata", type=str, metavar="REPO", help="Test metadata cloning")
parser.add_argument("--test-efi-install", type=str, metavar="DIR", help = "Install EFI")
parser.add_argument("-m", "--message", type=str, metavar="MSG", help="Commit message")
parser.add_argument("--test-set-ntfsid", type=str, metavar="ID", help="Set NTFS ID")
parser.add_argument("--test-restore-secaudit",type=str, metavar="DIR", help="Test restoring NTFS secaudit")
parser.add_argument("--test-get-part-uuid", type=str, metavar="PART", help="Get partition UUID")
parser.add_argument("--device", type=str, metavar="DEV", help="Device to set the UUID on")
parser.add_argument("-v", "--verbose", action="store_true", help = "Verbose console output")
args = parser.parse_args()
if args.verbose:
streamLog.setLevel(logging.DEBUG)
logger.debug("Starting")
ntfs_impl = NTFSImplementation.NTFS3G
if not args.ntfs_type is None:
if args.ntfs_type == "kernel":
ntfs_impl = NTFSImplementation.KERNEL
elif args.ntfs_type == "fuse":
ntfs_impl = NTFSImplementation.NTFS3G
else:
raise ValueError(f"Unknown NTFS implementation: {args.ntfs_type}")
og_git = OpengnsysGitLibrary(ntfs_implementation = ntfs_impl)
# og_git._runBashFunction("ogMountCache", [])
# if args.init_repo:
# #og_git.initRepo("/mnt/sda1", "linux")
# with OperationTimer(og_git, "git init"):
# og_git.initRepo(args.init_repo, args.repo)
if args.init_repo_from:
with OperationTimer(og_git, "git init"):
og_git.initRepo(args.init_repo_from, args.repo)
elif args.clone_repo_to:
#og_git.cloneRepo("linux", "/opt/opengnsys/cache/cloned")
with OperationTimer(og_git, "git clone"):
og_git.cloneRepo(args.repo, args.clone_repo_to, args.boot_device)
#og_git._restore_metadata("/opt/opengnsys/cache/cloned")
#og_git._restore_metadata(args.clone_repo_to)
elif args.commit:
with OperationTimer(og_git, "git commit"):
og_git.commit(device = args.commit, message = args.message)
elif args.restore:
with OperationTimer(og_git, "git restore"):
og_git.restoreRepo(args.restore)
elif args.push:
with OperationTimer(og_git, "git push"):
og_git.push(device = args.push)
elif args.fetch:
with OperationTimer(og_git, "git fetch"):
og_git.fetch(device = args.fetch)
elif args.pull:
with OperationTimer(og_git, "git pull"):
og_git.pullRepo(args.pull)
elif args.test_create_metadata:
og_git._create_metadata(args.test_create_metadata, initial_creation=False) # pylint: disable=protected-access
elif args.test_restore_metadata:
og_git._restore_metadata(args.test_restore_metadata, set_device_uuids=True) # pylint: disable=protected-access
elif args.test_restore_metadata_destructive:
og_git._restore_metadata(path = args.test_restore_metadata_destructive, destructive_only=True) # pylint: disable=protected-access
elif args.test_clone_metadata:
og_git._get_repo_metadata(args.test_clone_metadata) # pylint: disable=protected-access
elif args.test_set_ntfsid:
ntfs = NTFSLibrary(ntfs_impl)
ntfs.modify_uuid(args.device, args.test_set_ntfsid)
elif args.test_efi_install:
og_git._efi_install(root_directory=args.test_efi_install, config_name = args.efi_config) # pylint: disable=protected-access
elif args.test_restore_secaudit:
og_git._ntfs_restore_secaudit(args.test_restore_secaudit) # pylint: disable=protected-access
else:
print("Please specify an action.")
parser.print_help()
sys.exit(1)
#
# Make sure all filesystem changes are written, just in case the oglive is rebooted without an unmount
os.sync()