oggit/gitlib/gitlib.py

1440 lines
54 KiB
Python

#!/usr/bin/env python3
### NOTES:
# Install:
# python3-git
# python3.8
# Must have working locales, or unicode strings will fail. Install 'locales', configure /etc/locale.gen, run locale-gen.
#
import os
import shutil
import git
import argparse
import tempfile
import shutil
import logging
import subprocess
import libarchive
import pwd
import grp
import datetime
import json
import sys
from pathlib import Path
import xattr
import posix1e
import base64
import blkid
import stat
import time
from enum import Enum
class OgProgressPrinter(git.RemoteProgress):
    """Progress handler that forwards git remote progress updates to a logger."""

    def __init__(self, logger):
        """Store the logger that receives the progress messages.

        Args:
            logger: Object exposing a ``debug(message)`` method.
        """
        # The constructor was previously misspelled ``__init`` and therefore
        # never ran, leaving ``self.logger`` undefined when ``update`` was called.
        super().__init__()
        self.logger = logger

    def update(self, op_code, cur_count, max_count=None, message=""):
        """Log a single progress update at debug level.

        Args:
            op_code: Operation code passed in by the progress machinery.
            cur_count: Current progress counter.
            max_count: Maximum counter value, or None.
            message (str): Optional message accompanying the update.
        """
        self.logger.debug(f"Progress: {op_code} {cur_count}/{max_count}: {message}")
class OperationTimer:
    """Context manager that logs how long the wrapped operation took."""

    def __init__(self, parent, operation_name):
        """Remember who to log through and what the operation is called.

        Args:
            parent: Object with a ``logger`` attribute exposing ``info(message)``.
            operation_name (str): Name used in the log message.
        """
        self.operation_name = operation_name
        self.parent = parent
        self.start = time.time()

    def __enter__(self):
        """Restart the timer and return the timer itself.

        Returning ``self`` lets callers write ``with OperationTimer(...) as t``.
        """
        self.start = time.time()
        return self

    def __exit__(self, *args):
        """Log the elapsed time, rounded to milliseconds, through the parent logger."""
        elapsed = round(time.time() - self.start, 3)
        self.parent.logger.info(f"{self.operation_name} took {elapsed}s")
class RequirementException(Exception):
    """Raised when a requirement needed by the library is missing.

    Duplicated from git_installer.

    Attributes:
        message (str): Error message shown to the user.
    """

    def __init__(self, message):
        """Create the exception.

        Args:
            message (str): Error message shown to the user.
        """
        Exception.__init__(self, message)
        self.message = message
class NTFSImplementation(Enum):
    """Selects which NTFS driver is used when mounting a filesystem."""
    # Mounted via /usr/bin/mount with filesystem type "ntfs3".
    Kernel = 1
    # Mounted via the /usr/bin/ntfs-3g helper.
    NTFS3G = 2
