#!/usr/bin/env python3
"""Installer script for the OpenGnsys git repository (Forgejo based)."""

import os
import sys

# Extra module directory; it is inserted first so that the imports below can
# be resolved from it.
sys.path.insert(0, "/opt/opengnsys-modules/python3/dist-packages")

import shutil
import argparse
import tempfile
import logging
import subprocess
import pwd
import grp
from termcolor import cprint
import git
import libarchive
# Star import kept on purpose: it provides the EXTRACT_* flag constants used
# when unpacking the initrd.
from libarchive.extract import *
import urllib.request
import pathlib
import socket
import time
import requests
import hashlib
import datetime

FORGEJO_VERSION = "9.0.0"
FORGEJO_URL = f"https://codeberg.org/forgejo/forgejo/releases/download/v{FORGEJO_VERSION}/forgejo-{FORGEJO_VERSION}-linux-amd64"


def show_error(*args):
    """Print an error message in bold red on stderr.

    Args:
        *args: Message parts. They are converted to ``str`` and joined with
            single spaces, so several parts can be passed like with ``print``.

    Returns:
        None
    """
    cprint(" ".join(str(arg) for arg in args), "red", attrs=["bold"], file=sys.stderr)


class RequirementException(Exception):
    """Raised when an installation requirement is not satisfied.

    Attributes:
        message (str): Error message shown to the user
    """

    def __init__(self, message):
        """Initialize RequirementException.

        Args:
            message (str): Error message shown to the user
        """
        super().__init__(message)
        self.message = message


class FakeTemporaryDirectory:
    """Stand-in for TemporaryDirectory used for debugging.

    The directory is created on construction and is never removed by this
    object.
    """

    def __init__(self, dirname):
        self.name = dirname
        os.makedirs(dirname, exist_ok=True)

    def __str__(self):
        return self.name


class Oglive:
    """Interface to the oglivecli utility.

    This is probably temporary until oglivecli itself is converted.
    """

    def __init__(self):
        self.__logger = logging.getLogger("Oglive")
        self.binary = "/opt/opengnsys/bin/oglivecli"
        self.__logger.debug("Inicializando")

    def _cmd(self, args):
        """Run oglivecli with *args* and return its stripped stdout.

        Raises:
            subprocess.CalledProcessError: oglivecli exited with non-zero status
        """
        cmd = [self.binary] + args
        self.__logger.debug("comando: %s", cmd)

        proc = subprocess.run(cmd, shell=False, check=True, capture_output=True)
        out_text = proc.stdout.decode('utf-8').strip()
        self.__logger.debug("salida: %s", out_text)
        return out_text

    def get_default(self):
        """Return the default client, as printed by ``oglivecli get-default``."""
        self.__logger.debug("get_default()")
        return self._cmd(["get-default"])

    def get_clients(self):
        """Return the clients listed by ``oglivecli list``.

        Returns:
            dict: Maps the first column of each line (a string) to the rest of
            the line.
        """
        self.__logger.debug("get_clients()")
        clients = {}
        for line in self._cmd(["list"]).splitlines():
            fields = line.split(maxsplit=1)
            if len(fields) != 2:
                # Blank or incomplete lines carry no client; skip them
                # instead of failing on tuple unpacking.
                continue
            number, name = fields
            clients[number] = name.strip()

        self.__logger.debug("Clientes: %s", clients)
        return clients


