#!/usr/bin/env python3 """Script para la instalación del repositorio git""" import os import shutil import argparse import tempfile import logging import subprocess import sys import pwd import grp from termcolor import colored, cprint import git import libarchive def show_error(*args): """ Imprime un mensaje de error Args: *args: Argumentos igual que a la función print Returns: None """ cprint(*args, "red", attrs = ["bold"], file=sys.stderr) class RequirementException(Exception): """Excepción que indica que nos falta algún requisito Attributes: message (str): Mensaje de error mostrado al usuario """ def __init__(self, message): """Inicializar RequirementException. Args: message (str): Mensaje de error mostrado al usuario """ super().__init__(message) self.message = message class FakeTemporaryDirectory: """Imitación de TemporaryDirectory para depuración""" def __init__(self, dirname): self.name = dirname os.makedirs(dirname, exist_ok=True) def __str__(self): return self.name class Oglive: """Interfaz a utilidad oglivecli Esto es probablemente temporal hasta que se haga una conversión de oglivecli """ def __init__(self): self.__logger = logging.getLogger("Oglive") self.binary = "/opt/opengnsys/bin/oglivecli" self.__logger.debug("Inicializando") def _cmd(self, args): cmd = [self.binary] + args self.__logger.debug("comando: %s", cmd) proc = subprocess.run(cmd, shell=False, check=True, capture_output=True) out_text = proc.stdout.decode('utf-8').strip() self.__logger.debug("salida: %s", out_text) return out_text def get_default(self): """Devuelve el cliente por defecto""" self.__logger.debug("get_default()") return self._cmd(["get-default"]) def get_clients(self): """Devuelve la lista de clientes en un dict""" self.__logger.debug("get_clients()") lines = self._cmd(["list"]).splitlines() clients = {} for line in lines: (number, name) = line.split() clients[number] = name self.__logger.debug("Clientes: %s", clients) return clients class OpengnsysGitInstaller: """Instalador de OpenGnsys""" def __init__(self): """Inicializar clase""" self.__logger = logging.getLogger("OpengnsysGitInstaller") self.__logger.setLevel(logging.DEBUG) self.__logger.debug("Inicializando") self.testmode = False self.base_path = "/opt/opengnsys" self.git_basedir = "base.git" self.ssh_user = "opengnsys" self.ssh_group = "opengnsys" self.ssh_homedir = pwd.getpwnam(self.ssh_user).pw_dir self.ssh_uid = pwd.getpwnam(self.ssh_user).pw_uid self.ssh_gid = grp.getgrnam(self.ssh_group).gr_gid self.temp_dir = None # Possible names for SSH key self.key_paths = ["scripts/ssl/id_rsa.pub", "scripts/ssl/id_ed25519.pub", "scripts/ssl/id_ecdsa.pub", "scripts/ssl/id_ed25519_sk.pub", "scripts/ssl/id_ecdsa_sk.pub"] self.key_paths_dict = {} for kp in self.key_paths: self.key_paths_dict[kp] = 1 self.oglive = Oglive() def set_testmode(self, value): """Establece el modo de prueba""" self.testmode = value def set_ignoresshkey(self, value): """Ignorar requisito de clave de ssh para el instalador""" self.ignoresshkey = value def set_usesshkey(self, value): """Usar clave de ssh especificada""" self.usesshkey = value def set_basepath(self, value): """Establece ruta base de OpenGnsys Valor por defecto: /opt/opengnsys """ self.base_path = value def _get_tempdir(self): """Obtiene el directorio temporal""" if self.testmode: dirname = "/tmp/ogtemp" if os.path.exists(dirname): shutil.rmtree(dirname) dir=FakeTemporaryDirectory(dirname) self.__logger.debug("Modo de prueba, temp=/tmp/ogtemp") return dir else: dir = tempfile.TemporaryDirectory() self.__logger.debug("Temp = %s", dir) return dir def _cleanup(self): """Limpia el directorio temporal""" if self.temp_dir: shutil.rmtree(self.temp_dir, ignore_errors=True) def _init_git_repo(self, reponame): """Inicializa un repositorio Git""" # Creamos repositorio ogdir_images = os.path.join(self.base_path, "images") self.__logger.info("Creando repositorio de GIT %s", reponame) os.makedirs(os.path.join(ogdir_images, self.git_basedir), exist_ok=True) repo_path=os.path.join(ogdir_images, reponame) shutil.rmtree(repo_path, ignore_errors=True) # Marcar como directorio seguro # Nota: no usar GitPython. Config global falla, aunque hay indicaciones de que # git.Repo(path=None) es valido. Posiblemente bug de GitPython. subprocess.run(["git", "config", "--global", "add" "safe.directory", repo_path]) self.__logger.debug("Inicializando repositorio: " + repo_path) repo = git.Repo.init(repo_path, bare = True) self.__logger.info("Configurando repositorio de GIT") repo.config_writer().set_value("user", "name", "OpenGnsys").release() repo.config_writer().set_value("user", "email", "OpenGnsys@opengnsys.com").release() self._recursive_chown(repo_path, ouid=self.ssh_uid, ogid=self.ssh_gid) def _add_line_to_file(self, filename, new_line): """Agrega una línea a un archivo""" found = False self.__logger.debug("Agregando linea: %s a %s", new_line, filename) with open(filename, "a+", encoding="utf-8") as f: f.seek(0) for line in f: if line.strip() == new_line.strip(): found = True if not found: self.__logger.debug("Agregando linea: %s", new_line) f.write(new_line + "\n") else: self.__logger.debug("Linea ya presente") def _recursive_chown(self, path, ouid, ogid): """Cambia el propietario y grupo de forma recursiva""" for dirpath, _, filenames in os.walk(path): os.chown(dirpath, uid=ouid, gid=ogid) for filename in filenames: os.chown(os.path.join(dirpath, filename), uid=ouid, gid=ogid) def install(self): """Instalar Ejecuta todo el proceso de instalación incluyendo: * Dependencias * Configuración de authorized_keys * Configuración de ssh * Creación de repositorio Raises: RequirementException: No ejecutado por usuario root RequirementException: No ejecutado en Debian o Ubuntu RequirementException: Falta clave pública RequirementException: Python < 3.8 """ self.__logger.info("install()") ogdir_images = os.path.join(self.base_path, "images") ENGINECFG = os.path.join(self.base_path, "client/etc/engine.cfg") os.environ["PATH"] += os.pathsep + os.path.join(self.base_path, "bin") tftp_dir = os.path.join(self.base_path, "tftpboot") INITRD = "oginitrd.img" self.temp_dir = self._get_tempdir() SSHUSER = "opengnsys" # Control básico de errores. self.__logger.debug("Comprobando euid") if os.geteuid() != 0: raise RequirementException("Sólo ejecutable por root") if not os.path.exists("/etc/debian_version"): raise RequirementException("Instalación sólo soportada en Debian y Ubuntu") MIN_PYTHON = (3, 8) if sys.version_info < MIN_PYTHON: raise RequirementException(f"Python %s.%s mínimo requerido.\n" % MIN_PYTHON) self.__logger.debug("Instalando dependencias") subprocess.run(["apt-get", "install", "-y", "git"], check=True) # Autenticación del usuario opengnsys con clave pública desde los ogLive # Requiere que todos los ogLive tengan la misma clave publica (utilizar setsslkey) # Tomamos la clave publica del cliente por defecto default_num = self.oglive.get_default() default_client = self.oglive.get_clients()[default_num] client_initrd_path = os.path.join(tftp_dir, default_client, INITRD) self.__logger.debug("Ruta de initrd: %s", client_initrd_path) # Si me salgo con error borro el directorio temporal if not self.ignoresshkey: public_key="" if self.usesshkey: with open(self.usesshkey, 'r') as f: public_key = f.read().strip() else: if os.path.isfile(client_initrd_path): #os.makedirs(temp_dir, exist_ok=True) os.chdir(self.temp_dir.name) self.__logger.debug("Descomprimiendo %s", client_initrd_path) public_key = None with libarchive.file_reader(client_initrd_path) as initrd: for file in initrd: self.__logger.debug("Archivo: %s", file) if file.pathname in self.key_paths_dict: data = bytearray() for block in file.get_blocks(): data = data + block public_key = data.decode('utf-8').strip() break else: print(f"No se encuentra la imagen de initrd {client_initrd_path}") exit(2) # Si la clave publica no existe me salgo con error if not public_key: raise RequirementException(f"No se encuentra clave pública dentro del ogLive en {self.temp_dir}, imagen {client_initrd_path}. Rutas buscadas: {self.key_paths}\n" + "Los oglive deben tener la misma clave pública (utilizar setsslkey)") ssh_dir = os.path.join(self.ssh_homedir, ".ssh") authorized_keys_file = os.path.join(ssh_dir, "authorized_keys") self.__logger.debug("Configurando ssh: Agregando clave %s a %s", public_key, authorized_keys_file) self.__logger.debug("Key: %s", public_key) os.makedirs(ssh_dir, exist_ok=True) self._add_line_to_file(authorized_keys_file, public_key) os.chmod(authorized_keys_file, 0o600) os.chown(ssh_dir, uid=self.ssh_uid, gid=self.ssh_gid) os.chown(authorized_keys_file, uid=self.ssh_uid, gid=self.ssh_gid) # Configuramos el servicio ssh para que permita el acceso con clave pública self.__logger.info(" Configuramos el servicio ssh para que permita el acceso con clave pública.") with open("/etc/ssh/sshd_config", "r") as f: sshd_config = f.read() sshd_config = sshd_config.replace("PubkeyAuthentication no", "PubkeyAuthentication yes") with open("/etc/ssh/sshd_config", "w") as f: f.write(sshd_config) os.system("systemctl reload ssh") # Instalamos git os.system("apt install git") # Para que el usuario sólo pueda usar git (no ssh) SHELL = shutil.which("git-shell") os.system(f"usermod -s {SHELL} opengnsys") # Creamos repositorios self._init_git_repo('windows.git') self._init_git_repo('linux.git') self._init_git_repo('mac.git') # Damos permiso al usuario opengnsys for DIR in ["base.git", "linux.git", "windows.git"]: #, "LinAcl", "WinAcl"]: self._recursive_chown(os.path.join(ogdir_images, DIR), ouid=self.ssh_uid, ogid=self.ssh_gid) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)20s - [%(levelname)5s] - %(message)s') logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) logger.info("Inicio del programa") parser = argparse.ArgumentParser( prog="OpenGnsys Installer", description="Script para la instalación del repositorio git", ) parser.add_argument('--testmode', action='store_true', help="Modo de prueba") parser.add_argument('--ignoresshkey', action='store_true', help="Ignorar clave de SSH") parser.add_argument('--usesshkey', type=str, help="Usar clave SSH especificada") args = parser.parse_args() installer = OpengnsysGitInstaller() installer.set_testmode(args.testmode) installer.set_ignoresshkey(args.ignoresshkey) installer.set_usesshkey(args.usesshkey) logger.debug("Inicio de instalación") try: installer.install() except RequirementException as req: show_error(f"Requisito para la instalación no satisfecho: {req.message}") exit(1)