class NTFSLibrary:
    """Helper for creating and mounting NTFS filesystems and changing their UUID."""

    # The NTFS volume serial number (reported by blkid as the UUID) lives in
    # the boot sector at byte offset 0x48 and is 8 bytes long, little-endian.
    NTFS_UUID_OFFSET = 0x48
    NTFS_UUID_LENGTH = 8

    def __init__(self, implementation):
        """Create the helper.

        Args:
            implementation (NTFSImplementation): Driver used by ``mount_filesystem``.
        """
        self.logger = logging.getLogger("NTFSLibrary")
        self.logger.setLevel(logging.DEBUG)
        self.implementation = implementation
        self.logger.debug("Initializing")

    def create_filesystem(self, device, label):
        """Create an NTFS filesystem with a quick format.

        Args:
            device (str): Target block device.
            label (str): Volume label.

        Raises:
            subprocess.CalledProcessError: If mkntfs exits with an error.
        """
        self.logger.info(f"Creating NTFS in {device} with label {label}")
        # check=True so a failed format is reported instead of silently ignored.
        subprocess.run(["/usr/sbin/mkntfs", device, "-Q", "-L", label], check=True)

    def mount_filesystem(self, device, mountpoint):
        """Mount an NTFS filesystem using the configured implementation.

        Args:
            device (str): Block device to mount.
            mountpoint (str): Directory to mount it on.

        Raises:
            ValueError: If the configured implementation is not known.
            subprocess.CalledProcessError: If the mount command fails.
        """
        self.logger.info(f"Mounting {device} in {mountpoint} using implementation {self.implementation}")
        if self.implementation == NTFSImplementation.Kernel:
            subprocess.run(["/usr/bin/mount", "-t", "ntfs3", device, mountpoint], check=True)
        elif self.implementation == NTFSImplementation.NTFS3G:
            subprocess.run(["/usr/bin/ntfs-3g", device, mountpoint], check=True)
        else:
            raise ValueError(f"Unknown NTFS implementation: {self.implementation}")

    def _hex_to_bin(self, hex_str):
        """Convert up to 16 hex digits into a 64-character string of bits.

        Raises:
            ValueError: If more than 16 digits are given or they are not hex.
        """
        if len(hex_str) > 16:
            # Padding can only lengthen the string; longer input used to loop forever.
            raise ValueError(f"Hex string too long for a 64-bit value: {hex_str}")
        return bin(int(hex_str.zfill(16), 16))[2:].zfill(64)

    def _bin_to_hex(self, binary_data):
        """Convert a string of bits into a zero-padded lowercase hex string."""
        hex_str = hex(int(binary_data, 2))[2:]
        # One hex digit for every four bits, rounded up.
        return hex_str.zfill((len(binary_data) + 3) // 4)

    def modify_uuid(self, device, uuid):
        """Change the UUID (volume serial number) of an NTFS filesystem.

        Args:
            device (str): Device or image file containing the filesystem.
            uuid (str): New UUID as up to 16 hex digits, in the same digit
                order that blkid prints it.

        Raises:
            ValueError: If ``uuid`` is longer than 16 digits or is not valid hex.
        """
        if len(uuid) > 2 * self.NTFS_UUID_LENGTH:
            raise ValueError(f"NTFS UUID must be at most 16 hex digits: {uuid}")

        # The value is stored little-endian, so the on-disk bytes are the
        # reverse of the printed hex representation.
        raw_uuid = bytes.fromhex(uuid.zfill(2 * self.NTFS_UUID_LENGTH))[::-1]

        self.logger.info(f"Changing UUID on {device} to {uuid}")
        with open(device, 'r+b') as ntfs_dev:
            ntfs_dev.seek(self.NTFS_UUID_OFFSET)
            prev_uuid = ntfs_dev.read(self.NTFS_UUID_LENGTH)
            self.logger.debug(f"Previous UUID: {prev_uuid[::-1].hex().upper()}")

            ntfs_dev.seek(self.NTFS_UUID_OFFSET)
            ntfs_dev.write(raw_uuid)
class OpengnsysGitLibrary:
"""Libreria de git"""
def __init__(self, require_cache = True, ntfs_implementation = NTFSImplementation.Kernel):
    """Set up logging, mount information, repository settings and ignore lists.

    Args:
        require_cache (bool): When True, fail if the cache partition could
            not be mounted.
        ntfs_implementation (NTFSImplementation): NTFS driver stored for
            later NTFS operations.

    Raises:
        RequirementException: If ``require_cache`` is set and the
            ``ogMountCache`` bash function produced no output.
        KeyError: If the kernel command line has no ``ogrepo`` parameter.
    """
    self.logger = logging.getLogger("OpengnsysGitLibrary")
    self.logger.setLevel(logging.DEBUG)
    self.logger.debug(f"Initializing. Cache = {require_cache}, ntfs = {ntfs_implementation}")

    self.mounts = self._parse_mounts()
    self.repo_user = "opengnsys"
    self.repo_image_path = "/opt/opengnsys/images"
    self.ntfs_implementation = ntfs_implementation

    self.cache_dir = self._runBashFunction("ogMountCache", [])

    # Without a cache, the .git directory is created directly in the filesystem.
    if (not self.cache_dir) and require_cache:
        raise RequirementException("Failed to mount cache partition. Cache partition may be missing.")

    self.default_ignore_list = [
        '/proc/*',                  # virtual filesystem
        '!/proc/.opengnsys-keep',
        '/sys/*',                   # virtual filesystem
        '!/sys/.opengnsys-keep',
        '/dev/*',                   # devices -- not supported by git
        '!/dev/.opengnsys-keep',
        '/run/*',                   # temporary information
        '!/run/.opengnsys-keep',
        '/var/run/*',               # temporary information
        '!/var/run/.opengnsys-keep',
        '/tmp/*',                   # temporary files
        '!/tmp/.opengnsys-keep',
        '/var/tmp/*',               # temporary files
        '!/var/tmp/.opengnsys-keep',
        '/mnt/*',                   # other filesystems
        '!/mnt/.opengnsys-keep',
        "/lost+found",              # special directory, re-created when needed
        "/$Recycle.Bin",
        "/$WinREAgent",
        # The comma after this entry matters: without it Python joins it with
        # the next literal into '/PerfLogs/DumpStack.log.tmp'.
        '/PerfLogs',
        "/DumpStack.log.tmp",
        "/pagefile.sys",
        "/swapfile.sys",
        "/Recovery",
        "/System Volume Information",
    ]

    self.fully_ignored_dirs = [
        'proc',
        'sys',
        'dev',
        'run',
        'var/run',
        'tmp',
        'var/tmp',
        'mnt',
        '$Recycle.Bin',
        '$WinREAgent',
        'PerfLogs',
        'Recovery',
        'System Volume Information',
    ]

    self.kernel_args = self._parse_kernel_cmdline()
    self.repo_server = self.kernel_args["ogrepo"]

    # Add any untracked files the code might have missed.
    # This is a workaround for a bug and it comes with a significant
    # performance penalty.
    self.debug_check_for_untracked_files = True

    self.logger.debug(f"Git repository: {self.repo_server}")
def _is_efi(self):
"""Determina si hemos arrancado con EFI
Returns:
Bool: Si hemos arrancado con EFI
"""
return os.path.exists("/sys/firmware/efi")
def _parse_mounts(self):
filesystems = {}
self.logger.debug("Parsing /proc/mounts")
with open("/proc/mounts", 'r') as mounts:
for line in mounts:
parts = line.split()
data = {}
data['device'] = parts[0]
data['mountpoint'] = parts[1]
data['type'] = parts[2]
data['options'] = parts[3]
data['dump_freq'] = parts[4]
data['passno'] = parts[5]
filesystems[data["mountpoint"]] = data
filesystems[data["mountpoint"] + "/"] = data
return filesystems
def _find_mountpoint(self, device):
norm = os.path.normpath(device)
self.logger.debug(f"Checking if {device} is mounted")
for mountpoint, mount in self.mounts.items():
#self.logger.debug(f"Item: {mount}")
#self.logger.debug(f"Checking: " + mount['device'])
if mount['device'] == norm:
return mountpoint
return None
def _is_device_mounted(self, device):
return not self._find_mountpoint(device) is None
def _unmount_device(self, device):
mountpoint = self._find_mountpoint(device)
if not mountpoint is None:
self.logger.debug(f"Unmounting {mountpoint}")
subprocess.run(["/usr/bin/umount", mountpoint], check=True)
else:
self.logger.debug(f"{device} is not mounted")
def _rmmod(self, module):
    """Try to unload a kernel module; a failing rmmod exit code is ignored.

    Args:
        module (str): Module name.
    """
    # The message was missing its f-prefix and logged the literal "{module}".
    self.logger.debug(f"Trying to unload module {module}...")
    # TODO: modprobe not working on oglive
    subprocess.run(["/usr/sbin/rmmod", module], check=False)
def _modprobe(self, module):
    """Try to load a kernel module; a failing modprobe exit code is ignored.

    Args:
        module (str): Module name.
    """
    # The message was missing its f-prefix and logged the literal "{module}".
    self.logger.debug(f"Trying to load module {module}...")
    # TODO: modprobe not working on oglive
    subprocess.run(["/usr/sbin/modprobe", module], check=False)
def _mount(self, device, mountpoint, filesystem):
    """Mount a device on a mountpoint, optionally forcing the filesystem type.

    NOTE(review): a second ``_mount(self, device, directory)`` is defined
    later in this class; being defined later it replaces this one. Confirm
    which variant callers expect.

    Args:
        device (str): Device to mount.
        mountpoint (str): Directory to mount it on.
        filesystem (str or None): Filesystem type passed with ``-t``, or
            None to let mount detect it.

    Raises:
        subprocess.CalledProcessError: If mount fails.
    """
    self.logger.debug(f"Mounting {device} at {mountpoint}")

    mount_cmd = ["/usr/bin/mount"]
    if filesystem is not None:
        mount_cmd += ["-t", filesystem]

    # mount expects the source device followed by the target directory;
    # the device used to be left out of the command.
    mount_cmd += [device, mountpoint]

    self.logger.debug(f"Mount command: {mount_cmd}")
    subprocess.run(mount_cmd, check=True)
def _ntfsfix(self, device):
    """Run ``ntfsfix -d`` on a device.

    Args:
        device (str): Device holding the NTFS filesystem.

    Raises:
        subprocess.CalledProcessError: If ntfsfix fails.
    """
    self.logger.debug(f"Running ntfsfix on {device}")
    command = ["/usr/bin/ntfsfix", "-d", device]
    subprocess.run(command, check=True)
def _filesystem_type(self, device):
    """Probe a device with blkid and return its filesystem type.

    Args:
        device (str): Device to probe.

    Returns:
        str: The probe's ``TYPE`` value decoded as UTF-8.
    """
    self.logger.debug(f"Probing {device}")

    superblock_flags = (
        blkid.SUBLKS_TYPE
        | blkid.SUBLKS_USAGE
        | blkid.SUBLKS_UUID
        | blkid.SUBLKS_UUIDRAW
        | blkid.SUBLKS_LABELRAW
    )

    probe = blkid.Probe()
    probe.set_device(device)
    probe.enable_superblocks(True)
    probe.set_superblocks_flags(superblock_flags)
    probe.do_safeprobe()

    detected = probe["TYPE"].decode('utf-8')
    self.logger.debug(f"FS type is {detected}")
    return detected
def _write_ignore_list(self, base_path):
ignore_file = base_path + "/.gitignore"
self.logger.debug("Creating ignore list: " + ignore_file)
with open(ignore_file, 'w') as f:
f.write("\n".join(self.default_ignore_list))
f.write("\n")
def _parse_kernel_cmdline(self):
"""Obtener parámetros de configuración de la linea de comandos del kernel
Opengnsys nos pasa parametros por linea de comando del kernel, por ejemplo:
[...] group=Aula_virtual ogrepo=192.168.2.1 oglive=192.168.2.1 [...]
Returns:
dict: Diccionario de clave/valor de parámetros
"""
params = {}
self.logger.debug("Parsing kernel parameters")
with open("/proc/cmdline") as cmdline:
line = cmdline.readline()
parts = line.split()
for part in parts:
if "=" in part:
key, value = part.split("=")
params[key] = value
self.logger.debug("%i parameters found" % len(params))
return params
def _is_filesystem(self, path):
with open('/proc/mounts', 'r') as mounts:
for mount in mounts:
parts = mount.split()
self.logger.debug(f"Parts: {parts}")
mountpoint = parts[1]
if os.path.normpath(mountpoint) == os.path.normpath(path):
self.logger.debug(f"Directory {path} is a filesystem root")
return True
self.logger.debug(f"Directory {path} is not a filesystem root")
return False
def _mklostandfound(self, path):
"""Recrear el lost+found si es necesario.
Cuando clonamos en el raíz de un sistema de archivos, al limpiar los contenidos,
eliminamos el lost+found. Este es un directorio especial que requiere el uso de
una herramienta para recrearlo.
Puede fallar en caso de que el sistema de archivos no lo necesite.
"""
if self._is_filesystem(path):
curdir = os.getcwd()
result = None
try:
self.logger.debug(f"Re-creating lost+found in {path}")
os.chdir(path)
result = subprocess.run(["/usr/sbin/mklost+found"], check=True, capture_output=True)
except Exception as e:
self.logger.warning(f"Error running mklost+found: {e}")
if result:
self.logger.debug(f"retorno: {result.returncode}")
self.logger.debug(f"stdout: {result.stdout}")
self.logger.debug(f"stderr: {result.stderr}")
os.chdir(curdir)
def _ntfs_secaudit(self, data):
    """Save NTFS security metadata for a device using ntfssecaudit.

    The filesystem is unmounted for the audit and remounted afterwards,
    also when the audit fails. The output is written to
    ``ntfs_secaudit.txt`` in the metadata directory.

    Args:
        data (dict): Needs the keys 'device', 'mountpoint', 'mount_fs'
            (filesystem type of the original mount) and 'metadata_dir'.

    Raises:
        subprocess.CalledProcessError: If umount, ntfssecaudit or mount fail.
    """
    self.logger.debug(f"Saving NTFS metadata for {data['device']}")
    metadata_file = os.path.join(data["metadata_dir"], "ntfs_secaudit.txt")

    self.logger.debug(f"Unmounting {data['mountpoint']}...")
    subprocess.run(["/usr/bin/umount", data["mountpoint"]], check=True)

    try:
        result = subprocess.run(["/usr/bin/ntfssecaudit", "-b", data["device"]], check=True, capture_output=True)
    finally:
        # Remount even if the audit failed, so the filesystem is not left
        # unmounted behind the caller's back.
        self.logger.debug(f"Remounting {data['device']} on {data['mountpoint']}...")
        if data["mount_fs"] == "fuseblk":
            self.logger.debug("Mount was FUSE")
            subprocess.run(["/usr/bin/mount", data["device"], data["mountpoint"]], check=True)
        else:
            self.logger.debug(f"Mount was {data['mount_fs']}")
            subprocess.run(["/usr/bin/mount", data["device"], "-t", data["mount_fs"], data["mountpoint"]], check=True)

    self.logger.debug("Writing NTFS audit metadata...")
    # Write to a temporary name first and rename it over the final file.
    with open(metadata_file + ".new", "w", encoding='utf-8') as meta:
        meta.write(result.stdout.decode('utf-8'))

    os.rename(metadata_file + ".new", metadata_file)
def _create_filesystems(self, fs_data, fs_map):
for mountpoint in fs_map:
dest_device = fs_map[mountpoint]
data = fs_data[mountpoint]
fs_type = data["type"]
fs_uuid = data["uuid"]
self.logger.debug(f"Creating filesystem {fs_type} with UUID {fs_uuid} in {dest_device}")
if fs_type == "ntfs" or fs_type == "ntfs3":
self.logger.debug("Creating NTFS filesystem")
ntfs = NTFSLibrary(self.ntfs_implementation)
ntfs.create_filesystem(dest_device, "NTFS")
#ntfs.modify_uuid(dest_device, fs_uuid)
else:
command = [f"/usr/sbin/mkfs.{fs_type}"]
command_args = []
if fs_type == "ext4" or fs_type == "ext3":
command_args = ["-U", fs_uuid, "-F", dest_device]
elif fs_type == "xfs":
command_args = ["-m", f"uuid={fs_uuid}", "-f", dest_device]
elif fs_type == "btrfs":
command_args = ["-U", fs_uuid, "-f", dest_device]
else:
raise RuntimeError(f"Don't know how to create filesystem of type {fs_type}")
command = command + command_args
self.logger.debug(f"Creating Linux filesystem of type {fs_type} on {dest_device}, command {command}")
result = subprocess.run(command, check = True, capture_output=True)
self.logger.debug(f"retorno: {result.returncode}")
self.logger.debug(f"stdout: {result.stdout}")
self.logger.debug(f"stderr: {result.stderr}")
def _mount(self, device, directory, filesystem=None):
    """Mount a device on a directory, creating the directory if needed.

    An earlier ``_mount(self, device, mountpoint, filesystem)`` in this class
    is replaced by this later definition. The optional ``filesystem``
    argument keeps three-argument calls working instead of raising TypeError.

    Args:
        device (str): Device to mount.
        directory (str): Mountpoint; created with ``os.mkdir`` if missing.
        filesystem (str or None): Filesystem type passed with ``-t``, or
            None to let mount detect it.

    Raises:
        subprocess.CalledProcessError: If mount fails.
    """
    self.logger.debug(f"Mounting {device} in {directory}")

    if not os.path.exists(directory):
        os.mkdir(directory)

    command = ["/usr/bin/mount"]
    if filesystem is not None:
        command += ["-t", filesystem]
    command += [device, directory]

    result = subprocess.run(command, check = True, capture_output=True)
    self.logger.debug(f"retorno: {result.returncode}")
    self.logger.debug(f"stdout: {result.stdout}")
    self.logger.debug(f"stderr: {result.stderr}")
def _grub_install(self, boot_device, root_directory):
    """Install GRUB.

    When grub2-install is present nothing is installed (not implemented);
    otherwise grub-install is run with ``--force``.

    Args:
        boot_device (str): Boot device.
        root_directory (str): Mountpoint of the root filesystem.

    Raises:
        RequirementException: If neither GRUB binary is present.
        subprocess.CalledProcessError: If grub-install fails.
    """
    grub2_binary = "/usr/sbin/grub2-install"
    grub1_binary = "/usr/sbin/grub-install"

    if os.path.exists(grub2_binary):
        self.logger.debug("Installing Grub 2.x (NOT IMPLEMENTED)")
        return

    if not os.path.exists(grub1_binary):
        raise RequirementException("Couldn't find /usr/sbin/grub2-install or /usr/sbin/grub-install")

    self.logger.debug("Installing Grub 1.x")
    command = [grub1_binary, "--force", "--root-directory", root_directory, boot_device]
    outcome = subprocess.run(command, check = True, capture_output=True)
    self.logger.debug(f"retorno: {outcome.returncode}")
    self.logger.debug(f"stdout: {outcome.stdout}")
    self.logger.debug(f"stderr: {outcome.stderr}")
def _efi_install(self, boot_device, root_directory):
"""Instalar EFI"""
self.logger.info(f"Instalando datos EFI en {boot_device}")
meta_dir = os.path.join(root_directory, ".opengnsys-metadata")
efi_files_dir = os.path.join(meta_dir, "efi_data")
shutil.copytree(efi_files_dir, boot_device)
def _find_boot_device(self):
    """Find the EFI system partition by inspecting every known block device.

    Device names come from /proc/partitions and each one is queried with
    ``sfdisk -J``. Entries without a readable partition table (for example
    partitions themselves) are skipped.

    Returns:
        str or None: Device node of the first partition whose type is the
        EFI system partition GUID, or None if none was found.
    """
    efi_partition_type = "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"
    disks = []

    self.logger.debug("Looking for EFI partition")
    with open("/proc/partitions", "r") as partitions_file:
        for line_num, line in enumerate(partitions_file):
            # The first two lines are skipped, as before (header and separator).
            if line_num < 2:
                continue
            fields = line.split()
            if len(fields) < 4:
                continue
            disk = fields[3]
            disks.append(disk)
            self.logger.debug(f"Disk: {disk}")

    for disk in disks:
        # /proc/partitions lists bare names; sfdisk needs the device node.
        device = os.path.join("/dev", disk)
        self.logger.debug(f"Loading partitions for disk {device}")

        sfdisk = subprocess.run(["/usr/sbin/sfdisk", "-J", device], check=False, capture_output=True)
        if sfdisk.returncode != 0:
            self.logger.debug(f"sfdisk could not read a partition table from {device}, skipping")
            continue

        try:
            disk_data = json.loads(sfdisk.stdout)
        except ValueError:
            self.logger.debug(f"sfdisk produced no usable JSON for {device}, skipping")
            continue

        for part in disk_data.get("partitiontable", {}).get("partitions", []):
            self.logger.debug(f"Checking partition {part}")
            if part.get("type", "").upper() == efi_partition_type:
                return part["node"]

    self.logger.warning("Failed to find EFI partition!")
    return None
def _delete_contents(self, path):
self.logger.info(f"Deleting contents of {path}")
for filename in os.listdir(path):
file_path = os.path.join(path, filename)
try:
self.logger.debug(f"Deleting {file_path}")
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
self.logger.warning('Failed to delete %s. Error: %s' % (file_path, e))
def _runBashFunction(self, function, arguments):
# Create a temporary file
self.logger.debug(f"Running bash function: {function} {arguments}")
with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file:
temp_file.write("#!/bin/bash\n")
temp_file.write("for lib in /opt/oglive/rootfs/opt/opengnsys/lib/engine/bin/*.lib ; do\n")
temp_file.write(" source $lib\n")
temp_file.write("done\n")
#temp_file.write("source /opt/oglive/rootfs/opt/opengnsys/lib/engine/bin/Cache.lib")
#temp_file.write("source /opt/oglive/rootfs/opt/opengnsys/lib/engine/bin/Git.lib")
temp_file.write(f"{function} \"$@\"\n")
# Make the temporary file executable
os.chmod(temp_file.name, 0o755)
self.logger.debug(f"File: {temp_file.name}")
# Run the temporary file
command = [temp_file.name] + arguments
self.logger.debug(f"Running: {command} {arguments}")
result = subprocess.run(command, shell=False, capture_output=True, text=True)
output = result.stdout.strip()
self.logger.debug(f"STDOUT: {output}")
self.logger.debug(f"STDERR: {result.stderr}")
# temp_file.delete()
return output
# os.system(temp_file.name)
def _getOgRepository(self, name):
return f"{self.repo_user}@{self.repo_server}:{self.repo_image_path}/{name}.git"
def _ogGetOsType(self):
return "Linux"
def _get_repo_metadata(self, repo):
    """Fetch metadata from a remote repository without cloning all of it.

    The filesystem metadata lives inside the repository itself, but it has
    to be acted on before the repository can be cloned. ``git archive
    --remote`` fetches only the ``.opengnsys-metadata`` directory.

    Args:
        repo (str): Repository name (linux, windows, mac).

    Returns:
        dict: File name -> parsed JSON contents, for filesystems.json and
        metadata.json when present in the archive.

    Raises:
        subprocess.CalledProcessError: If ``git archive`` fails.
    """
    results = {}
    repo_url = self._getOgRepository(repo)
    wanted_files = ("filesystems.json", "metadata.json")

    self.logger.debug(f"Cloning metadata for repository {repo_url}")

    result = subprocess.run(
        ["git", "archive", "--remote", repo_url, "HEAD:.opengnsys-metadata/"],
        capture_output=True,
        check=True,
    )
    tar_data = result.stdout

    with libarchive.memory_reader(tar_data) as metadata:
        self.logger.debug(f"Archive: {metadata}")

        for entry in metadata:
            self.logger.debug(entry)
            if entry.pathname not in wanted_files:
                continue

            self.logger.debug(f"Extracting {entry}")
            # Join the blocks in one pass instead of growing a buffer block by block.
            text = b"".join(entry.get_blocks()).decode('utf-8')
            results[entry.pathname] = json.loads(text)
            self.logger.debug(f"Contents: {text}")

    return results
def _create_metadata(self, path):
    """Compute and store the metadata of a filesystem tree.

    The whole tree is walked in order to:
    1. Find empty directories and fill them so that git keeps them.
    2. Collect all ACLs.
    3. Collect all extended attributes.
    4. Rename .gitignore files.
    5. Find mountpoints and collect information about them.
    6. Store additional metadata, such as the boot type.
    7. Run the NTFS secaudit, which happens last because the filesystem
       has to be unmounted for it.

    For empty directories a list is generated that can later be used to
    remove the .opengnsys-keep files, because an unexpected file can cause
    trouble: sshfs, for example, refuses by default to mount on a directory
    that contains files.

    .gitignore files in subdirectories are renamed because git would apply
    them to this process.

    All data is written as JSON so that spaces, line endings and other
    special characters are not a problem. It also gives one entry per line,
    which lets git compute the difference between two states.

    Special files (devices, sockets, FIFOs) are recorded and then deleted
    from the tree, because git fails when it finds them.

    Args:
        path (str): Base path of the filesystem.

    Returns:
        dict: ``{'symlinks': [...]}`` with the relative paths of the
        symlinks that were found.
    """
    self.logger.info(f"Creating metadata for {path}")

    return_data = {'symlinks': []}
    seen_roots = {}
    filesystems_data = {}
    ntfs_secaudit_list = []

    git_dir = os.path.normpath(os.path.join(path, ".git"))
    meta_dir = os.path.join(path, ".opengnsys-metadata")

    if not os.path.exists(meta_dir):
        os.mkdir(meta_dir, mode=0o700)

    efi_boot = self._is_efi()
    metadata = {"efi_boot": efi_boot}

    if efi_boot:
        self.logger.debug("Copying EFI data")
        efi_files_dir = os.path.join(meta_dir, "efi_data")
        if os.path.exists(efi_files_dir):
            shutil.rmtree(efi_files_dir)

        # The method has to be called; the bound method itself used to be
        # passed to copytree.
        boot_device = self._find_boot_device()
        if boot_device is None:
            self.logger.warning("No EFI boot device found, EFI data will not be copied")
        else:
            # NOTE(review): _find_boot_device returns a partition node while
            # copytree expects a directory -- confirm whether the partition
            # should be mounted first.
            shutil.copytree(boot_device, efi_files_dir)

    # Every output is written to "<name>.new" and renamed once complete.
    # https://jsonlines.org/
    output_names = [
        "empty_directories.jsonl",
        "special_files.jsonl",
        "acls.jsonl",
        "xattrs.jsonl",
        "gitignores.jsonl",
        "filesystems.json",
        "metadata.json",
    ]
    outputs = {}
    ntfs = False

    try:
        for name in output_names:
            outputs[name] = open(os.path.join(meta_dir, name + ".new"), "w")

        empties_file = outputs["empty_directories.jsonl"]
        specials_file = outputs["special_files.jsonl"]
        acls_file = outputs["acls.jsonl"]
        xattrs_file = outputs["xattrs.jsonl"]
        gitignores_file = outputs["gitignores.jsonl"]
        filesystems_file = outputs["filesystems.json"]
        metadata_file = outputs["metadata.json"]

        for root, subdirs, files in os.walk(path):
            root_norm = os.path.normpath(root)
            root_rel = root[len(path):None]

            # Match the .git directory itself or anything below it, but not
            # siblings that merely share the prefix (for example .github).
            if root_norm == git_dir or root_norm.startswith(git_dir + os.sep):
                self.logger.debug(f"Ignoring git directory: {root_norm}")
                # The root .git is not examined; stop descending into it.
                subdirs[:] = []
                continue

            if root not in seen_roots:
                seen_roots[root] = 1

                if root in self.mounts:
                    mount = self.mounts[root]

                    root_path_rel = root[len(path):None]
                    if len(root_path_rel) == 0 or root_path_rel[0] != "/":
                        root_path_rel = "/" + root_path_rel

                    pr = blkid.Probe()
                    pr.set_device(mount['device'])
                    pr.enable_superblocks(True)
                    pr.set_superblocks_flags(blkid.SUBLKS_TYPE | blkid.SUBLKS_USAGE | blkid.SUBLKS_UUID | blkid.SUBLKS_UUIDRAW | blkid.SUBLKS_LABELRAW)
                    pr.do_safeprobe()

                    fs = mount['type']
                    orig_fs = fs

                    if mount['type'] == 'fuseblk':
                        fs = pr["TYPE"].decode('utf-8')
                        self.logger.warning(f"FUSE mount detected, replacing type with blkid detected type: {fs}")
                        self.logger.warning("FUSE has lower performance, a native mount is recommended.")

                    filesystems_data[root_path_rel] = {}
                    filesystems_data[root_path_rel]['type'] = fs
                    filesystems_data[root_path_rel]['size'] = pr.size
                    filesystems_data[root_path_rel]['sectors'] = pr.sectors
                    filesystems_data[root_path_rel]['sector_size'] = pr.sector_size
                    filesystems_data[root_path_rel]['uuid'] = str(pr["UUID"], 'utf-8')
                    filesystems_data[root_path_rel]['uuid_raw'] = str(base64.b64encode(pr["UUID_RAW"]), 'utf-8')
                    # TODO: this does not work for now -- LABEL is not found
                    #filesystems_data[root_path_rel]['label'] = pr["LABEL"]
                    #filesystems_data[root_path_rel]['label_raw'] = pr["LABEL_RAW"]

                    self.logger.debug("Filesystem: {0}, relative {1}. Type {2}, size {3}, UUID {4}".format(root, root_path_rel, mount['type'], pr.size, str(pr["UUID"], 'utf-8')))

                    if fs == 'ntfs' or fs == 'ntfs3':
                        self.logger.info("NTFS detected, will do a secaudit")
                        ntfs_secaudit_list.append({
                            'device' : mount['device'],
                            'mountpoint' : root,
                            'relative_path' : root_path_rel,
                            'mount_fs' : orig_fs,
                            'metadata_dir' : meta_dir
                        })
                        ntfs = True

            if len(files) == 1 and files[0] == ".opengnsys-keep":
                # An existing .opengnsys-keep is ignored
                files.pop(0)

            # All content of directories such as /dev is ignored, but the
            # directory itself must exist. fully_ignored_dirs holds paths
            # without a leading slash, so strip it before comparing.
            if (len(subdirs) == 0 and len(files) == 0) or (root_rel.lstrip("/") in self.fully_ignored_dirs):
                keep_file = os.path.join(root, ".opengnsys-keep")
                Path(keep_file).touch(mode=0o644, exist_ok = True)

                self.logger.debug(f"Empty directory: {root}")
                empties_file.write(json.dumps({"dir" : root_rel}) + "\n")

            for file in files:
                full_path = os.path.join(root, file)
                full_path_rel = full_path[len(path):None]

                # Relative path can't start with a /, git will take it as an
                # absolute path pointing to /, and not a file within the repo.
                while full_path_rel[0] == '/':
                    full_path_rel = full_path_rel[1:None]

                if not ntfs and os.path.isfile(full_path) and not os.path.islink(full_path):
                    # docs: https://pylibacl.k1024.org/module.html#posix1e.ACL.to_any_text
                    xattrs = str(xattr.get_all(full_path))
                    acls = posix1e.ACL(file=full_path)

                    xattrs_json = json.dumps({"file": full_path_rel, "xattrs" : xattrs})

                    # __getstate__ exports the whole state of the ACL
                    # TODO: possibly a text format would be better?
                    acl_data = str(base64.b64encode(acls.__getstate__()), 'utf-8')
                    acls_json = json.dumps({"file": full_path_rel, "acl" : acl_data })

                    xattrs_file.write(xattrs_json + "\n")
                    acls_file.write(acls_json + "\n")

                if os.path.isfile(full_path) and file == ".gitignore" and root != path:
                    # TODO: take already renamed files into account
                    # This used to call an undefined global ``logger``.
                    self.logger.debug(f"Found .gitignore: {full_path}")
                    renamed_file_path = full_path + "-opengnsys-renamed"
                    gitignores_json = json.dumps({"file": full_path_rel})
                    gitignores_file.write(gitignores_json + "\n")
                    os.rename(full_path, renamed_file_path)

                if os.path.exists(full_path):
                    if not os.path.islink(full_path):
                        stat_data = os.stat(full_path)
                        stat_mode = stat_data.st_mode

                        if not (stat.S_ISDIR(stat_mode) or stat.S_ISREG(stat_mode) or stat.S_ISLNK(stat_mode)):
                            # Neither a directory nor a file: keep the data needed to re-create it
                            stat_json_data = {
                                "file" : full_path_rel,
                                "mode" : stat_mode,
                                "uid" : stat_data.st_uid,
                                "gid" : stat_data.st_gid,
                                "rdev" : stat_data.st_rdev
                            }

                            stat_json = json.dumps(stat_json_data)
                            specials_file.write(stat_json + "\n")

                            # Git fails on special files such as devices. For
                            # now they have to be deleted.
                            os.unlink(full_path)

                    # NOTE(review): the nesting of this check is ambiguous in
                    # the source; confirm whether dangling symlinks should
                    # also be reported.
                    if os.path.islink(full_path):
                        self.logger.debug(f"Symlink: {full_path_rel}")
                        return_data['symlinks'].append(full_path_rel)

        self.logger.debug("Finishing...")
        filesystems_file.write(json.dumps(filesystems_data, indent=4) + "\n")
        metadata_file.write(json.dumps(metadata, indent=4) + "\n")
    finally:
        # Close every output, also when the walk failed half way.
        for handle in outputs.values():
            handle.close()

    for name in output_names:
        os.rename(os.path.join(meta_dir, name + ".new"), os.path.join(meta_dir, name))

    self.logger.debug("Processing pending NTFS secaudits...")
    for audit in ntfs_secaudit_list:
        self._ntfs_secaudit(audit)

    self.logger.debug("Metadata updated")
    return return_data
def _restore_metadata(self, path, destructive_only=False):
"""Restrauracion de metadatos creados por _createmetadata
Args:
path (str): Ruta destino
destructive_only (bool): Solo restaurar lo que se modifique durante un commit
Notes:
El git no maneja archivos de tipo dispositivo o socket correctamente. Por tanto,
debemos guardar datos sobre ellos antes del commit, y eliminarlos antes de que
git pueda verlos y confundirse.
destructive_only=True solo restaura este tipo de metadatos, los que modificamos
en el sistema de archivos real antes del commit. Esto se hace para dejar el
sistema de archivos en el mismo estado que tenia antes del commit.
"""
self.logger.debug("Initializing")
mounts = self._parse_mounts()
self.logger.debug(f"Restoring metadata in {path}")
meta_dir = os.path.join(path, ".opengnsys-metadata")
if not os.path.exists(meta_dir):
self.logger.error(f"Metadata directory not found: {meta_dir}")
return
if not destructive_only:
self.logger.debug("Processing empty_directories.jsonl")
with open(os.path.join(meta_dir, "empty_directories.jsonl"), "r") as empties_file:
for line in empties_file:
empties_data = json.loads(line)
empty_dir = empties_data['dir']
# os.path.join no acepta /foo como una ruta relativa para concatenar
if empty_dir.startswith("/"):
empty_dir = empty_dir[1:]
empty_dir_keep = os.path.join(path, empty_dir, ".opengnsys-keep")
self.logger.debug(f"Empty directory: {empty_dir}")
if os.path.exists(empty_dir_keep):
self.logger.debug(f"Deleting: {empty_dir_keep}")
os.unlink(empty_dir_keep)
if not destructive_only:
self.logger.debug("Processing acls.jsonl")
with open(os.path.join(meta_dir, "acls.jsonl"), "r") as acls_file:
for line in acls_file:
# docs: https://pylibacl.k1024.org/module.html#posix1e.ACL.to_any_text
acls_data = json.loads(line)
#self.logger.debug(f"Data: {acls_data}")
acl_file = acls_data['file']
acl_text = base64.b64decode(bytes(acls_data['acl'], 'utf-8'))
if acl_file.startswith("/"):
acl_file = acl_file[1:]
acl_file_path = os.path.join(path, acl_file)
#self.logger.debug(f"TXT: {acl_text}" )
acl = posix1e.ACL(data = acl_text)
#self.logger.debug(f"ACL: {acl_text}" )
self.logger.debug(f"Applying ACL to {acl_file_path}")
#acl.applyto(acl_file_path)
if not destructive_only:
self.logger.debug("Processing xattrs.jsonl")
with open(os.path.join(meta_dir, "xattrs.jsonl"), "r") as xattrs_file:
for line in xattrs_file:
xattrs_data = json.loads(line)
xattrs_file = xattrs_data['file']
if xattrs_file.startswith("/"):
xattrs_file = xattrs_file[1:]
xattrs_file_path = os.path.join(path, xattrs_file)
#self.logger.debug(f"Line: {line}")
self.logger.debug("Processing gitignores.jsonl")
with open(os.path.join(meta_dir, "gitignores.jsonl"), "r") as gitignores_file:
for line in gitignores_file:
#self.logger.debug(f"Line: {line}")
gitignores_data = json.loads(line)
gitignores_file = gitignores_data['file']
if gitignores_file.startswith("/"):
gitignores_file = gitignores_file[1:]
orig_file_path = os.path.join(path, gitignores_file)
renamed_file_path = orig_file_path + "-opengnsys-renamed"
#self.logger.debug(f"Checking: {renamed_file_path}")
if os.path.exists(renamed_file_path):
self.logger.debug(f"Renaming {renamed_file_path} => {orig_file_path}")
os.rename(renamed_file_path, orig_file_path)
self.logger.debug("Processing special_files.jsonl")
with open(os.path.join(meta_dir, "special_files.jsonl"), "r") as specials_file:
for line in specials_file:
#self.logger.debug(f"Line: {line}")
data = json.loads(line)
filename = data['file']
full_path = os.path.join(path, filename)
file_mode = data['mode']
try:
if stat.S_ISSOCK(file_mode):
self.logger.debug(f"Restoring socket {filename}")
os.mknod(full_path, mode = file_mode)
elif stat.S_ISFIFO(file_mode):
self.logger.debug(f"Restoring FIFO {filename}")
os.mknod(full_path, mode = file_mode)
elif stat.S_ISBLK(file_mode):
self.logger.debug(f"Restoring block device {filename}")
os.mknod(full_path, mode = file_mode, device = data['rdev'])
elif stat.S_ISCHR(file_mode):
self.logger.debug(f"Restoring character device {filename}")
os.mknod(full_path, mode = file_mode, device = data['rdev'])
else:
self.logger.warning(f"Unknown file type for {filename}: {file_mode}")
except FileExistsError as exists:
self.logger.debug(f"Exists: {full_path}")
os.chown(full_path, data['uid'], data['gid'])
self.logger.debug("Metadata restoration completed.")
def _configure_repo(self, repo):
"""
#ogGitConfig
#@brief Configura usuario y permisos de git.
#@return
"""
self.logger.debug(f"Configuring repository {repo}")
repo.config_writer().add_value("user", "name", "OpenGnsys").release()
repo.config_writer().add_value("user", "email", "OpenGnsys@opengnsys.com").release()
repo.config_writer().add_value("core", "filemode", "false").release()
def ogUuidUpdate(str_repo, str_path_image, int_ndisk, int_part):
    """
    Update the UUID of a filesystem in the configuration files that use it.

    Usage: ogUuidUpdate str_repo str_path_image int_ndisk int_part

    The UUID currently reported by ``blkid`` for the partition replaces the
    UUID recorded in the image information file (read with ``jq``) inside
    ``etc/fstab`` and the GRUB configuration files of the mounted partition,
    plus the ESP copy of ``grub.cfg`` when EFI is active. GRUB drive
    references of the form ``hdX,gptY`` / ``hdX,msdosY`` are replaced with the
    drive reported by ``grub-probe``.

    Args:
        str_repo: Image repository or local cache.
        str_path_image: Path of the image.
        int_ndisk: Order number of the disk.
        int_part: Partition number.

    Returns:
        Nothing on success (to be determined). On the "information file not
        found" path, whatever ``ogRaiseError`` returns.

    Errors reported through ``ogRaiseError``:
        OG_ERR_NOTFOUND: file or device not found.
        (The original documentation also lists OG_ERR_FORMAT, "incorrect
        format", but nothing in this body reports it.)
    """
    # Function-scope import: ``re`` is not among the imports at the top of the file.
    import re

    # NOTE(review): this function takes no ``self`` and relies on og* helpers
    # and constants (ogGetOsType, ogRaiseError, OG_ERR_NOTFOUND, OGBIN, ...)
    # that are not defined in the visible part of the file - confirm they exist.

    # Only Linux systems are handled.
    # NOTE(review): the OS type is queried with (str_repo, str_path_image);
    # presumably it should receive the disk and partition - verify.
    if ogGetOsType(str_repo, str_path_image) != "Linux":
        ogRaiseError(OG_ERR_NOTFOUND, "Linux system.")
        return

    # The image information file must exist.
    info_file = ogGetPath(str_repo, f".{str_path_image}.img.jsonl")
    if info_file == "":
        return ogRaiseError(OG_ERR_NOTFOUND, info_file)

    # The partition must exist (be mountable).
    mount_dir = ogMount(int_ndisk, int_part)
    if mount_dir is None:
        ogRaiseError(OG_ERR_NOTFOUND, f"Device {str_repo} {str_path_image}")
        return

    device = ogDiskToDev(int_ndisk, int_part)
    new_uuid = subprocess.check_output(["blkid", "-o", "value", "-s", "UUID", device]).decode().strip()
    old_uuid = subprocess.check_output(["jq", ".uuid", info_file]).decode().strip().replace('"', '')

    # On UEFI systems the copy of grub.cfg stored in the ESP is updated too.
    if ogIsEfiActive():
        grub_efi = ogMount(ogGetEsp())
        grub_efi = f"{grub_efi}/boot/grubMBR/boot/grub/grub.cfg"
    else:
        grub_efi = ""

    # Device in GRUB notation.
    grub_probe = os.environ.get("grub_probe", f"{OGBIN}/grub-probe1.99_{os.uname().machine}")
    grub_root = subprocess.check_output([grub_probe, "--device", device, "--target=drive"]).decode().strip().replace('()', "''")

    # Candidate files (fstab and GRUB). The shell brace patterns of the
    # original list ({,boot/}, {grubMBR,grubPARTITION}, grub{,2},
    # {menu.lst,grub.cfg}) are expanded explicitly here: Python performs no
    # brace expansion, so the unexpanded strings could never name a real file.
    candidates = [f"{mount_dir}/etc/fstab"]
    for prefix in ("", "boot/"):
        # NOTE(review): kept from the original list; joining the ESP path onto
        # the partition mountpoint looks unintended - confirm.
        candidates.append(f"{mount_dir}/{prefix}{grub_efi}")
        for loader in ("grubMBR", "grubPARTITION"):
            for grub_dir in ("grub", "grub2"):
                for cfg_name in ("menu.lst", "grub.cfg"):
                    candidates.append(f"{mount_dir}/{prefix}{loader}/boot/{grub_dir}/{cfg_name}")
    candidates.append(grub_efi)

    # jq prints "null" when the key is missing; replacing "" or "null" would
    # corrupt the files (e.g. "/dev/null" in fstab), so the UUID substitution
    # is skipped in those cases.
    replace_uuid = old_uuid not in ("", "null")

    # The same file may be reachable through more than one candidate (for
    # example when grub2 is a symlink to grub). It must be rewritten only
    # once, because the drive substitution is not idempotent.
    seen = set()
    for candidate in candidates:
        if not os.path.isfile(candidate):
            continue
        real_path = os.path.realpath(candidate)
        if real_path in seen:
            continue
        seen.add(real_path)

        with open(candidate, "r+") as file:
            content = file.read()
            if replace_uuid:
                content = content.replace(old_uuid, new_uuid)
            # The dots are single-character wildcards (disk and partition
            # numbers), so a regular expression is needed; a literal
            # str.replace never matches a real entry. A function is used as
            # the replacement so grub_root is inserted verbatim.
            # NOTE(review): the gpt pattern includes the surrounding single
            # quotes while the msdos pattern does not, as in the original.
            content = re.sub("'hd.,gpt.'", lambda _match: grub_root, content)
            content = re.sub("hd.,msdos.", lambda _match: grub_root, content)
            file.seek(0)
            file.write(content)
            file.truncate()
def initRepo(self, device, repo_name):
    """
    Create a git repository from the filesystem on a device and upload it.

    The device is mounted at ``/mnt/<basename of device>``. A repository is
    initialised there whose git directory lives in ``self.cache_dir``
    (``.git`` on the device is a symlink to it), the top-level entries are
    added and committed as "Initial commit", and the result is force-pushed
    to the ``origin`` remote given by ``self._getOgRepository(repo_name)``.

    Args:
        device: Path of the device holding the filesystem to import.
        repo_name: Repository name; used for the cached git directory name
            (``git-<repo_name>``) and to obtain the remote URL.

    Raises:
        RuntimeError: If an existing ``.git`` entry is neither a link, a
            regular file nor a directory.
        subprocess.CalledProcessError: If a ``git add`` invocation fails.
    """
    self._unmount_device(device)

    path = os.path.join("/mnt", os.path.basename(device))
    self.logger.debug(f"Will mount repo at {path}")

    # os.path has no mkdir(); the mountpoint is created with os.makedirs,
    # which is a no-op when the directory is already there.
    os.makedirs(path, exist_ok=True)

    if self._filesystem_type(device) == "ntfs":
        self.logger.debug("Handing a NTFS filesystem")
        self._modprobe("ntfs3")
        self._ntfsfix(device)
        ntfs = NTFSLibrary(self.ntfs_implementation)
        ntfs.mount_filesystem(device, path)
    else:
        self._mount(device, path)

    self.logger.info("Initializing repository: " + path)
    git_dir = os.path.join(path, ".git")
    real_git_dir = os.path.join(self.cache_dir, f"git-{repo_name}")

    # Start from scratch: drop any previous copy of the repository in the cache.
    if os.path.exists(real_git_dir):
        self.logger.debug(f"Removing existing repository {real_git_dir}")
        shutil.rmtree(real_git_dir)

    # islink() is checked as well because exists() is False for a dangling link.
    if os.path.exists(git_dir) or os.path.islink(git_dir):
        if os.path.islink(git_dir) or os.path.isfile(git_dir):
            self.logger.debug(f"Removing gitdir: {git_dir}")
            os.unlink(git_dir)
        elif os.path.isdir(git_dir):
            # We want to host git in the cache partition, .git shouldn't be
            # a directory under the filesystem.
            self.logger.warning(f"Removing directory-type gitdir, this should be a link or a file: {git_dir}")
            shutil.rmtree(git_dir)
        else:
            raise RuntimeError("Git dir is of an unrecognized file type!")

    self.logger.debug(f"Initializing repo in cache at {real_git_dir}")
    os.symlink(real_git_dir, git_dir)

    repo = git.Repo.init(path)
    self._configure_repo(repo)
    self._write_ignore_list(path)

    metadata_ret = self._create_metadata(path)

    self.logger.debug(f"Building list of files to add from path {path}")
    add_files = []

    # Note: repo.index.add(".") adds the files, but git afterwards seems to
    # believe they have not been added, hence the explicit list.
    for ent in os.listdir(path):
        if repo.ignored(ent) or ent == ".git" or ent == "." or ent == "..":
            self.logger.debug(f"Ignoring: {ent}")
        elif ent in self.fully_ignored_dirs:
            # FIXME: repo.index.add has a bug that ignores force=True
            self.logger.debug(f"Fully ignored dir: {ent}")
            add_files.append(f"{ent}/.opengnsys-keep")
        else:
            self.logger.debug(f"Adding: {ent}")
            add_files.append(ent)

    for lnk in metadata_ret['symlinks']:
        self.logger.debug(f"Adding symlink: {lnk}")
        add_files.append(lnk)

    self.logger.debug("Adding %d files", len(add_files))
    with OperationTimer(self, "add all files"):
        # "--" ends option parsing so that an entry whose name starts with
        # "-" is treated as a path and not as a git option.
        subprocess.run(["git", "add", "--"] + add_files, check=True, cwd=path)

    # FIXME: This shouldn't actually happen, we shouldn't have any untracked files
    if self.debug_check_for_untracked_files:
        self.logger.info("Checking for untracked files...")
        with OperationTimer(self, "add untracked files"):
            untracked_list = repo.untracked_files
            if untracked_list:
                self.logger.warning(f"Untracked files: {untracked_list}")
                self.logger.warning("Adding %d untracked files", len(untracked_list))
                subprocess.run(["git", "add", "--"] + untracked_list, check=True, cwd=path)

    self.logger.debug("Committing")
    repo.index.commit("Initial commit")

    # Restore the things that were modified for git's benefit.
    self._restore_metadata(path, destructive_only=True)

    self._rmmod("ntfs3")

    repo_url = self._getOgRepository(repo_name)
    self.logger.debug(f"Creating remote origin: {repo_url}")
    if "origin" in repo.remotes:
        repo.delete_remote("origin")
    origin = repo.create_remote("origin", repo_url)

    self.logger.debug("Fetching origin")
    origin.fetch()

    self.logger.debug("Uploading to ogrepository")
    repo.git.push("--set-upstream", "origin", repo.head.ref, "--force")
def cloneRepo(self, repo_name, destination, boot_device):
    """
    Clone a repository from ogrepository onto a destination device.

    The repository metadata is validated first (filesystem information must
    be present and the boot mode recorded in it must match the one reported
    by ``self._is_efi()``). Then the filesystems are created, the destination
    is mounted at ``/mnt/repo-<repo_name>`` and emptied, the repository is
    cloned into it, the boot loader is installed and the metadata restored.

    Args:
        repo_name: Name of the repository to clone.
        destination: Destination mapped to "/" when creating filesystems and
            mounted as the clone target.
        boot_device: Device handed to the EFI / GRUB installation helper.

    Raises:
        RequirementException: If the metadata is incomplete or the boot mode
            of the repository differs from the one of the running system.
    """
    self.logger.info(f"Cloning repo: {repo_name} => {destination}")

    repo_url = self._getOgRepository(repo_name)
    self.logger.debug(f"URL: {repo_url}")

    repo_metadata = self._get_repo_metadata(repo_name)
    metadata = repo_metadata["metadata.json"]
    fs_data = repo_metadata["filesystems.json"]

    # Without filesystem information nothing can be created on the target.
    if not fs_data.keys():
        raise RequirementException("El repositorio contiene metadatos incorrectos. Falta información sobre sistemas de archivos en filesystems.json.")

    if "efi_boot" not in metadata:
        raise RequirementException("El repositorio contiene metadatos incorrectos. Falta información de metadatos en metadata.json")

    repo_is_efi = metadata["efi_boot"]
    efi = self._is_efi()

    self.logger.debug(f"Repository made for EFI: {repo_is_efi}")
    self.logger.debug(f"Our system using EFI : {efi}")

    if repo_is_efi != efi:
        raise RequirementException("Repositorio usa sistema de arranque incompatible con sistema actual")

    self._create_filesystems(fs_data, {"/": destination})

    destination_dir = "/mnt/repo-" + repo_name
    self._mount(destination, destination_dir)
    self._delete_contents(destination_dir)

    git.Repo.clone_from(repo_url, destination_dir)

    # Pick the boot loader installer that matches the repository boot mode.
    install_boot = self._efi_install if repo_is_efi else self._grub_install
    install_boot(boot_device, destination_dir)

    self._mklostandfound(destination_dir)
    self._restore_metadata(destination_dir)
def commitRepo(self, path):
    """
    Save the current state of the local data at ``path``.

    Opens ``path`` as a git repository, regenerates its metadata with
    ``_create_metadata`` and then runs the destructive-only part of
    ``_restore_metadata``.

    NOTE(review): nothing in this body stages files or creates a commit,
    despite the method name - confirm whether that step is missing.

    Args:
        path: Directory containing the repository working tree.
    """
    # The repository object is not used afterwards; it is still constructed
    # so that the call happens exactly as before.
    git.Repo(path)

    self._create_metadata(path)

    # Restore the things that were modified for git's benefit.
    self._restore_metadata(path, destructive_only=True)
def restoreRepo(self, path):
    """
    Return the repository to the state it had before the uncommitted changes.

    Resets both the index and the working tree of HEAD and then runs the
    destructive-only part of ``_restore_metadata``.

    Args:
        path: Directory containing the repository working tree.
    """
    head = git.Repo(path).head
    head.reset(index=True, working_tree=True)

    # Restore the things that were modified for git's benefit.
    self._restore_metadata(path, destructive_only=True)
def pushRepo(self, path):
    """
    Upload the local commits to ogrepository.

    The branch HEAD points to is force-pushed to the ``origin`` remote and
    set as upstream. Use commitRepo() first to save local changes.

    Args:
        path: Directory containing the repository working tree.
    """
    repo = git.Repo(path)

    self.logger.debug("Uploading to ogrepository")
    push_arguments = ("--set-upstream", "origin", repo.head.ref, "--force")
    repo.git.push(*push_arguments)
def pullRepo(self, path):
    """
    Pull changes from ogrepository.

    Fetches from the remote and resets the index and the working tree to the
    branch tracked by the current branch. This unconditionally overwrites
    local changes. There is no conflict resolution.

    When HEAD is detached or the current branch tracks nothing, the reset
    falls back to the current HEAD, which only discards uncommitted changes.

    Args:
        path: Directory containing the repository working tree.
    """
    repo = git.Repo(path)

    self.logger.debug("Downloading from ogrepository")
    repo.git.fetch()

    # Resetting to HEAD alone would leave the fetched commits unused; the
    # reset target has to be the upstream branch for the pull to take effect.
    upstream = None
    if not repo.head.is_detached:
        upstream = repo.active_branch.tracking_branch()

    if upstream is None:
        self.logger.warning("No upstream branch to pull from, resetting to the current HEAD")
        repo.head.reset(index=True, working_tree=True)
    else:
        self.logger.debug(f"Resetting to {upstream}")
        repo.head.reset(commit=upstream, index=True, working_tree=True)

    # Restore the things that were modified for git's benefit.
    self._restore_metadata(path, destructive_only=True)
if __name__ == '__main__':
    # Command line entry point: sets up logging, parses the arguments and
    # runs the first action that was requested.

    # Python does not assume that our console uses UTF-8; this makes
    # accented characters print correctly.
    sys.stdout.reconfigure(encoding='utf-8')

    logger = logging.getLogger(__package__)
    logger.setLevel(logging.DEBUG)

    # Everything is logged both to the console and to gitlib.log.
    streamLog = logging.StreamHandler()
    streamLog.setLevel(logging.DEBUG)

    fileLog = logging.FileHandler("gitlib.log")
    fileLog.setLevel(logging.DEBUG)

    formatter = logging.Formatter('%(asctime)s - %(name)24s - [%(levelname)5s] - %(message)s')
    streamLog.setFormatter(formatter)
    fileLog.setFormatter(formatter)

    logger.addHandler(streamLog)
    logger.addHandler(fileLog)

    logger.info("Inicio del programa")

    parser = argparse.ArgumentParser(
        prog="OpenGnsys Git Library",
        description="Funciones de Git",
    )

    parser.add_argument("--init-repo-from", type=str, metavar='DEV', help="Inicializar repositorio desde DEV")
    parser.add_argument("--clone-repo-to", type=str, metavar='DIR', help="Clonar repositorio a DIR. Elimina todos los datos en ruta destino!")
    parser.add_argument("--repo", type=str, help="Repositorio en ogrepository (linux, windows, mac)")
    parser.add_argument("--boot-device", type=str, help="Dispositivo de arranque")
    parser.add_argument("--commit", type=str, metavar='DIR', help="Commit de cambios en el directorio")
    parser.add_argument("--restore", type=str, metavar='DIR', help="Eliminar cambios en el directorio")
    parser.add_argument("--push", type=str, metavar='DIR', help="Subir cambios a ogrepository")
    parser.add_argument("--pull", type=str, metavar='DIR', help="Bajar cambios de ogrepository")
    parser.add_argument("--ntfs-type", type=str, metavar="FS", help="Tipo de NTFS, 'kernel' o 'fuse'")
    parser.add_argument("--test-metadata", type=str, metavar="DIR", help="Test metadata generation")
    parser.add_argument("--test-restore-metadata", type=str, metavar="DIR", help="Test metadata restoration")
    parser.add_argument("--test-restore-metadata-destructive", type=str, metavar="DIR", help="Test metadata restoration, destructive parts only")
    parser.add_argument("--test-clone-metadata", type=str, metavar="REPO", help="Test metadata cloning")

    args = parser.parse_args()

    logger.debug("Inicio")

    # --ntfs-type is optional: when it is omitted (None) the kernel
    # implementation is used. Only an explicit unknown value is an error.
    ntfs_implementations = {
        None: NTFSImplementation.Kernel,
        "kernel": NTFSImplementation.Kernel,
        "fuse": NTFSImplementation.NTFS3G,
    }
    if args.ntfs_type not in ntfs_implementations:
        raise ValueError(f"Unknown NTFS implementation: {args.ntfs_type}")
    ntfs_impl = ntfs_implementations[args.ntfs_type]

    og_git = OpengnsysGitLibrary(ntfs_implementation = ntfs_impl)

    # Only the first matching action is executed.
    if args.init_repo_from:
        with OperationTimer(og_git, "git init"):
            og_git.initRepo(args.init_repo_from, args.repo)
    elif args.clone_repo_to:
        with OperationTimer(og_git, "git clone"):
            og_git.cloneRepo(args.repo, args.clone_repo_to, args.boot_device)
    elif args.commit:
        with OperationTimer(og_git, "git commit"):
            og_git.commitRepo(args.commit)
    elif args.restore:
        with OperationTimer(og_git, "git restore"):
            og_git.restoreRepo(args.restore)
    elif args.push:
        with OperationTimer(og_git, "git push"):
            og_git.pushRepo(args.push)
    elif args.pull:
        with OperationTimer(og_git, "git pull"):
            og_git.pullRepo(args.pull)
    elif args.test_metadata:
        og_git._create_metadata(args.test_metadata)
    elif args.test_restore_metadata:
        og_git._restore_metadata(args.test_restore_metadata)
    elif args.test_restore_metadata_destructive:
        og_git._restore_metadata(path = args.test_restore_metadata_destructive, destructive_only=True)
    elif args.test_clone_metadata:
        og_git._get_repo_metadata(args.test_clone_metadata)
    else:
        print("Debe especificar una acción")