oggit/installer/opengnsys_git_installer.py

373 lines
13 KiB
Python

#!/usr/bin/env python3
"""Script para la instalación del repositorio git"""
import os
import shutil
import argparse
import tempfile
import logging
import subprocess
import sys
import pwd
import grp
from termcolor import colored, cprint
import git
import libarchive
def show_error(*args):
"""
Imprime un mensaje de error
Args:
*args: Argumentos igual que a la función print
Returns:
None
"""
cprint(*args, "red", attrs = ["bold"], file=sys.stderr)
class RequirementException(Exception):
"""Excepción que indica que nos falta algún requisito
Attributes:
message (str): Mensaje de error mostrado al usuario
"""
def __init__(self, message):
"""Inicializar RequirementException.
Args:
message (str): Mensaje de error mostrado al usuario
"""
super().__init__(message)
self.message = message
class FakeTemporaryDirectory:
"""Imitación de TemporaryDirectory para depuración"""
def __init__(self, dirname):
self.name = dirname
os.makedirs(dirname, exist_ok=True)
def __str__(self):
return self.name
class Oglive:
"""Interfaz a utilidad oglivecli
Esto es probablemente temporal hasta que se haga una conversión de oglivecli
"""
def __init__(self):
self.__logger = logging.getLogger("Oglive")
self.binary = "/opt/opengnsys/bin/oglivecli"
self.__logger.debug("Inicializando")
def _cmd(self, args):
cmd = [self.binary] + args
self.__logger.debug("comando: %s", cmd)
proc = subprocess.run(cmd, shell=False, check=True, capture_output=True)
out_text = proc.stdout.decode('utf-8').strip()
self.__logger.debug("salida: %s", out_text)
return out_text
def get_default(self):
"""Devuelve el cliente por defecto"""
self.__logger.debug("get_default()")
return self._cmd(["get-default"])
def get_clients(self):
"""Devuelve la lista de clientes en un dict"""
self.__logger.debug("get_clients()")
lines = self._cmd(["list"]).splitlines()
clients = {}
for line in lines:
(number, name) = line.split()
clients[number] = name
self.__logger.debug("Clientes: %s", clients)
return clients
class OpengnsysGitInstaller:
"""Instalador de OpenGnsys"""
def __init__(self):
"""Inicializar clase"""
self.__logger = logging.getLogger("OpengnsysGitInstaller")
self.__logger.setLevel(logging.DEBUG)
self.__logger.debug("Inicializando")
self.testmode = False
self.base_path = "/opt/opengnsys"
self.git_basedir = "base.git"
self.ssh_user = "opengnsys"
self.ssh_group = "opengnsys"
self.ssh_homedir = pwd.getpwnam(self.ssh_user).pw_dir
self.ssh_uid = pwd.getpwnam(self.ssh_user).pw_uid
self.ssh_gid = grp.getgrnam(self.ssh_group).gr_gid
self.temp_dir = None
# Possible names for SSH key
self.key_paths = ["scripts/ssl/id_rsa.pub", "scripts/ssl/id_ed25519.pub", "scripts/ssl/id_ecdsa.pub", "scripts/ssl/id_ed25519_sk.pub", "scripts/ssl/id_ecdsa_sk.pub"]
self.key_paths_dict = {}
for kp in self.key_paths:
self.key_paths_dict[kp] = 1
self.oglive = Oglive()
def set_testmode(self, value):
"""Establece el modo de prueba"""
self.testmode = value
def set_ignoresshkey(self, value):
"""Ignorar requisito de clave de ssh para el instalador"""
self.ignoresshkey = value
def set_usesshkey(self, value):
"""Usar clave de ssh especificada"""
self.usesshkey = value
def set_basepath(self, value):
"""Establece ruta base de OpenGnsys
Valor por defecto: /opt/opengnsys
"""
self.base_path = value
def _get_tempdir(self):
"""Obtiene el directorio temporal"""
if self.testmode:
dirname = "/tmp/ogtemp"
if os.path.exists(dirname):
shutil.rmtree(dirname)
dir=FakeTemporaryDirectory(dirname)
self.__logger.debug("Modo de prueba, temp=/tmp/ogtemp")
return dir
else:
dir = tempfile.TemporaryDirectory()
self.__logger.debug("Temp = %s", dir)
return dir
def _cleanup(self):
"""Limpia el directorio temporal"""
if self.temp_dir:
shutil.rmtree(self.temp_dir, ignore_errors=True)
def init_git_repo(self, reponame):
"""Inicializa un repositorio Git"""
# Creamos repositorio
ogdir_images = os.path.join(self.base_path, "images")
self.__logger.info("Creando repositorio de GIT %s", reponame)
os.makedirs(os.path.join(ogdir_images, self.git_basedir), exist_ok=True)
repo_path=os.path.join(ogdir_images, reponame)
shutil.rmtree(repo_path, ignore_errors=True)
# Marcar como directorio seguro
# Nota: no usar GitPython. Config global falla, aunque hay indicaciones de que
# git.Repo(path=None) es valido. Posiblemente bug de GitPython.
subprocess.run(["git", "config", "--global", "add" "safe.directory", repo_path])
self.__logger.debug("Inicializando repositorio: " + repo_path)
repo = git.Repo.init(repo_path, bare = True)
self.__logger.info("Configurando repositorio de GIT")
repo.config_writer().set_value("user", "name", "OpenGnsys").release()
repo.config_writer().set_value("user", "email", "OpenGnsys@opengnsys.com").release()
self._recursive_chown(repo_path, ouid=self.ssh_uid, ogid=self.ssh_gid)
def _add_line_to_file(self, filename, new_line):
"""Agrega una línea a un archivo"""
found = False
self.__logger.debug("Agregando linea: %s a %s", new_line, filename)
with open(filename, "a+", encoding="utf-8") as f:
f.seek(0)
for line in f:
if line.strip() == new_line.strip():
found = True
if not found:
self.__logger.debug("Agregando linea: %s", new_line)
f.write(new_line + "\n")
else:
self.__logger.debug("Linea ya presente")
def _recursive_chown(self, path, ouid, ogid):
"""Cambia el propietario y grupo de forma recursiva"""
for dirpath, _, filenames in os.walk(path):
os.chown(dirpath, uid=ouid, gid=ogid)
for filename in filenames:
os.chown(os.path.join(dirpath, filename), uid=ouid, gid=ogid)
def install(self):
"""Instalar
Ejecuta todo el proceso de instalación incluyendo:
* Dependencias
* Configuración de authorized_keys
* Configuración de ssh
* Creación de repositorio
Raises:
RequirementException: No ejecutado por usuario root
RequirementException: No ejecutado en Debian o Ubuntu
RequirementException: Falta clave pública
RequirementException: Python < 3.8
"""
self.__logger.info("install()")
ogdir_images = os.path.join(self.base_path, "images")
ENGINECFG = os.path.join(self.base_path, "client/etc/engine.cfg")
os.environ["PATH"] += os.pathsep + os.path.join(self.base_path, "bin")
tftp_dir = os.path.join(self.base_path, "tftpboot")
INITRD = "oginitrd.img"
self.temp_dir = self._get_tempdir()
SSHUSER = "opengnsys"
# Control básico de errores.
self.__logger.debug("Comprobando euid")
if os.geteuid() != 0:
raise RequirementException("Sólo ejecutable por root")
if not os.path.exists("/etc/debian_version"):
raise RequirementException("Instalación sólo soportada en Debian y Ubuntu")
MIN_PYTHON = (3, 8)
if sys.version_info < MIN_PYTHON:
raise RequirementException(f"Python %s.%s mínimo requerido.\n" % MIN_PYTHON)
self.__logger.debug("Instalando dependencias")
subprocess.run(["apt-get", "install", "-y", "git"], check=True)
# Autenticación del usuario opengnsys con clave pública desde los ogLive
# Requiere que todos los ogLive tengan la misma clave publica (utilizar setsslkey)
# Tomamos la clave publica del cliente por defecto
default_num = self.oglive.get_default()
default_client = self.oglive.get_clients()[default_num]
client_initrd_path = os.path.join(tftp_dir, default_client, INITRD)
self.__logger.debug("Ruta de initrd: %s", client_initrd_path)
# Si me salgo con error borro el directorio temporal
if not self.ignoresshkey:
public_key=""
if self.usesshkey:
with open(self.usesshkey, 'r') as f:
public_key = f.read().strip()
else:
if os.path.isfile(client_initrd_path):
#os.makedirs(temp_dir, exist_ok=True)
os.chdir(self.temp_dir.name)
self.__logger.debug("Descomprimiendo %s", client_initrd_path)
public_key = None
with libarchive.file_reader(client_initrd_path) as initrd:
for file in initrd:
self.__logger.debug("Archivo: %s", file)
if file.pathname in self.key_paths_dict:
data = bytearray()
for block in file.get_blocks():
data = data + block
public_key = data.decode('utf-8').strip()
break
else:
print(f"No se encuentra la imagen de initrd {client_initrd_path}")
exit(2)
# Si la clave publica no existe me salgo con error
if not public_key:
raise RequirementException(f"No se encuentra clave pública dentro del ogLive en {self.temp_dir}, imagen {client_initrd_path}. Rutas buscadas: {self.key_paths}\n" +
"Los oglive deben tener la misma clave pública (utilizar setsslkey)")
ssh_dir = os.path.join(self.ssh_homedir, ".ssh")
authorized_keys_file = os.path.join(ssh_dir, "authorized_keys")
self.__logger.debug("Configurando ssh: Agregando clave %s a %s", public_key, authorized_keys_file)
self.__logger.debug("Key: %s", public_key)
os.makedirs(ssh_dir, exist_ok=True)
self._add_line_to_file(authorized_keys_file, public_key)
os.chmod(authorized_keys_file, 0o600)
os.chown(ssh_dir, uid=self.ssh_uid, gid=self.ssh_gid)
os.chown(authorized_keys_file, uid=self.ssh_uid, gid=self.ssh_gid)
# Configuramos el servicio ssh para que permita el acceso con clave pública
self.__logger.info(" Configuramos el servicio ssh para que permita el acceso con clave pública.")
with open("/etc/ssh/sshd_config", "r") as f:
sshd_config = f.read()
sshd_config = sshd_config.replace("PubkeyAuthentication no", "PubkeyAuthentication yes")
with open("/etc/ssh/sshd_config", "w") as f:
f.write(sshd_config)
os.system("systemctl reload ssh")
# Instalamos git
os.system("apt install git")
# Para que el usuario sólo pueda usar git (no ssh)
SHELL = shutil.which("git-shell")
os.system(f"usermod -s {SHELL} opengnsys")
# Creamos repositorios
self.init_git_repo('windows.git')
self.init_git_repo('linux.git')
self.init_git_repo('mac.git')
# Damos permiso al usuario opengnsys
for DIR in ["base.git", "linux.git", "windows.git"]: #, "LinAcl", "WinAcl"]:
self._recursive_chown(os.path.join(ogdir_images, DIR), ouid=self.ssh_uid, ogid=self.ssh_gid)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)20s - [%(levelname)5s] - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.info("Inicio del programa")
parser = argparse.ArgumentParser(
prog="OpenGnsys Installer",
description="Script para la instalación del repositorio git",
)
parser.add_argument('--testmode', action='store_true', help="Modo de prueba")
parser.add_argument('--ignoresshkey', action='store_true', help="Ignorar clave de SSH")
parser.add_argument('--usesshkey', type=str, help="Usar clave SSH especificada")
args = parser.parse_args()
installer = OpengnsysGitInstaller()
installer.set_testmode(args.testmode)
installer.set_ignoresshkey(args.ignoresshkey)
installer.set_usesshkey(args.usesshkey)
logger.debug("Inicio de instalación")
try:
installer.install()
except RequirementException as req:
show_error(f"Requisito para la instalación no satisfecho: {req.message}")
exit(1)