class OpengnsysGitInstaller:
    """OpenGnsys git installer."""

    def __init__(self):
        """Initialize the installer.

        Note that this calls set_ssh_user_group(), which creates the "oggit"
        system group and user when they do not exist yet.
        """
        self.__logger = logging.getLogger("OpengnsysGitInstaller")
        self.__logger.setLevel(logging.DEBUG)
        self.__logger.debug("Inicializando")
        self.testmode = False
        # Defaults so that the attributes exist even if the setters are never called
        self.ignoresshkey = False
        self.usesshkey = None
        self.base_path = "/opt/opengnsys"
        self.git_basedir = "base.git"
        self.email = "OpenGnsys@opengnsys.com"

        self.forgejo_user = "oggit"
        self.forgejo_password = "opengnsys"
        self.forgejo_organization = "opengnsys"
        self.forgejo_port = 3000

        self.set_ssh_user_group("oggit", "oggit")

        self.temp_dir = None
        self.script_path = os.path.realpath(os.path.dirname(__file__))

        # Possible names for SSH public keys
        self.ssh_key_users = ["root", "opengnsys"]
        self.key_names = ["id_rsa.pub", "id_ed25519.pub", "id_ecdsa.pub",
                          "id_ed25519_sk.pub", "id_ecdsa_sk.pub"]

        # Possible names for SSH key in oglive
        self.key_paths = ["scripts/ssl/id_rsa.pub", "scripts/ssl/id_ed25519.pub",
                          "scripts/ssl/id_ecdsa.pub", "scripts/ssl/id_ed25519_sk.pub",
                          "scripts/ssl/id_ecdsa_sk.pub"]

        # Same paths as a dict, used for membership tests
        self.key_paths_dict = {kp: 1 for kp in self.key_paths}

        self.oglive = Oglive()

    def set_testmode(self, value):
        """Enable or disable test mode."""
        self.testmode = value

    def set_ignoresshkey(self, value):
        """Ignore the SSH key requirement of the installer."""
        self.ignoresshkey = value

    def set_usesshkey(self, value):
        """Use the given SSH public key file instead of the one in the initrd."""
        self.usesshkey = value

    def set_basepath(self, value):
        """Set the OpenGnsys base path.

        Default value: /opt/opengnsys
        """
        self.base_path = value

    def _get_tempdir(self):
        """Return a temporary directory object exposing a ``name`` attribute.

        In test mode a fixed, freshly emptied /tmp/ogtemp is used.
        """
        if self.testmode:
            dirname = "/tmp/ogtemp"
            if os.path.exists(dirname):
                shutil.rmtree(dirname)

            temp = FakeTemporaryDirectory(dirname)
            self.__logger.debug("Modo de prueba, temp=/tmp/ogtemp")
            return temp

        temp = tempfile.TemporaryDirectory()
        self.__logger.debug("Temp = %s", temp)
        return temp

    def _cleanup(self):
        """Remove the temporary directory, if one was created."""
        if self.temp_dir:
            # temp_dir is a (Fake)TemporaryDirectory object; rmtree needs its path.
            shutil.rmtree(self.temp_dir.name, ignore_errors=True)

    def set_ssh_user_group(self, username, groupname):
        """Record the SSH user and group, creating them if they are missing.

        Sets ssh_user, ssh_group, ssh_uid, ssh_gid and ssh_homedir.

        Args:
            username (str): Name of the system user
            groupname (str): Name of the system group

        Raises:
            subprocess.CalledProcessError: groupadd or useradd failed
        """
        self.ssh_group = groupname
        self.ssh_user = username

        try:
            self.ssh_gid = grp.getgrnam(self.ssh_group).gr_gid
            self.__logger.info("Group %s exists with gid %i", self.ssh_group, self.ssh_gid)
        except KeyError:
            self.__logger.info("Need to create group %s", self.ssh_group)
            subprocess.run(["/usr/sbin/groupadd", "--system", self.ssh_group], check=True)
            self.ssh_gid = grp.getgrnam(groupname).gr_gid

        try:
            self.ssh_uid = pwd.getpwnam(self.ssh_user).pw_uid
            self.__logger.info("User %s exists with uid %i", self.ssh_user, self.ssh_uid)
        except KeyError:
            self.__logger.info("Need to create user %s", self.ssh_user)
            subprocess.run(["/usr/sbin/useradd", "--gid", str(self.ssh_gid), "-m",
                            "--system", self.ssh_user], check=True)
            self.ssh_uid = pwd.getpwnam(username).pw_uid

        self.ssh_homedir = pwd.getpwnam(username).pw_dir

    def init_git_repo(self, reponame):
        """Create an empty bare git repository under <base_path>/images.

        Any existing directory with that name is removed first.

        Args:
            reponame (str): Directory name of the repository
        """
        ogdir_images = os.path.join(self.base_path, "images")
        self.__logger.info("Creando repositorio de GIT %s", reponame)

        os.makedirs(os.path.join(ogdir_images, self.git_basedir), exist_ok=True)

        repo_path = os.path.join(ogdir_images, reponame)
        shutil.rmtree(repo_path, ignore_errors=True)

        # Mark as a safe directory.
        # Note: GitPython is not used here. Global config fails with it, even
        # though there are hints that git.Repo(path=None) is valid. Possibly a
        # GitPython bug.
        # Best effort, as before: a failure here does not abort the install.
        subprocess.run(["git", "config", "--global", "--add", "safe.directory", repo_path],
                       check=False)

        self.__logger.debug("Inicializando repositorio: %s", repo_path)
        repo = git.Repo.init(repo_path, bare=True)

        self.__logger.info("Configurando repositorio de GIT")
        repo.config_writer().set_value("user", "name", "OpenGnsys").release()
        repo.config_writer().set_value("user", "email", self.email).release()

        self._recursive_chown(repo_path, ouid=self.ssh_uid, ogid=self.ssh_gid)

    def _add_line_to_file(self, filename, new_line):
        """Append *new_line* to *filename* unless an equal line is already there.

        Lines are compared after stripping surrounding whitespace. The file is
        created if it does not exist.
        """
        self.__logger.debug("Agregando linea: %s a %s", new_line, filename)
        wanted = new_line.strip()
        with open(filename, "a+", encoding="utf-8") as f:
            f.seek(0)
            found = any(line.strip() == wanted for line in f)

            if not found:
                self.__logger.debug("Agregando linea: %s", new_line)
                # "a+" mode always writes at the end of the file
                f.write(new_line + "\n")
            else:
                self.__logger.debug("Linea ya presente")

    def _recursive_chown(self, path, ouid, ogid):
        """Change owner and group of *path* and everything below it."""
        for dirpath, _, filenames in os.walk(path):
            os.chown(dirpath, uid=ouid, gid=ogid)
            for filename in filenames:
                os.chown(os.path.join(dirpath, filename), uid=ouid, gid=ogid)

    def _wait_for_port(self, host, port, timeout=60):
        """Block until a TCP connection to host:port succeeds.

        Args:
            host (str): Host name or IPv4 address
            port (int): TCP port
            timeout (float): Maximum number of seconds to keep trying

        Raises:
            TimeoutError: No connection could be made within *timeout* seconds
        """
        self.__logger.info("Waiting for %s:%i to be up", host, port)

        start_time = time.time()
        ready = False

        while not ready and (time.time() - start_time) < timeout:
            # A fresh socket per attempt; the context manager closes it even
            # when the connection fails.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                try:
                    s.connect((host, port))
                    ready = True
                except TimeoutError:
                    self.__logger.debug("Timed out, no connection yet.")
                except OSError as oserr:
                    self.__logger.debug("%s, no connection yet. %.1f seconds left.",
                                        oserr.strerror, timeout - (time.time() - start_time))
                    time.sleep(0.1)

        if ready:
            self.__logger.info("Connection established.")
        else:
            self.__logger.error("Timed out waiting for connection!")
            raise TimeoutError("Timed out waiting for connection!")

    def add_ssh_key_from_squashfs(self, oglive_num=None):
        """Read the public keys from an oglive squashfs and register them in Forgejo.

        Args:
            oglive_num (int | None): oglive number; the default oglive when None
        """
        if oglive_num is None:
            self.__logger.info("Using default oglive")
            oglive_num = int(self.oglive.get_default())
        else:
            self.__logger.info("Using oglive %i", oglive_num)

        oglive_client = self.oglive.get_clients()[str(oglive_num)]
        self.__logger.info("Oglive is %s", oglive_client)

        keys = self.extract_ssh_keys(oglive_num=oglive_num)
        for k in keys:
            timestamp = '{:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now())
            self.add_forgejo_sshkey(k, f"Key for {oglive_client} ({timestamp})")

    def extract_ssh_keys(self, oglive_num=None):
        """Return the public keys found in /root/.ssh of an oglive squashfs.

        The squashfs image is mounted on a temporary directory while it is
        being read and unmounted afterwards.

        Args:
            oglive_num (int | None): oglive number; the default oglive when None

        Returns:
            list[str]: Contents of every ``*.pub`` file, stripped

        Raises:
            subprocess.CalledProcessError: mount or umount failed
        """
        public_keys = []

        squashfs = "ogclient.sqfs"
        tftp_dir = os.path.join(self.base_path, "tftpboot")

        if oglive_num is None:
            self.__logger.info("Reading from default oglive")
            oglive_num = self.oglive.get_default()
        else:
            self.__logger.info("Reading from oglive %i", oglive_num)

        oglive_client = self.oglive.get_clients()[str(oglive_num)]
        self.__logger.info("Oglive is %s", oglive_client)

        client_squashfs_path = os.path.join(tftp_dir, oglive_client, squashfs)

        self.__logger.info("Mounting %s", client_squashfs_path)
        with tempfile.TemporaryDirectory() as mount_point:
            ssh_keys_dir = os.path.join(mount_point, "root", ".ssh")

            subprocess.run(["mount", client_squashfs_path, mount_point], check=True)
            try:
                for file in os.listdir(ssh_keys_dir):
                    full_path = os.path.join(ssh_keys_dir, file)

                    if file.endswith(".pub"):
                        self.__logger.info("Found public key: %s", full_path)
                        with open(full_path, "r", encoding="utf-8") as keyfile:
                            public_keys.append(keyfile.read().strip())
            finally:
                # Never leave the image mounted, even if reading the keys failed
                subprocess.run(["umount", mount_point], check=True)

        return public_keys

    def _extract_ssh_key_from_initrd(self):
        """Return the SSH public key to register for the default oglive.

        When a key file was given with set_usesshkey() its content is
        returned. Otherwise the initrd of the default oglive is scanned for
        one of the known key paths. The process exits with status 2 when the
        initrd image does not exist.

        Returns:
            str | None: The public key, or None when the initrd holds no key
        """
        public_key = ""

        initrd_name = "oginitrd.img"

        tftp_dir = os.path.join(self.base_path, "tftpboot")
        default_num = self.oglive.get_default()
        default_client = self.oglive.get_clients()[default_num]
        client_initrd_path = os.path.join(tftp_dir, default_client, initrd_name)

        if self.usesshkey:
            with open(self.usesshkey, 'r', encoding="utf-8") as f:
                public_key = f.read().strip()
        elif os.path.isfile(client_initrd_path):
            self.__logger.debug("Descomprimiendo %s", client_initrd_path)
            public_key = None
            with libarchive.file_reader(client_initrd_path) as initrd:
                for file in initrd:
                    self.__logger.debug("Archivo: %s", file)

                    pathname = file.pathname
                    if pathname.startswith("./"):
                        pathname = pathname[2:]

                    if pathname in self.key_paths_dict:
                        data = bytearray()
                        for block in file.get_blocks():
                            data = data + block
                        public_key = data.decode('utf-8').strip()

                        break
        else:
            print(f"No se encuentra la imagen de initrd {client_initrd_path}")
            sys.exit(2)

        return public_key

    def _read_local_public_keys(self):
        """Return the public keys of the users in ssh_key_users, one per line."""
        collected = []
        for username in self.ssh_key_users:
            self.__logger.debug("Looking for keys in user %s", username)
            homedir = pwd.getpwnam(username).pw_dir

            for key in self.key_names:
                key_path = os.path.join(homedir, ".ssh", key)
                self.__logger.debug("Checking if we have %s...", key_path)
                if os.path.exists(key_path):
                    with open(key_path, "r", encoding='utf-8') as public_key_file:
                        self.__logger.info("Adding %s to authorized_keys", key_path)
                        collected.append(public_key_file.read())

        return "".join(key + "\n" for key in collected)

    def _pack_initrd_tree(self, tree_root, archive_path):
        """Pack the tree at *tree_root* into a zstd compressed cpio archive.

        The current working directory must be *tree_root*: files are accessed
        through paths relative to it and the archive is built from ".".

        Hardlinks in the source package are not correctly packaged back as
        hardlinks. Taking the easy option of turning them into symlinks for
        now: every non-empty regular file whose content was already seen is
        replaced by a relative symlink to the first file with that content.
        """
        file_hashes = {}
        with libarchive.file_writer(archive_path, "cpio_newc", "zstd") as writer:
            for root, _, files in os.walk(tree_root):
                proot = pathlib.PurePosixPath(root)
                relpath = proot.relative_to(tree_root)

                for file in files:
                    abs_path = os.path.join(root, file)
                    full_path = os.path.join(relpath, file)

                    if os.path.islink(abs_path):
                        self.__logger.debug("%s is a symlink", abs_path)
                        continue

                    if not os.path.exists(abs_path):
                        self.__logger.debug("%s does not exist", abs_path)
                        continue

                    stat_data = os.stat(abs_path)
                    with open(full_path, "rb") as in_file:
                        digest = hashlib.file_digest(in_file, "sha256").hexdigest()

                    if stat_data.st_size > 0 and not os.path.islink(full_path):
                        if digest in file_hashes:
                            target_path = pathlib.Path(file_hashes[digest])
                            link_path = target_path.relative_to(relpath, walk_up=True)

                            self.__logger.debug("%s was a duplicate of %s, linking to %s",
                                                full_path, file_hashes[digest], link_path)

                            os.unlink(full_path)
                            os.symlink(link_path, full_path)
                        else:
                            file_hashes[digest] = full_path

            writer.add_files(".", recursive=True)

    def set_ssh_key_in_initrd(self, client_num=None):
        """Install SSH keys into an oglive initrd and register its key in Forgejo.

        The initrd is unpacked, an ed25519 key pair is generated in
        scripts/ssl unless one is already there, the local public keys of the
        users in ssh_key_users are written to scripts/ssl/authorized_keys and
        the initrd is packed again. The previous initrd and checksum file are
        kept with an ".old" suffix and a new ".sum" file is written.

        Args:
            client_num (int | str | None): oglive number; the default oglive
                when None

        Raises:
            subprocess.CalledProcessError: ssh-keygen failed
        """
        initrd_name = "oginitrd.img"

        tftp_dir = os.path.join(self.base_path, "tftpboot")

        if client_num is None:
            self.__logger.info("Will modify default client")
            client_num = self.oglive.get_default()

        # get_clients() is keyed by strings
        ogclient = self.oglive.get_clients()[str(client_num)]
        client_initrd_path = os.path.join(tftp_dir, ogclient, initrd_name)
        client_initrd_path_new = client_initrd_path + ".new"

        self.__logger.debug("initrd path for ogclient %s is %s", ogclient, client_initrd_path)

        previous_cwd = os.getcwd()
        with tempfile.TemporaryDirectory() as temp_dir_path:
            # The directory is removed and created again, as the original code
            # did. NOTE(review): presumably this replaces the 0700 mode given by
            # mkdtemp with the umask default for the packed root entry - confirm.
            if os.path.exists(temp_dir_path):
                shutil.rmtree(temp_dir_path)

            pathlib.Path(temp_dir_path).mkdir(parents=True, exist_ok=True)

            self.__logger.debug("Uncompressing initrd %s into %s", client_initrd_path, temp_dir_path)
            # Extraction and packing both work relative to the current directory
            os.chdir(temp_dir_path)
            try:
                libarchive.extract_file(client_initrd_path,
                                        flags=EXTRACT_UNLINK | EXTRACT_OWNER | EXTRACT_PERM
                                        | EXTRACT_FFLAGS | EXTRACT_TIME)

                ssh_key_dir = os.path.join(temp_dir_path, "scripts", "ssl")
                client_key_path = os.path.join(ssh_key_dir, "id_ed25519")
                authorized_keys_path = os.path.join(ssh_key_dir, "authorized_keys")

                # Create a SSH key on the oglive, if needed
                pathlib.Path(ssh_key_dir).mkdir(parents=True, exist_ok=True)
                if os.path.exists(client_key_path):
                    self.__logger.info("Creating SSH key not necessary, it already is in the initrd")
                else:
                    self.__logger.info("Writing new SSH key into %s", client_key_path)
                    subprocess.run(["/usr/bin/ssh-keygen", "-t", "ed25519", "-N", "",
                                    "-f", client_key_path], check=True)

                with open(client_key_path + ".pub", "r", encoding="utf-8") as pubkey:
                    oglive_public_key = pubkey.read()

                # Add our public keys to the oglive, so that we can log in
                public_keys = self._read_local_public_keys()

                self.__logger.debug("Writing %s", authorized_keys_path)
                with open(authorized_keys_path, "w", encoding='utf-8') as auth_keys:
                    auth_keys.write(public_keys)

                self._pack_initrd_tree(temp_dir_path, client_initrd_path_new)
            finally:
                # Do not leave the process inside a directory about to be deleted
                os.chdir(previous_cwd)

        os.rename(client_initrd_path, client_initrd_path + ".old")

        if os.path.exists(client_initrd_path + ".sum"):
            os.rename(client_initrd_path + ".sum", client_initrd_path + ".sum.old")

        os.rename(client_initrd_path_new, client_initrd_path)

        with open(client_initrd_path, "rb") as initrd_file:
            hexdigest = hashlib.file_digest(initrd_file, "sha256").hexdigest()
            with open(client_initrd_path + ".sum", "w", encoding="utf-8") as digest_file:
                digest_file.write(hexdigest + "\n")

        self.__logger.info("Updated initrd %s", client_initrd_path)

        timestamp = '{:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now())

        self.add_forgejo_sshkey(oglive_public_key, f"Key for {ogclient} ({timestamp})")

    def install(self):
        """Check the requirements and install the dependencies.

        Adds <base_path>/bin to PATH, creates the temporary directory, checks
        the requirements below and installs git with apt-get.

        Raises:
            RequirementException: Not run as root
            RequirementException: Not run on Debian or Ubuntu
            RequirementException: Python < 3.8
            subprocess.CalledProcessError: apt-get failed
        """
        self.__logger.info("install()")

        os.environ["PATH"] += os.pathsep + os.path.join(self.base_path, "bin")
        self.temp_dir = self._get_tempdir()

        # Basic error checking.
        self.__logger.debug("Comprobando euid")
        if os.geteuid() != 0:
            raise RequirementException("Sólo ejecutable por root")

        if not os.path.exists("/etc/debian_version"):
            raise RequirementException("Instalación sólo soportada en Debian y Ubuntu")

        min_python = (3, 8)
        if sys.version_info < min_python:
            raise RequirementException("Python %s.%s mínimo requerido.\n" % min_python)

        self.__logger.debug("Instalando dependencias")
        subprocess.run(["apt-get", "install", "-y", "git"], check=True)

    def _install_template(self, template, destination, keysvalues):
        """Write *template* to *destination*, replacing ``{key}`` placeholders.

        Args:
            template (str): Path of the template file
            destination (str): Path of the file to write
            keysvalues (dict[str, str]): Placeholder names and their values
        """
        self.__logger.info("Writing template %s into %s", template, destination)

        with open(template, "r", encoding="utf-8") as template_file:
            data = template_file.read()

        for key, value in keysvalues.items():
            data = data.replace("{" + key + "}", value)

        with open(destination, "w+", encoding="utf-8") as out_file:
            out_file.write(data)

    def _runcmd(self, cmd):
        """Run *cmd* and return its stripped stdout.

        Raises:
            subprocess.CalledProcessError: The command exited with non-zero status
        """
        self.__logger.debug("Running: %s", cmd)

        ret = subprocess.run(cmd, check=True, capture_output=True, encoding='utf-8')
        return ret.stdout.strip()

    def install_forgejo(self):
        """Download, configure and start Forgejo from scratch.

        Existing Forgejo configuration, repositories and data under base_path
        are wiped. The binary is downloaded from FORGEJO_URL, app.ini and the
        systemd unit are generated from the templates next to this script, the
        service is (re)started, an admin user and an API token are created and
        the SSH key of the default oglive is registered.

        Raises:
            subprocess.CalledProcessError: A systemctl or forgejo command failed
            RuntimeError: A forgejo admin command failed
            TimeoutError: Forgejo did not start listening in time
        """
        self.__logger.info("Installing Forgejo")

        bin_path = os.path.join(self.base_path, "bin", "forgejo")
        conf_dir_path = os.path.join(self.base_path, "etc", "forgejo")

        lfs_dir_path = os.path.join(self.base_path, "images", "git-lfs")
        git_dir_path = os.path.join(self.base_path, "images", "git")

        forgejo_work_dir_path = os.path.join(self.base_path, "var", "lib", "forgejo/work")
        forgejo_db_dir_path = os.path.join(self.base_path, "var", "lib", "forgejo/db")
        forgejo_data_dir_path = os.path.join(self.base_path, "var", "lib", "forgejo/data")

        forgejo_db_path = os.path.join(forgejo_db_dir_path, "forgejo.db")

        forgejo_log_dir_path = os.path.join(self.base_path, "log", "forgejo")

        conf_path = os.path.join(conf_dir_path, "app.ini")

        self.__logger.debug("Stopping opengnsys-forgejo service")
        # The service may not exist yet, so a failure here is not an error
        subprocess.run(["systemctl", "stop", "opengnsys-forgejo"], check=False)

        self.__logger.debug("Downloading from %s into %s", FORGEJO_URL, bin_path)
        urllib.request.urlretrieve(FORGEJO_URL, bin_path)
        os.chmod(bin_path, 0o755)

        if os.path.exists(forgejo_db_path):
            self.__logger.debug("Removing old configuration")
            os.unlink(forgejo_db_path)
        else:
            self.__logger.debug("Old configuration not present, ok.")

        self.__logger.debug("Wiping old data")
        for old_dir in [conf_dir_path, git_dir_path, lfs_dir_path, forgejo_work_dir_path,
                        forgejo_data_dir_path, forgejo_db_dir_path]:
            if os.path.exists(old_dir):
                self.__logger.debug("Removing %s", old_dir)
                shutil.rmtree(old_dir)

        self.__logger.debug("Creating directories")
        for new_dir in [conf_dir_path, git_dir_path, lfs_dir_path, forgejo_work_dir_path,
                        forgejo_data_dir_path, forgejo_db_dir_path, forgejo_log_dir_path]:
            pathlib.Path(new_dir).mkdir(parents=True, exist_ok=True)

        # Everything except the configuration directory belongs to the service user
        for owned_dir in [lfs_dir_path, git_dir_path, forgejo_data_dir_path,
                          forgejo_work_dir_path, forgejo_db_dir_path, forgejo_log_dir_path]:
            os.chown(owned_dir, self.ssh_uid, self.ssh_gid)

        data = {
            "forgejo_user": self.ssh_user,
            "forgejo_group": self.ssh_group,
            "forgejo_port": str(self.forgejo_port),
            "forgejo_bin": bin_path,
            "forgejo_app_ini": conf_path,
            "forgejo_work_path": forgejo_work_dir_path,
            "forgejo_data_path": forgejo_data_dir_path,
            "forgejo_db_path": forgejo_db_path,
            "forgejo_repository_root": git_dir_path,
            "forgejo_lfs_path": lfs_dir_path,
            "forgejo_log_path": forgejo_log_dir_path,
            "forgejo_hostname": self._runcmd("hostname"),
            "forgejo_lfs_jwt_secret": self._runcmd([bin_path, "generate", "secret", "LFS_JWT_SECRET"]),
            "forgejo_jwt_secret": self._runcmd([bin_path, "generate", "secret", "JWT_SECRET"]),
            "forgejo_internal_token": self._runcmd([bin_path, "generate", "secret", "INTERNAL_TOKEN"]),
            "forgejo_secret_key": self._runcmd([bin_path, "generate", "secret", "SECRET_KEY"]),
        }

        self._install_template(os.path.join(self.script_path, "forgejo-app.ini"), conf_path, data)
        self._install_template(os.path.join(self.script_path, "forgejo.service"),
                               "/etc/systemd/system/opengnsys-forgejo.service", data)

        self.__logger.debug("Reloading systemd and starting service")
        subprocess.run(["systemctl", "daemon-reload"], check=True)
        subprocess.run(["systemctl", "enable", "opengnsys-forgejo"], check=True)
        subprocess.run(["systemctl", "restart", "opengnsys-forgejo"], check=True)

        self.__logger.info("Waiting for forgejo to start")
        self._wait_for_port("localhost", self.forgejo_port)

        self.__logger.info("Configuring forgejo")

        def run_forge_cmd(args):
            """Run a forgejo subcommand as the service user and return its stdout."""
            cmd = [bin_path, "--config", conf_path] + args
            self.__logger.debug("Running command: %s", cmd)

            ret = subprocess.run(cmd, check=False, capture_output=True, encoding='utf-8',
                                 user=self.ssh_user)
            if ret.returncode == 0:
                return ret.stdout.strip()

            self.__logger.error("Failed to run command: %s, return code %i", cmd, ret.returncode)
            self.__logger.error("stdout: %s", ret.stdout)
            self.__logger.error("stderr: %s", ret.stderr)
            raise RuntimeError("Failed to run necessary command")

        run_forge_cmd(["admin", "doctor", "check"])

        run_forge_cmd(["admin", "user", "create", "--username", self.forgejo_user,
                       "--password", self.forgejo_password, "--email", self.email])

        token = run_forge_cmd(["admin", "user", "generate-access-token", "--username",
                               self.forgejo_user, "-t", "gitapi", "--scopes", "all", "--raw"])

        with open(os.path.join(self.base_path, "etc", "ogGitApiToken.cfg"), "w+",
                  encoding='utf-8') as token_file:
            token_file.write(token)

        ssh_key = self._extract_ssh_key_from_initrd()
        if ssh_key:
            self.add_forgejo_sshkey(ssh_key, "Default key")
        else:
            # Nothing to register; posting an empty key would only be rejected
            self.__logger.warning("No SSH key found in the initrd, not adding a default key")

    def _forgejo_api_post(self, endpoint, payload):
        """POST *payload* as JSON to the local Forgejo API and log the result.

        The API token is read from <base_path>/etc/ogGitApiToken.cfg.

        Args:
            endpoint (str): Path below /api/v1/, without leading slash
            payload (dict): JSON body of the request

        Returns:
            requests.Response: The response of the server
        """
        with open(os.path.join(self.base_path, "etc", "ogGitApiToken.cfg"), "r",
                  encoding='utf-8') as token_file:
            token = token_file.read().strip()

        r = requests.post(
            f"http://localhost:{self.forgejo_port}/api/v1/{endpoint}",
            json=payload,
            headers={
                'Authorization': f"token {token}"
            },
            timeout=60
        )

        self.__logger.info("Request status was %i, content %s", r.status_code, r.content)
        return r

    def add_forgejo_repo(self, repository_name, description=""):
        """Create a public repository for the API user in Forgejo.

        Args:
            repository_name (str): Name of the repository
            description (str): Description of the repository
        """
        self.__logger.info("Adding repository %s for Forgejo", repository_name)

        self._forgejo_api_post("user/repos", {
            "auto_init": False,
            "default_branch": "main",
            "description": description,
            "name": repository_name,
            "private": False
        })

    def add_forgejo_sshkey(self, pubkey, description=""):
        """Register an SSH public key for the API user in Forgejo.

        Args:
            pubkey (str): Public key in OpenSSH format
            description (str): Title of the key
        """
        self.__logger.info("Adding SSH key to Forgejo: %s (%s)", pubkey, description)

        self._forgejo_api_post("user/keys", {
            "key": pubkey,
            "read_only": False,
            "title": description
        })

    def add_forgejo_organization(self, pubkey, description=""):
        """Register an SSH public key for the API user in Forgejo.

        NOTE(review): despite its name this posts to the user/keys endpoint,
        exactly like add_forgejo_sshkey(); it looks like an unfinished copy.
        Behaviour is kept as it was - confirm what was intended.

        Args:
            pubkey (str): Public key in OpenSSH format
            description (str): Title of the key
        """
        self.__logger.info("Adding SSH key to Forgejo: %s", pubkey)

        self._forgejo_api_post("user/keys", {
            "key": pubkey,
            "read_only": False,
            "title": description
        })


