#!/usr/bin/env python3 """Script para la instalación del repositorio git""" import os import shutil import argparse import tempfile import logging import subprocess import sys import pwd import grp from termcolor import cprint import git import libarchive import urllib.request import pathlib import socket import time import requests #FORGEJO_VERSION="8.0.3" FORGEJO_VERSION="9.0.0" FORGEJO_URL=f"https://codeberg.org/forgejo/forgejo/releases/download/v{FORGEJO_VERSION}/forgejo-{FORGEJO_VERSION}-linux-amd64" def show_error(*args): """ Imprime un mensaje de error Args: *args: Argumentos igual que a la función print Returns: None """ cprint(*args, "red", attrs = ["bold"], file=sys.stderr) class RequirementException(Exception): """Excepción que indica que nos falta algún requisito Attributes: message (str): Mensaje de error mostrado al usuario """ def __init__(self, message): """Inicializar RequirementException. Args: message (str): Mensaje de error mostrado al usuario """ super().__init__(message) self.message = message class FakeTemporaryDirectory: """Imitación de TemporaryDirectory para depuración""" def __init__(self, dirname): self.name = dirname os.makedirs(dirname, exist_ok=True) def __str__(self): return self.name class Oglive: """Interfaz a utilidad oglivecli Esto es probablemente temporal hasta que se haga una conversión de oglivecli """ def __init__(self): self.__logger = logging.getLogger("Oglive") self.binary = "/opt/opengnsys/bin/oglivecli" self.__logger.debug("Inicializando") def _cmd(self, args): cmd = [self.binary] + args self.__logger.debug("comando: %s", cmd) proc = subprocess.run(cmd, shell=False, check=True, capture_output=True) out_text = proc.stdout.decode('utf-8').strip() self.__logger.debug("salida: %s", out_text) return out_text def get_default(self): """Devuelve el cliente por defecto""" self.__logger.debug("get_default()") return self._cmd(["get-default"]) def get_clients(self): """Devuelve la lista de clientes en un dict""" self.__logger.debug("get_clients()") lines = self._cmd(["list"]).splitlines() clients = {} for line in lines: (number, name) = line.split() clients[number] = name self.__logger.debug("Clientes: %s", clients) return clients class OpengnsysGitInstaller: """Instalador de OpenGnsys""" def __init__(self): """Inicializar clase""" self.__logger = logging.getLogger("OpengnsysGitInstaller") self.__logger.setLevel(logging.DEBUG) self.__logger.debug("Inicializando") self.testmode = False self.base_path = "/opt/opengnsys" self.git_basedir = "base.git" self.email = "OpenGnsys@opengnsys.com" self.forgejo_user = "oggit" self.forgejo_password = "opengnsys" self.forgejo_organization = "opengnsys" self.forgejo_port = 3000 self.set_ssh_user_group("oggit", "oggit") self.temp_dir = None self.script_path = os.path.realpath(os.path.dirname(__file__)) # Possible names for SSH key self.key_paths = ["scripts/ssl/id_rsa.pub", "scripts/ssl/id_ed25519.pub", "scripts/ssl/id_ecdsa.pub", "scripts/ssl/id_ed25519_sk.pub", "scripts/ssl/id_ecdsa_sk.pub"] self.key_paths_dict = {} for kp in self.key_paths: self.key_paths_dict[kp] = 1 self.oglive = Oglive() def set_testmode(self, value): """Establece el modo de prueba""" self.testmode = value def set_ignoresshkey(self, value): """Ignorar requisito de clave de ssh para el instalador""" self.ignoresshkey = value def set_usesshkey(self, value): """Usar clave de ssh especificada""" self.usesshkey = value def set_basepath(self, value): """Establece ruta base de OpenGnsys Valor por defecto: /opt/opengnsys """ self.base_path = value def _get_tempdir(self): """Obtiene el directorio temporal""" if self.testmode: dirname = "/tmp/ogtemp" if os.path.exists(dirname): shutil.rmtree(dirname) dir=FakeTemporaryDirectory(dirname) self.__logger.debug("Modo de prueba, temp=/tmp/ogtemp") return dir else: dir = tempfile.TemporaryDirectory() self.__logger.debug("Temp = %s", dir) return dir def _cleanup(self): """Limpia el directorio temporal""" if self.temp_dir: shutil.rmtree(self.temp_dir, ignore_errors=True) def set_ssh_user_group(self, username, groupname): self.ssh_group = groupname self.ssh_user = username try: self.ssh_gid = grp.getgrnam(self.ssh_group).gr_gid self.__logger.info("Group %s exists with gid %i", self.ssh_group, self.ssh_gid) except KeyError: self.__logger.info("Need to create group %s", self.ssh_group) subprocess.run(["/usr/sbin/groupadd", "--system", self.ssh_group], check=True) self.ssh_gid = grp.getgrnam(groupname).gr_gid try: self.ssh_uid = pwd.getpwnam(self.ssh_user).pw_uid self.__logger.info("User %s exists with gid %i", self.ssh_user, self.ssh_uid) except KeyError: self.__logger.info("Need to create user %s", self.ssh_user) subprocess.run(["/usr/sbin/useradd", "--gid", str(self.ssh_gid), "-m", "--system", self.ssh_user], check=True) self.ssh_uid = pwd.getpwnam(username).pw_uid self.ssh_homedir = pwd.getpwnam(username).pw_dir def init_git_repo(self, reponame): """Inicializa un repositorio Git""" # Creamos repositorio ogdir_images = os.path.join(self.base_path, "images") self.__logger.info("Creando repositorio de GIT %s", reponame) os.makedirs(os.path.join(ogdir_images, self.git_basedir), exist_ok=True) repo_path=os.path.join(ogdir_images, reponame) shutil.rmtree(repo_path, ignore_errors=True) # Marcar como directorio seguro # Nota: no usar GitPython. Config global falla, aunque hay indicaciones de que # git.Repo(path=None) es valido. Posiblemente bug de GitPython. subprocess.run(["git", "config", "--global", "add" "safe.directory", repo_path]) self.__logger.debug("Inicializando repositorio: " + repo_path) repo = git.Repo.init(repo_path, bare = True) self.__logger.info("Configurando repositorio de GIT") repo.config_writer().set_value("user", "name", "OpenGnsys").release() repo.config_writer().set_value("user", "email", self.email).release() self._recursive_chown(repo_path, ouid=self.ssh_uid, ogid=self.ssh_gid) def _add_line_to_file(self, filename, new_line): """Agrega una línea a un archivo""" found = False self.__logger.debug("Agregando linea: %s a %s", new_line, filename) with open(filename, "a+", encoding="utf-8") as f: f.seek(0) for line in f: if line.strip() == new_line.strip(): found = True if not found: self.__logger.debug("Agregando linea: %s", new_line) f.write(new_line + "\n") else: self.__logger.debug("Linea ya presente") def _recursive_chown(self, path, ouid, ogid): """Cambia el propietario y grupo de forma recursiva""" for dirpath, _, filenames in os.walk(path): os.chown(dirpath, uid=ouid, gid=ogid) for filename in filenames: os.chown(os.path.join(dirpath, filename), uid=ouid, gid=ogid) def _wait_for_port(self, host, port): self.__logger.info("Waiting for %s:%i to be up", host, port) timeout = 60 start_time = time.time() ready = False while not ready and (time.time() - start_time) < 60: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: s.connect((host, port)) ready = True s.close() except TimeoutError: self.__logger.debug("Timed out, no connection yet.") except OSError as oserr: self.__logger.debug("%s, no connection yet. %.1f seconds left.", oserr.strerror, timeout - (time.time() - start_time)) time.sleep(0.1) if ready: self.__logger.info("Connection established.") else: self.__logger.error("Timed out waiting for connection!") raise TimeoutError("Timed out waiting for connection!") def _extract_ssh_key(self): public_key="" INITRD = "oginitrd.img" tftp_dir = os.path.join(self.base_path, "tftpboot") default_num = self.oglive.get_default() default_client = self.oglive.get_clients()[default_num] client_initrd_path = os.path.join(tftp_dir, default_client, INITRD) #self.temp_dir = self._get_tempdir() if self.usesshkey: with open(self.usesshkey, 'r') as f: public_key = f.read().strip() else: if os.path.isfile(client_initrd_path): #os.makedirs(temp_dir, exist_ok=True) #os.chdir(self.temp_dir.name) self.__logger.debug("Descomprimiendo %s", client_initrd_path) public_key = None with libarchive.file_reader(client_initrd_path) as initrd: for file in initrd: #self.__logger.debug("Archivo: %s", file) if file.pathname in self.key_paths_dict: data = bytearray() for block in file.get_blocks(): data = data + block public_key = data.decode('utf-8').strip() break else: print(f"No se encuentra la imagen de initrd {client_initrd_path}") exit(2) return public_key def install(self): """Instalar Ejecuta todo el proceso de instalación incluyendo: * Dependencias * Configuración de authorized_keys * Configuración de ssh * Creación de repositorio Raises: RequirementException: No ejecutado por usuario root RequirementException: No ejecutado en Debian o Ubuntu RequirementException: Falta clave pública RequirementException: Python < 3.8 """ self.__logger.info("install()") ogdir_images = os.path.join(self.base_path, "images") ENGINECFG = os.path.join(self.base_path, "client/etc/engine.cfg") os.environ["PATH"] += os.pathsep + os.path.join(self.base_path, "bin") tftp_dir = os.path.join(self.base_path, "tftpboot") INITRD = "oginitrd.img" self.temp_dir = self._get_tempdir() SSHUSER = "opengnsys" # Control básico de errores. self.__logger.debug("Comprobando euid") if os.geteuid() != 0: raise RequirementException("Sólo ejecutable por root") if not os.path.exists("/etc/debian_version"): raise RequirementException("Instalación sólo soportada en Debian y Ubuntu") MIN_PYTHON = (3, 8) if sys.version_info < MIN_PYTHON: raise RequirementException(f"Python %s.%s mínimo requerido.\n" % MIN_PYTHON) self.__logger.debug("Instalando dependencias") subprocess.run(["apt-get", "install", "-y", "git"], check=True) # Autenticación del usuario opengnsys con clave pública desde los ogLive # Requiere que todos los ogLive tengan la misma clave publica (utilizar setsslkey) # Tomamos la clave publica del cliente por defecto default_num = self.oglive.get_default() default_client = self.oglive.get_clients()[default_num] client_initrd_path = os.path.join(tftp_dir, default_client, INITRD) self.__logger.debug("Ruta de initrd: %s", client_initrd_path) # Si me salgo con error borro el directorio temporal if not self.ignoresshkey: public_key = self._extract_ssh_key() # Si la clave publica no existe me salgo con error if not public_key: raise RequirementException(f"No se encuentra clave pública dentro del ogLive en {self.temp_dir}, imagen {client_initrd_path}. Rutas buscadas: {self.key_paths}\n" + "Los oglive deben tener la misma clave pública (utilizar setsslkey)") ssh_dir = os.path.join(self.ssh_homedir, ".ssh") authorized_keys_file = os.path.join(ssh_dir, "authorized_keys") self.__logger.debug("Configurando ssh: Agregando clave %s a %s", public_key, authorized_keys_file) self.__logger.debug("Key: %s", public_key) os.makedirs(ssh_dir, exist_ok=True) self._add_line_to_file(authorized_keys_file, public_key) os.chmod(authorized_keys_file, 0o600) os.chown(ssh_dir, uid=self.ssh_uid, gid=self.ssh_gid) os.chown(authorized_keys_file, uid=self.ssh_uid, gid=self.ssh_gid) # Configuramos el servicio ssh para que permita el acceso con clave pública self.__logger.info(" Configuramos el servicio ssh para que permita el acceso con clave pública.") with open("/etc/ssh/sshd_config", "r") as f: sshd_config = f.read() sshd_config = sshd_config.replace("PubkeyAuthentication no", "PubkeyAuthentication yes") with open("/etc/ssh/sshd_config", "w") as f: f.write(sshd_config) os.system("systemctl reload ssh") # Instalamos git os.system("apt install git") # Para que el usuario sólo pueda usar git (no ssh) SHELL = shutil.which("git-shell") os.system(f"usermod -s {SHELL} opengnsys") # Creamos repositorios #self.init_git_repo('windows.git') #self.init_git_repo('linux.git') #self.init_git_repo('mac.git') # Damos permiso al usuario opengnsys #for DIR in ["base.git", "linux.git", "windows.git"]: #, "LinAcl", "WinAcl"]: # self._recursive_chown(os.path.join(ogdir_images, DIR), ouid=self.ssh_uid, ogid=self.ssh_gid) def _install_template(self, template, destination, keysvalues): self.__logger.info("Writing template %s into %s", template, destination) data = "" with open(template, "r", encoding="utf-8") as template_file: data = template_file.read() for key in keysvalues.keys(): data = data.replace("{" + key + "}", keysvalues[key]) with open(destination, "w+", encoding="utf-8") as out_file: out_file.write(data) def _runcmd(self, cmd): self.__logger.debug("Running: %s", cmd) ret = subprocess.run(cmd, check=True,capture_output=True, encoding='utf-8') return ret.stdout.strip() def install_forgejo(self): self.__logger.info("Installing Forgejo") bin_path = os.path.join(self.base_path, "bin", "forgejo") conf_dir_path = os.path.join(self.base_path, "etc", "forgejo") lfs_dir_path = os.path.join(self.base_path, "images", "git-lfs") git_dir_path = os.path.join(self.base_path, "images", "git") forgejo_work_dir_path = os.path.join(self.base_path, "var", "lib", "forgejo/work") forgejo_db_dir_path = os.path.join(self.base_path, "var", "lib", "forgejo/db") forgejo_data_dir_path = os.path.join(self.base_path, "var", "lib", "forgejo/data") forgejo_db_path = os.path.join(forgejo_db_dir_path, "forgejo.db") forgejo_log_dir_path = os.path.join(self.base_path, "log", "forgejo") conf_path = os.path.join(conf_dir_path, "app.ini") self.__logger.debug("Stopping opengnsys-forgejo service") subprocess.run(["systemctl", "stop", "opengnsys-forgejo"], check=False) if not os.path.exists(bin_path): self.__logger.debug("Downloading from %s into %s", FORGEJO_URL, bin_path) urllib.request.urlretrieve(FORGEJO_URL, bin_path) os.chmod(bin_path, 0o755) if os.path.exists(forgejo_db_path): self.__logger.debug("Removing old configuration") os.unlink(forgejo_db_path) else: self.__logger.debug("Old configuration not present, ok.") self.__logger.debug("Wiping old data") for dir in [conf_dir_path, git_dir_path, lfs_dir_path, forgejo_work_dir_path, forgejo_data_dir_path, forgejo_db_dir_path]: if os.path.exists(dir): self.__logger.debug("Removing %s", dir) shutil.rmtree(dir) self.__logger.debug("Creating directories") pathlib.Path(conf_dir_path).mkdir(parents=True, exist_ok=True) pathlib.Path(git_dir_path).mkdir(parents=True, exist_ok=True) pathlib.Path(lfs_dir_path).mkdir(parents=True, exist_ok=True) pathlib.Path(forgejo_work_dir_path).mkdir(parents=True, exist_ok=True) pathlib.Path(forgejo_data_dir_path).mkdir(parents=True, exist_ok=True) pathlib.Path(forgejo_db_dir_path).mkdir(parents=True, exist_ok=True) pathlib.Path(forgejo_log_dir_path).mkdir(parents=True, exist_ok=True) os.chown(lfs_dir_path, self.ssh_uid, self.ssh_gid) os.chown(git_dir_path, self.ssh_uid, self.ssh_gid) os.chown(forgejo_data_dir_path, self.ssh_uid, self.ssh_gid) os.chown(forgejo_work_dir_path, self.ssh_uid, self.ssh_gid) os.chown(forgejo_db_dir_path, self.ssh_uid, self.ssh_gid) os.chown(forgejo_log_dir_path, self.ssh_uid, self.ssh_gid) data = { "forgejo_user" : self.ssh_user, "forgejo_group" : self.ssh_group, "forgejo_port" : str(self.forgejo_port), "forgejo_bin" : bin_path, "forgejo_app_ini" : conf_path, "forgejo_work_path" : forgejo_work_dir_path, "forgejo_data_path" : forgejo_data_dir_path, "forgejo_db_path" : forgejo_db_path, "forgejo_repository_root" : git_dir_path, "forgejo_lfs_path" : lfs_dir_path, "forgejo_log_path" : forgejo_log_dir_path, "forgejo_hostname" : self._runcmd("hostname"), "forgejo_lfs_jwt_secret" : self._runcmd([bin_path,"generate", "secret", "LFS_JWT_SECRET"]), "forgejo_jwt_secret" : self._runcmd([bin_path,"generate", "secret", "JWT_SECRET"]), "forgejo_internal_token" : self._runcmd([bin_path,"generate", "secret", "INTERNAL_TOKEN"]), "forgejo_secret_key" : self._runcmd([bin_path,"generate", "secret", "SECRET_KEY"]) } self._install_template(os.path.join(self.script_path, "forgejo-app.ini"), conf_path, data) self._install_template(os.path.join(self.script_path, "forgejo.service"), "/etc/systemd/system/opengnsys-forgejo.service", data) self.__logger.debug("Reloading systemd and starting service") subprocess.run(["systemctl", "daemon-reload"], check=True) subprocess.run(["systemctl", "enable", "opengnsys-forgejo"], check=True) subprocess.run(["systemctl", "restart", "opengnsys-forgejo"], check=True) self.__logger.info("Waiting for forgejo to start") self._wait_for_port("localhost", self.forgejo_port) self.__logger.info("Configuring forgejo") def run_forge_cmd(args): cmd = [bin_path, "--config", conf_path] + args self.__logger.debug("Running command: %s", cmd) ret = subprocess.run(cmd, check=False, capture_output=True, encoding='utf-8', user=self.ssh_user) if ret.returncode == 0: return ret.stdout.strip() else: self.__logger.error("Failed to run command: %s, return code %i", cmd, ret.returncode) self.__logger.error("stdout: %s", ret.stdout) self.__logger.error("stderr: %s", ret.stderr) raise RuntimeError("Failed to run necessary command") run_forge_cmd(["admin", "doctor", "check"]) run_forge_cmd(["admin", "user", "create", "--username", self.forgejo_user, "--password", self.forgejo_password, "--email", self.email]) token = run_forge_cmd(["admin", "user", "generate-access-token", "--username", self.forgejo_user, "-t", "gitapi", "--scopes", "all", "--raw"]) with open(os.path.join(self.base_path, "etc", "ogGitApiToken.cfg"), "w+", encoding='utf-8') as token_file: token_file.write(token) ssh_key = self._extract_ssh_key() self.add_forgejo_sshkey(ssh_key, "Default key") def add_forgejo_repo(self, repository_name, description = ""): token = "" with open(os.path.join(self.base_path, "etc", "ogGitApiToken.cfg"), "r", encoding='utf-8') as token_file: token = token_file.read().strip() self.__logger.info("Adding repository %s for Forgejo", repository_name) r = requests.post( f"http://localhost:{self.forgejo_port}/api/v1/user/repos", json={ "auto_init" : False, "default_branch" : "main", "description" : description, "name" : repository_name, "private" : False }, headers={ 'Authorization' : f"token {token}" }, timeout = 60 ) self.__logger.info("Request status was %i", r.status_code) def add_forgejo_sshkey(self, pubkey, description = ""): token = "" with open(os.path.join(self.base_path, "etc", "ogGitApiToken.cfg"), "r", encoding='utf-8') as token_file: token = token_file.read().strip() self.__logger.info("Adding SSH key to Forgejo: %s", pubkey) r = requests.post( f"http://localhost:{self.forgejo_port}/api/v1/user/keys", json={ "key" : pubkey, "read_only" : False, "title" : description }, headers={ 'Authorization' : f"token {token}" }, timeout = 60 ) self.__logger.info("Request status was %i", r.status_code) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)20s - [%(levelname)5s] - %(message)s') logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) logger.info("Inicio del programa") parser = argparse.ArgumentParser( prog="OpenGnsys Installer", description="Script para la instalación del repositorio git", ) parser.add_argument('--forgejo-only', action='store_true', help="Solo instalar forgejo") parser.add_argument('--forgejo-addrepos', action='store_true', help="Solo agregar repositorios forgejo") parser.add_argument('--testmode', action='store_true', help="Modo de prueba") parser.add_argument('--ignoresshkey', action='store_true', help="Ignorar clave de SSH") parser.add_argument('--usesshkey', type=str, help="Usar clave SSH especificada") parser.add_argument('--test-createuser', action='store_true') args = parser.parse_args() installer = OpengnsysGitInstaller() installer.set_testmode(args.testmode) installer.set_ignoresshkey(args.ignoresshkey) installer.set_usesshkey(args.usesshkey) logger.debug("Inicio de instalación") try: if args.forgejo_only: installer.install_forgejo() elif args.forgejo_addrepos: installer.add_forgejo_repo("linux") elif args.test_createuser: installer.set_ssh_user_group("oggit2", "oggit2") else: installer.install() installer.install_forgejo() installer.add_forgejo_repo("windows", "Windows") installer.add_forgejo_repo("linux", "Linux") installer.add_forgejo_repo("mac", "Mac") except RequirementException as req: show_error(f"Requisito para la instalación no satisfecho: {req.message}") exit(1)