oggit/installer/opengnsys_git_installer.py

686 lines
25 KiB
Python

#!/usr/bin/env python3
"""Script para la instalación del repositorio git"""
import os
import shutil
import argparse
import tempfile
import logging
import subprocess
import sys
import pwd
import grp
from termcolor import cprint
import git
import libarchive
import urllib.request
import pathlib
import socket
import time
import requests
#FORGEJO_VERSION="8.0.3"
FORGEJO_VERSION="9.0.0"
FORGEJO_URL=f"https://codeberg.org/forgejo/forgejo/releases/download/v{FORGEJO_VERSION}/forgejo-{FORGEJO_VERSION}-linux-amd64"
def show_error(*args):
"""
Imprime un mensaje de error
Args:
*args: Argumentos igual que a la función print
Returns:
None
"""
cprint(*args, "red", attrs = ["bold"], file=sys.stderr)
class RequirementException(Exception):
"""Excepción que indica que nos falta algún requisito
Attributes:
message (str): Mensaje de error mostrado al usuario
"""
def __init__(self, message):
"""Inicializar RequirementException.
Args:
message (str): Mensaje de error mostrado al usuario
"""
super().__init__(message)
self.message = message
class FakeTemporaryDirectory:
"""Imitación de TemporaryDirectory para depuración"""
def __init__(self, dirname):
self.name = dirname
os.makedirs(dirname, exist_ok=True)
def __str__(self):
return self.name
class Oglive:
"""Interfaz a utilidad oglivecli
Esto es probablemente temporal hasta que se haga una conversión de oglivecli
"""
def __init__(self):
self.__logger = logging.getLogger("Oglive")
self.binary = "/opt/opengnsys/bin/oglivecli"
self.__logger.debug("Inicializando")
def _cmd(self, args):
cmd = [self.binary] + args
self.__logger.debug("comando: %s", cmd)
proc = subprocess.run(cmd, shell=False, check=True, capture_output=True)
out_text = proc.stdout.decode('utf-8').strip()
self.__logger.debug("salida: %s", out_text)
return out_text
def get_default(self):
"""Devuelve el cliente por defecto"""
self.__logger.debug("get_default()")
return self._cmd(["get-default"])
def get_clients(self):
"""Devuelve la lista de clientes en un dict"""
self.__logger.debug("get_clients()")
lines = self._cmd(["list"]).splitlines()
clients = {}
for line in lines:
(number, name) = line.split()
clients[number] = name
self.__logger.debug("Clientes: %s", clients)
return clients
class OpengnsysGitInstaller:
"""Instalador de OpenGnsys"""
def __init__(self):
"""Inicializar clase"""
self.__logger = logging.getLogger("OpengnsysGitInstaller")
self.__logger.setLevel(logging.DEBUG)
self.__logger.debug("Inicializando")
self.testmode = False
self.base_path = "/opt/opengnsys"
self.git_basedir = "base.git"
self.email = "OpenGnsys@opengnsys.com"
self.forgejo_user = "oggit"
self.forgejo_password = "opengnsys"
self.forgejo_organization = "opengnsys"
self.forgejo_port = 3000
self.set_ssh_user_group("oggit", "oggit")
self.temp_dir = None
self.script_path = os.path.realpath(os.path.dirname(__file__))
# Possible names for SSH key
self.key_paths = ["scripts/ssl/id_rsa.pub", "scripts/ssl/id_ed25519.pub", "scripts/ssl/id_ecdsa.pub", "scripts/ssl/id_ed25519_sk.pub", "scripts/ssl/id_ecdsa_sk.pub"]
self.key_paths_dict = {}
for kp in self.key_paths:
self.key_paths_dict[kp] = 1
self.oglive = Oglive()
def set_testmode(self, value):
"""Establece el modo de prueba"""
self.testmode = value
def set_ignoresshkey(self, value):
"""Ignorar requisito de clave de ssh para el instalador"""
self.ignoresshkey = value
def set_usesshkey(self, value):
"""Usar clave de ssh especificada"""
self.usesshkey = value
def set_basepath(self, value):
"""Establece ruta base de OpenGnsys
Valor por defecto: /opt/opengnsys
"""
self.base_path = value
def _get_tempdir(self):
"""Obtiene el directorio temporal"""
if self.testmode:
dirname = "/tmp/ogtemp"
if os.path.exists(dirname):
shutil.rmtree(dirname)
dir=FakeTemporaryDirectory(dirname)
self.__logger.debug("Modo de prueba, temp=/tmp/ogtemp")
return dir
else:
dir = tempfile.TemporaryDirectory()
self.__logger.debug("Temp = %s", dir)
return dir
def _cleanup(self):
"""Limpia el directorio temporal"""
if self.temp_dir:
shutil.rmtree(self.temp_dir, ignore_errors=True)
def set_ssh_user_group(self, username, groupname):
self.ssh_group = groupname
self.ssh_user = username
try:
self.ssh_gid = grp.getgrnam(self.ssh_group).gr_gid
self.__logger.info("Group %s exists with gid %i", self.ssh_group, self.ssh_gid)
except KeyError:
self.__logger.info("Need to create group %s", self.ssh_group)
subprocess.run(["/usr/sbin/groupadd", "--system", self.ssh_group], check=True)
self.ssh_gid = grp.getgrnam(groupname).gr_gid
try:
self.ssh_uid = pwd.getpwnam(self.ssh_user).pw_uid
self.__logger.info("User %s exists with gid %i", self.ssh_user, self.ssh_uid)
except KeyError:
self.__logger.info("Need to create user %s", self.ssh_user)
subprocess.run(["/usr/sbin/useradd", "--gid", str(self.ssh_gid), "-m", "--system", self.ssh_user], check=True)
self.ssh_uid = pwd.getpwnam(username).pw_uid
self.ssh_homedir = pwd.getpwnam(username).pw_dir
def init_git_repo(self, reponame):
"""Inicializa un repositorio Git"""
# Creamos repositorio
ogdir_images = os.path.join(self.base_path, "images")
self.__logger.info("Creando repositorio de GIT %s", reponame)
os.makedirs(os.path.join(ogdir_images, self.git_basedir), exist_ok=True)
repo_path=os.path.join(ogdir_images, reponame)
shutil.rmtree(repo_path, ignore_errors=True)
# Marcar como directorio seguro
# Nota: no usar GitPython. Config global falla, aunque hay indicaciones de que
# git.Repo(path=None) es valido. Posiblemente bug de GitPython.
subprocess.run(["git", "config", "--global", "add" "safe.directory", repo_path])
self.__logger.debug("Inicializando repositorio: " + repo_path)
repo = git.Repo.init(repo_path, bare = True)
self.__logger.info("Configurando repositorio de GIT")
repo.config_writer().set_value("user", "name", "OpenGnsys").release()
repo.config_writer().set_value("user", "email", self.email).release()
self._recursive_chown(repo_path, ouid=self.ssh_uid, ogid=self.ssh_gid)
def _add_line_to_file(self, filename, new_line):
"""Agrega una línea a un archivo"""
found = False
self.__logger.debug("Agregando linea: %s a %s", new_line, filename)
with open(filename, "a+", encoding="utf-8") as f:
f.seek(0)
for line in f:
if line.strip() == new_line.strip():
found = True
if not found:
self.__logger.debug("Agregando linea: %s", new_line)
f.write(new_line + "\n")
else:
self.__logger.debug("Linea ya presente")
def _recursive_chown(self, path, ouid, ogid):
"""Cambia el propietario y grupo de forma recursiva"""
for dirpath, _, filenames in os.walk(path):
os.chown(dirpath, uid=ouid, gid=ogid)
for filename in filenames:
os.chown(os.path.join(dirpath, filename), uid=ouid, gid=ogid)
def _wait_for_port(self, host, port):
self.__logger.info("Waiting for %s:%i to be up", host, port)
timeout = 60
start_time = time.time()
ready = False
while not ready and (time.time() - start_time) < 60:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((host, port))
ready = True
s.close()
except TimeoutError:
self.__logger.debug("Timed out, no connection yet.")
except OSError as oserr:
self.__logger.debug("%s, no connection yet. %.1f seconds left.", oserr.strerror, timeout - (time.time() - start_time))
time.sleep(0.1)
if ready:
self.__logger.info("Connection established.")
else:
self.__logger.error("Timed out waiting for connection!")
raise TimeoutError("Timed out waiting for connection!")
def _extract_ssh_key(self):
public_key=""
INITRD = "oginitrd.img"
tftp_dir = os.path.join(self.base_path, "tftpboot")
default_num = self.oglive.get_default()
default_client = self.oglive.get_clients()[default_num]
client_initrd_path = os.path.join(tftp_dir, default_client, INITRD)
#self.temp_dir = self._get_tempdir()
if self.usesshkey:
with open(self.usesshkey, 'r') as f:
public_key = f.read().strip()
else:
if os.path.isfile(client_initrd_path):
#os.makedirs(temp_dir, exist_ok=True)
#os.chdir(self.temp_dir.name)
self.__logger.debug("Descomprimiendo %s", client_initrd_path)
public_key = None
with libarchive.file_reader(client_initrd_path) as initrd:
for file in initrd:
#self.__logger.debug("Archivo: %s", file)
if file.pathname in self.key_paths_dict:
data = bytearray()
for block in file.get_blocks():
data = data + block
public_key = data.decode('utf-8').strip()
break
else:
print(f"No se encuentra la imagen de initrd {client_initrd_path}")
exit(2)
return public_key
def install(self):
"""Instalar
Ejecuta todo el proceso de instalación incluyendo:
* Dependencias
* Configuración de authorized_keys
* Configuración de ssh
* Creación de repositorio
Raises:
RequirementException: No ejecutado por usuario root
RequirementException: No ejecutado en Debian o Ubuntu
RequirementException: Falta clave pública
RequirementException: Python < 3.8
"""
self.__logger.info("install()")
ogdir_images = os.path.join(self.base_path, "images")
ENGINECFG = os.path.join(self.base_path, "client/etc/engine.cfg")
os.environ["PATH"] += os.pathsep + os.path.join(self.base_path, "bin")
tftp_dir = os.path.join(self.base_path, "tftpboot")
INITRD = "oginitrd.img"
self.temp_dir = self._get_tempdir()
SSHUSER = "opengnsys"
# Control básico de errores.
self.__logger.debug("Comprobando euid")
if os.geteuid() != 0:
raise RequirementException("Sólo ejecutable por root")
if not os.path.exists("/etc/debian_version"):
raise RequirementException("Instalación sólo soportada en Debian y Ubuntu")
MIN_PYTHON = (3, 8)
if sys.version_info < MIN_PYTHON:
raise RequirementException(f"Python %s.%s mínimo requerido.\n" % MIN_PYTHON)
self.__logger.debug("Instalando dependencias")
subprocess.run(["apt-get", "install", "-y", "git"], check=True)
# Autenticación del usuario opengnsys con clave pública desde los ogLive
# Requiere que todos los ogLive tengan la misma clave publica (utilizar setsslkey)
# Tomamos la clave publica del cliente por defecto
default_num = self.oglive.get_default()
default_client = self.oglive.get_clients()[default_num]
client_initrd_path = os.path.join(tftp_dir, default_client, INITRD)
self.__logger.debug("Ruta de initrd: %s", client_initrd_path)
# Si me salgo con error borro el directorio temporal
if not self.ignoresshkey:
public_key = self._extract_ssh_key()
# Si la clave publica no existe me salgo con error
if not public_key:
raise RequirementException(f"No se encuentra clave pública dentro del ogLive en {self.temp_dir}, imagen {client_initrd_path}. Rutas buscadas: {self.key_paths}\n" +
"Los oglive deben tener la misma clave pública (utilizar setsslkey)")
ssh_dir = os.path.join(self.ssh_homedir, ".ssh")
authorized_keys_file = os.path.join(ssh_dir, "authorized_keys")
self.__logger.debug("Configurando ssh: Agregando clave %s a %s", public_key, authorized_keys_file)
self.__logger.debug("Key: %s", public_key)
os.makedirs(ssh_dir, exist_ok=True)
self._add_line_to_file(authorized_keys_file, public_key)
os.chmod(authorized_keys_file, 0o600)
os.chown(ssh_dir, uid=self.ssh_uid, gid=self.ssh_gid)
os.chown(authorized_keys_file, uid=self.ssh_uid, gid=self.ssh_gid)
# Configuramos el servicio ssh para que permita el acceso con clave pública
self.__logger.info(" Configuramos el servicio ssh para que permita el acceso con clave pública.")
with open("/etc/ssh/sshd_config", "r") as f:
sshd_config = f.read()
sshd_config = sshd_config.replace("PubkeyAuthentication no", "PubkeyAuthentication yes")
with open("/etc/ssh/sshd_config", "w") as f:
f.write(sshd_config)
os.system("systemctl reload ssh")
# Instalamos git
os.system("apt install git")
# Para que el usuario sólo pueda usar git (no ssh)
SHELL = shutil.which("git-shell")
os.system(f"usermod -s {SHELL} opengnsys")
# Creamos repositorios
#self.init_git_repo('windows.git')
#self.init_git_repo('linux.git')
#self.init_git_repo('mac.git')
# Damos permiso al usuario opengnsys
#for DIR in ["base.git", "linux.git", "windows.git"]: #, "LinAcl", "WinAcl"]:
# self._recursive_chown(os.path.join(ogdir_images, DIR), ouid=self.ssh_uid, ogid=self.ssh_gid)
def _install_template(self, template, destination, keysvalues):
self.__logger.info("Writing template %s into %s", template, destination)
data = ""
with open(template, "r", encoding="utf-8") as template_file:
data = template_file.read()
for key in keysvalues.keys():
data = data.replace("{" + key + "}", keysvalues[key])
with open(destination, "w+", encoding="utf-8") as out_file:
out_file.write(data)
def _runcmd(self, cmd):
self.__logger.debug("Running: %s", cmd)
ret = subprocess.run(cmd, check=True,capture_output=True, encoding='utf-8')
return ret.stdout.strip()
def install_forgejo(self):
self.__logger.info("Installing Forgejo")
bin_path = os.path.join(self.base_path, "bin", "forgejo")
conf_dir_path = os.path.join(self.base_path, "etc", "forgejo")
lfs_dir_path = os.path.join(self.base_path, "images", "git-lfs")
git_dir_path = os.path.join(self.base_path, "images", "git")
forgejo_work_dir_path = os.path.join(self.base_path, "var", "lib", "forgejo/work")
forgejo_db_dir_path = os.path.join(self.base_path, "var", "lib", "forgejo/db")
forgejo_data_dir_path = os.path.join(self.base_path, "var", "lib", "forgejo/data")
forgejo_db_path = os.path.join(forgejo_db_dir_path, "forgejo.db")
forgejo_log_dir_path = os.path.join(self.base_path, "log", "forgejo")
conf_path = os.path.join(conf_dir_path, "app.ini")
self.__logger.debug("Stopping opengnsys-forgejo service")
subprocess.run(["systemctl", "stop", "opengnsys-forgejo"], check=False)
if not os.path.exists(bin_path):
self.__logger.debug("Downloading from %s into %s", FORGEJO_URL, bin_path)
urllib.request.urlretrieve(FORGEJO_URL, bin_path)
os.chmod(bin_path, 0o755)
if os.path.exists(forgejo_db_path):
self.__logger.debug("Removing old configuration")
os.unlink(forgejo_db_path)
else:
self.__logger.debug("Old configuration not present, ok.")
self.__logger.debug("Wiping old data")
for dir in [conf_dir_path, git_dir_path, lfs_dir_path, forgejo_work_dir_path, forgejo_data_dir_path, forgejo_db_dir_path]:
if os.path.exists(dir):
self.__logger.debug("Removing %s", dir)
shutil.rmtree(dir)
self.__logger.debug("Creating directories")
pathlib.Path(conf_dir_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(git_dir_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(lfs_dir_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(forgejo_work_dir_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(forgejo_data_dir_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(forgejo_db_dir_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(forgejo_log_dir_path).mkdir(parents=True, exist_ok=True)
os.chown(lfs_dir_path, self.ssh_uid, self.ssh_gid)
os.chown(git_dir_path, self.ssh_uid, self.ssh_gid)
os.chown(forgejo_data_dir_path, self.ssh_uid, self.ssh_gid)
os.chown(forgejo_work_dir_path, self.ssh_uid, self.ssh_gid)
os.chown(forgejo_db_dir_path, self.ssh_uid, self.ssh_gid)
os.chown(forgejo_log_dir_path, self.ssh_uid, self.ssh_gid)
data = {
"forgejo_user" : self.ssh_user,
"forgejo_group" : self.ssh_group,
"forgejo_port" : str(self.forgejo_port),
"forgejo_bin" : bin_path,
"forgejo_app_ini" : conf_path,
"forgejo_work_path" : forgejo_work_dir_path,
"forgejo_data_path" : forgejo_data_dir_path,
"forgejo_db_path" : forgejo_db_path,
"forgejo_repository_root" : git_dir_path,
"forgejo_lfs_path" : lfs_dir_path,
"forgejo_log_path" : forgejo_log_dir_path,
"forgejo_hostname" : self._runcmd("hostname"),
"forgejo_lfs_jwt_secret" : self._runcmd([bin_path,"generate", "secret", "LFS_JWT_SECRET"]),
"forgejo_jwt_secret" : self._runcmd([bin_path,"generate", "secret", "JWT_SECRET"]),
"forgejo_internal_token" : self._runcmd([bin_path,"generate", "secret", "INTERNAL_TOKEN"]),
"forgejo_secret_key" : self._runcmd([bin_path,"generate", "secret", "SECRET_KEY"])
}
self._install_template(os.path.join(self.script_path, "forgejo-app.ini"), conf_path, data)
self._install_template(os.path.join(self.script_path, "forgejo.service"), "/etc/systemd/system/opengnsys-forgejo.service", data)
self.__logger.debug("Reloading systemd and starting service")
subprocess.run(["systemctl", "daemon-reload"], check=True)
subprocess.run(["systemctl", "enable", "opengnsys-forgejo"], check=True)
subprocess.run(["systemctl", "restart", "opengnsys-forgejo"], check=True)
self.__logger.info("Waiting for forgejo to start")
self._wait_for_port("localhost", self.forgejo_port)
self.__logger.info("Configuring forgejo")
def run_forge_cmd(args):
cmd = [bin_path, "--config", conf_path] + args
self.__logger.debug("Running command: %s", cmd)
ret = subprocess.run(cmd, check=False, capture_output=True, encoding='utf-8', user=self.ssh_user)
if ret.returncode == 0:
return ret.stdout.strip()
else:
self.__logger.error("Failed to run command: %s, return code %i", cmd, ret.returncode)
self.__logger.error("stdout: %s", ret.stdout)
self.__logger.error("stderr: %s", ret.stderr)
raise RuntimeError("Failed to run necessary command")
run_forge_cmd(["admin", "doctor", "check"])
run_forge_cmd(["admin", "user", "create", "--username", self.forgejo_user, "--password", self.forgejo_password, "--email", self.email])
token = run_forge_cmd(["admin", "user", "generate-access-token", "--username", self.forgejo_user, "-t", "gitapi", "--scopes", "all", "--raw"])
with open(os.path.join(self.base_path, "etc", "ogGitApiToken.cfg"), "w+", encoding='utf-8') as token_file:
token_file.write(token)
ssh_key = self._extract_ssh_key()
self.add_forgejo_sshkey(ssh_key, "Default key")
def add_forgejo_repo(self, repository_name, description = ""):
token = ""
with open(os.path.join(self.base_path, "etc", "ogGitApiToken.cfg"), "r", encoding='utf-8') as token_file:
token = token_file.read().strip()
self.__logger.info("Adding repository %s for Forgejo", repository_name)
r = requests.post(
f"http://localhost:{self.forgejo_port}/api/v1/user/repos",
json={
"auto_init" : False,
"default_branch" : "main",
"description" : description,
"name" : repository_name,
"private" : False
}, headers={
'Authorization' : f"token {token}"
},
timeout = 60
)
self.__logger.info("Request status was %i", r.status_code)
def add_forgejo_sshkey(self, pubkey, description = ""):
token = ""
with open(os.path.join(self.base_path, "etc", "ogGitApiToken.cfg"), "r", encoding='utf-8') as token_file:
token = token_file.read().strip()
self.__logger.info("Adding SSH key to Forgejo: %s", pubkey)
r = requests.post(
f"http://localhost:{self.forgejo_port}/api/v1/user/keys",
json={
"key" : pubkey,
"read_only" : False,
"title" : description
}, headers={
'Authorization' : f"token {token}"
},
timeout = 60
)
self.__logger.info("Request status was %i", r.status_code)
def add_forgejo_organization(self, pubkey, description = ""):
token = ""
with open(os.path.join(self.base_path, "etc", "ogGitApiToken.cfg"), "r", encoding='utf-8') as token_file:
token = token_file.read().strip()
self.__logger.info("Adding SSH key to Forgejo: %s", pubkey)
r = requests.post(
f"http://localhost:{self.forgejo_port}/api/v1/user/keys",
json={
"key" : pubkey,
"read_only" : False,
"title" : description
}, headers={
'Authorization' : f"token {token}"
},
timeout = 60
)
self.__logger.info("Request status was %i", r.status_code)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)20s - [%(levelname)5s] - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.info("Inicio del programa")
parser = argparse.ArgumentParser(
prog="OpenGnsys Installer",
description="Script para la instalación del repositorio git",
)
parser.add_argument('--forgejo-only', action='store_true', help="Solo instalar forgejo")
parser.add_argument('--forgejo-addrepos', action='store_true', help="Solo agregar repositorios forgejo")
parser.add_argument('--testmode', action='store_true', help="Modo de prueba")
parser.add_argument('--ignoresshkey', action='store_true', help="Ignorar clave de SSH")
parser.add_argument('--usesshkey', type=str, help="Usar clave SSH especificada")
parser.add_argument('--test-createuser', action='store_true')
args = parser.parse_args()
installer = OpengnsysGitInstaller()
installer.set_testmode(args.testmode)
installer.set_ignoresshkey(args.ignoresshkey)
installer.set_usesshkey(args.usesshkey)
logger.debug("Inicio de instalación")
try:
if args.forgejo_only:
installer.install_forgejo()
elif args.forgejo_addrepos:
installer.add_forgejo_repo("linux")
elif args.test_createuser:
installer.set_ssh_user_group("oggit2", "oggit2")
else:
installer.install()
installer.install_forgejo()
installer.add_forgejo_repo("windows", "Windows")
installer.add_forgejo_repo("linux", "Linux")
installer.add_forgejo_repo("mac", "Mac")
except RequirementException as req:
show_error(f"Requisito para la instalación no satisfecho: {req.message}")
exit(1)