if __name__ == '__main__':
    sys.stdout.reconfigure(encoding='utf-8')

    opengnsys_log_dir = "/opt/opengnsys/log"

    logger = logging.getLogger(__package__)
    logger.setLevel(logging.DEBUG)

    # Console shows INFO and above by default; the log file gets everything
    streamLog = logging.StreamHandler()
    streamLog.setLevel(logging.INFO)

    os.makedirs(opengnsys_log_dir, exist_ok=True)

    logFilePath = f"{opengnsys_log_dir}/git_installer.log"
    fileLog = logging.FileHandler(logFilePath)
    fileLog.setLevel(logging.DEBUG)

    formatter = logging.Formatter('%(asctime)s - %(name)24s - [%(levelname)5s] - %(message)s')

    streamLog.setFormatter(formatter)
    fileLog.setFormatter(formatter)

    logger.addHandler(streamLog)
    logger.addHandler(fileLog)

    parser = argparse.ArgumentParser(
        prog="OpenGnsys Installer",
        description="Script para la instalación del repositorio git",
    )
    parser.add_argument('--forgejo-only', action='store_true', help="Solo instalar forgejo")
    parser.add_argument('--forgejo-addrepos', action='store_true', help="Solo agregar repositorios forgejo")
    parser.add_argument('--testmode', action='store_true', help="Modo de prueba")
    parser.add_argument('--ignoresshkey', action='store_true', help="Ignorar clave de SSH")
    parser.add_argument('--usesshkey', type=str, help="Usar clave SSH especificada")
    parser.add_argument('--test-createuser', action='store_true')
    parser.add_argument('--extract-ssh-key', action='store_true', help="Extract SSH key from oglive squashfs")
    parser.add_argument('--set-ssh-key', action='store_true', help="Read SSH key from oglive squashfs and set it in Forgejo")
    parser.add_argument('--extract-ssh-key-from-initrd', action='store_true', help="Extract SSH key from oglive initrd (obsolete)")
    parser.add_argument('--set-ssh-key-in-initrd', action='store_true', help="Configure SSH key in oglive (obsolete)")
    parser.add_argument('--oglive', type=int, metavar='NUM', help="Do SSH key manipulation on this oglive")
    parser.add_argument('--quiet', action='store_true', help="Quiet console output")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose console output")

    args = parser.parse_args()

    if args.quiet:
        streamLog.setLevel(logging.WARNING)

    if args.verbose:
        streamLog.setLevel(logging.DEBUG)

    installer = OpengnsysGitInstaller()
    installer.set_testmode(args.testmode)
    installer.set_ignoresshkey(args.ignoresshkey)
    installer.set_usesshkey(args.usesshkey)

    logger.debug("Inicio de instalación")

    try:
        if args.forgejo_only:
            installer.install_forgejo()
        elif args.forgejo_addrepos:
            installer.add_forgejo_repo("linux")
        elif args.test_createuser:
            installer.set_ssh_user_group("oggit2", "oggit2")
        elif args.extract_ssh_key:
            keys = installer.extract_ssh_keys(oglive_num=args.oglive)
            print(f"{keys}")
        elif args.extract_ssh_key_from_initrd:
            key = installer._extract_ssh_key_from_initrd()
            print(f"{key}")
        elif args.set_ssh_key:
            installer.add_ssh_key_from_squashfs(oglive_num=args.oglive)
        elif args.set_ssh_key_in_initrd:
            installer.set_ssh_key_in_initrd()
        else:
            installer.install()
            installer.install_forgejo()

            installer.add_forgejo_repo("windows", "Windows")
            installer.add_forgejo_repo("linux", "Linux")
            installer.add_forgejo_repo("mac", "Mac")

    except RequirementException as req:
        show_error(f"Requisito para la instalación no satisfecho: {req.message}")
        sys.exit(1)