oggit/installer/opengnsys_git_installer.py

831 lines
31 KiB
Python

#!/usr/bin/env python3
"""Script para la instalación del repositorio git"""
import os
import sys
sys.path.insert(0, "/opt/opengnsys/python3/dist-packages")
import shutil
import argparse
import tempfile
import logging
import subprocess
import sys
import pwd
import grp
from termcolor import cprint
import git
import libarchive
from libarchive.extract import *
#from libarchive.entry import FileType
import urllib.request
import pathlib
import socket
import time
import requests
import tempfile
import hashlib
# Forgejo release installed by this script (the previous pin was 8.0.3).
FORGEJO_VERSION = "9.0.0"

# Download URL of the linux-amd64 Forgejo binary for FORGEJO_VERSION.
FORGEJO_URL = (
    "https://codeberg.org/forgejo/forgejo/releases/download/"
    f"v{FORGEJO_VERSION}/forgejo-{FORGEJO_VERSION}-linux-amd64"
)
def show_error(*args):
    """Print an error message to stderr in bold red.

    The positional arguments are converted to strings and joined with a
    single space, like ``print`` does, before being handed to ``cprint``.
    Forwarding them positionally would place the second argument in
    ``cprint``'s colour parameter.

    Args:
        *args: Objects to print.

    Returns:
        None
    """
    text = " ".join(str(arg) for arg in args)
    cprint(text, "red", attrs=["bold"], file=sys.stderr)
class RequirementException(Exception):
    """Raised when a prerequisite of the installation is not satisfied.

    Attributes:
        message (str): Error message shown to the user.
    """

    def __init__(self, message):
        """Create the exception.

        Args:
            message (str): Error message shown to the user.
        """
        Exception.__init__(self, message)
        # Kept as an attribute so callers can read it without going
        # through ``args``.
        self.message = message
class FakeTemporaryDirectory:
    """Debugging stand-in for ``tempfile.TemporaryDirectory``.

    It exposes the same ``name`` attribute but uses a caller-chosen
    directory, which it creates if needed.
    """

    def __init__(self, dirname):
        """Remember *dirname* and create it (including parents) if missing."""
        self.name = dirname
        os.makedirs(self.name, exist_ok=True)

    def __str__(self):
        """Return the directory path."""
        path = self.name
        return path
class Oglive:
    """Interface to the ``oglivecli`` command-line utility.

    This is probably temporary, until oglivecli itself is converted.
    """

    def __init__(self):
        """Set up the logger and the path of the oglivecli binary."""
        self.__logger = logging.getLogger("Oglive")
        self.binary = "/opt/opengnsys/bin/oglivecli"
        self.__logger.debug("Inicializando")

    def _cmd(self, args):
        """Run oglivecli with *args* and return its stripped stdout.

        Args:
            args (list[str]): Arguments appended after the binary path.

        Returns:
            str: Standard output decoded as UTF-8, with surrounding
            whitespace removed.

        Raises:
            subprocess.CalledProcessError: The command exited with a
                non-zero status (``check=True``).
        """
        cmd = [self.binary] + args
        self.__logger.debug("comando: %s", cmd)

        proc = subprocess.run(cmd, shell=False, check=True, capture_output=True)
        out_text = proc.stdout.decode('utf-8').strip()
        self.__logger.debug("salida: %s", out_text)
        return out_text

    def get_default(self):
        """Return the output of ``oglivecli get-default`` (the default client)."""
        self.__logger.debug("get_default()")
        return self._cmd(["get-default"])

    def get_clients(self):
        """Return the clients listed by ``oglivecli list`` as a dict.

        Each output line is split into its first whitespace-separated
        field (the key) and the remainder of the line (the value), so a
        name containing spaces is kept whole. Blank lines are ignored and
        lines with a single field are skipped with a warning.

        Returns:
            dict[str, str]: First field of each line mapped to the rest.
        """
        self.__logger.debug("get_clients()")

        clients = {}
        for line in self._cmd(["list"]).splitlines():
            fields = line.split(maxsplit=1)
            if len(fields) != 2:
                if fields:
                    self.__logger.warning("Ignoring unexpected line from oglivecli: %s", line)
                continue

            number, name = fields
            clients[number] = name.strip()

        self.__logger.debug("Clientes: %s", clients)
        return clients
class OpengnsysGitInstaller:
    """Installer for the OpenGnsys Git repository service (Forgejo)."""

    def __init__(self):
        """Set default paths and credentials.

        Note that this calls :meth:`set_ssh_user_group`, which creates the
        ``oggit`` system group and user when they do not exist.
        """
        self.__logger = logging.getLogger("OpengnsysGitInstaller")
        self.__logger.setLevel(logging.DEBUG)
        self.__logger.debug("Inicializando")

        self.testmode = False
        # Defaults so these flags can be read even when the corresponding
        # setters were never called.
        self.ignoresshkey = False
        self.usesshkey = None

        self.base_path = "/opt/opengnsys"
        self.git_basedir = "base.git"
        self.email = "OpenGnsys@opengnsys.com"

        self.forgejo_user = "oggit"
        # NOTE(review): hard-coded default password for the Forgejo admin
        # account created by install_forgejo().
        self.forgejo_password = "opengnsys"
        self.forgejo_organization = "opengnsys"
        self.forgejo_port = 3000

        self.set_ssh_user_group("oggit", "oggit")

        self.temp_dir = None
        self.script_path = os.path.realpath(os.path.dirname(__file__))

        # Users whose public keys are copied into the oglive, and the
        # possible file names of those keys.
        self.ssh_key_users = ["root", "opengnsys"]
        self.key_names = ["id_rsa.pub", "id_ed25519.pub", "id_ecdsa.pub", "id_ed25519_sk.pub", "id_ecdsa_sk.pub"]

        # Possible locations of the SSH public key inside the oglive initrd.
        self.key_paths = ["scripts/ssl/id_rsa.pub", "scripts/ssl/id_ed25519.pub", "scripts/ssl/id_ecdsa.pub", "scripts/ssl/id_ed25519_sk.pub", "scripts/ssl/id_ecdsa_sk.pub"]
        # Same paths as dict keys, for constant-time membership tests.
        self.key_paths_dict = {key_path: 1 for key_path in self.key_paths}

        self.oglive = Oglive()

    def set_testmode(self, value):
        """Enable or disable test mode."""
        self.testmode = value

    def set_ignoresshkey(self, value):
        """Ignore the SSH key requirement of the installer."""
        self.ignoresshkey = value

    def set_usesshkey(self, value):
        """Use the given SSH public key file instead of the oglive's key."""
        self.usesshkey = value

    def set_basepath(self, value):
        """Set the OpenGnsys base path.

        Default value: /opt/opengnsys
        """
        self.base_path = value

    def _get_tempdir(self):
        """Return a temporary directory object exposing a ``name`` attribute.

        In test mode a fixed, freshly emptied ``/tmp/ogtemp`` is used;
        otherwise a ``tempfile.TemporaryDirectory`` is created.
        """
        if self.testmode:
            dirname = "/tmp/ogtemp"
            if os.path.exists(dirname):
                shutil.rmtree(dirname)

            fake_dir = FakeTemporaryDirectory(dirname)
            self.__logger.debug("Modo de prueba, temp=/tmp/ogtemp")
            return fake_dir

        real_dir = tempfile.TemporaryDirectory()
        self.__logger.debug("Temp = %s", real_dir)
        return real_dir

    def _cleanup(self):
        """Remove the temporary directory, ignoring errors.

        ``self.temp_dir`` holds a directory *object*; its path lives in the
        ``name`` attribute. A plain path string is accepted as well.
        """
        if self.temp_dir:
            path = getattr(self.temp_dir, "name", self.temp_dir)
            shutil.rmtree(path, ignore_errors=True)

    def set_ssh_user_group(self, username, groupname):
        """Select the system user/group used for Git access, creating them if needed.

        Sets ``ssh_user``, ``ssh_group``, ``ssh_uid``, ``ssh_gid`` and
        ``ssh_homedir``.

        Args:
            username (str): System user name.
            groupname (str): System group name.

        Raises:
            subprocess.CalledProcessError: ``groupadd`` or ``useradd`` failed.
        """
        self.ssh_group = groupname
        self.ssh_user = username

        try:
            self.ssh_gid = grp.getgrnam(self.ssh_group).gr_gid
            self.__logger.info("Group %s exists with gid %i", self.ssh_group, self.ssh_gid)
        except KeyError:
            self.__logger.info("Need to create group %s", self.ssh_group)
            subprocess.run(["/usr/sbin/groupadd", "--system", self.ssh_group], check=True)
            self.ssh_gid = grp.getgrnam(groupname).gr_gid

        try:
            self.ssh_uid = pwd.getpwnam(self.ssh_user).pw_uid
            self.__logger.info("User %s exists with uid %i", self.ssh_user, self.ssh_uid)
        except KeyError:
            self.__logger.info("Need to create user %s", self.ssh_user)
            subprocess.run(["/usr/sbin/useradd", "--gid", str(self.ssh_gid), "-m", "--system", self.ssh_user], check=True)
            self.ssh_uid = pwd.getpwnam(username).pw_uid

        self.ssh_homedir = pwd.getpwnam(username).pw_dir

    def init_git_repo(self, reponame):
        """Create an empty bare Git repository under ``<base_path>/images``.

        Any existing directory with the same name is deleted first. The
        repository ends up owned by the SSH user and group.

        Args:
            reponame (str): Directory name of the repository.
        """
        ogdir_images = os.path.join(self.base_path, "images")
        self.__logger.info("Creando repositorio de GIT %s", reponame)

        os.makedirs(os.path.join(ogdir_images, self.git_basedir), exist_ok=True)

        repo_path = os.path.join(ogdir_images, reponame)
        shutil.rmtree(repo_path, ignore_errors=True)

        # Mark the repository as a safe directory.
        # Note: GitPython is not used here. Global config fails with it,
        # although git.Repo(path=None) is reportedly valid; possibly a
        # GitPython bug.
        safe_dir = subprocess.run(
            ["git", "config", "--global", "--add", "safe.directory", repo_path],
            check=False,
        )
        if safe_dir.returncode != 0:
            self.__logger.warning("Could not mark %s as safe.directory (git exited with %i)", repo_path, safe_dir.returncode)

        self.__logger.debug("Inicializando repositorio: %s", repo_path)
        repo = git.Repo.init(repo_path, bare=True)

        self.__logger.info("Configurando repositorio de GIT")
        repo.config_writer().set_value("user", "name", "OpenGnsys").release()
        repo.config_writer().set_value("user", "email", self.email).release()

        self._recursive_chown(repo_path, ouid=self.ssh_uid, ogid=self.ssh_gid)

    def _add_line_to_file(self, filename, new_line):
        """Append *new_line* to *filename* unless an equal line already exists.

        Lines are compared after stripping surrounding whitespace. The file
        is created if it does not exist. If the last existing line lacks a
        trailing newline, one is written first so the new line is not glued
        onto it.

        Args:
            filename (str): File to modify.
            new_line (str): Line to add, without trailing newline.
        """
        self.__logger.debug("Agregando linea: %s a %s", new_line, filename)
        wanted = new_line.strip()

        # "a+" creates the file when missing and always writes at the end.
        with open(filename, "a+", encoding="utf-8") as f:
            f.seek(0)

            last_line = ""
            for line in f:
                if line.strip() == wanted:
                    self.__logger.debug("Linea ya presente")
                    return
                last_line = line

            self.__logger.debug("Agregando linea: %s", new_line)
            if last_line and not last_line.endswith("\n"):
                f.write("\n")
            f.write(new_line + "\n")

    def _recursive_chown(self, path, ouid, ogid):
        """Change owner and group of *path* and everything below it."""
        for dirpath, _, filenames in os.walk(path):
            os.chown(dirpath, uid=ouid, gid=ogid)
            for filename in filenames:
                os.chown(os.path.join(dirpath, filename), uid=ouid, gid=ogid)

    def _wait_for_port(self, host, port, timeout=60):
        """Block until a TCP connection to *host*:*port* succeeds.

        A connection is attempted roughly every 0.1 seconds.

        Args:
            host (str): Host to connect to.
            port (int): TCP port.
            timeout (float): Maximum number of seconds to wait.

        Raises:
            TimeoutError: No connection could be made within *timeout*.
        """
        self.__logger.info("Waiting for %s:%i to be up", host, port)

        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                # The context manager closes the socket on failure too.
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                    probe.connect((host, port))
            except TimeoutError:
                self.__logger.debug("Timed out, no connection yet.")
            except OSError as oserr:
                self.__logger.debug("%s, no connection yet. %.1f seconds left.", oserr.strerror, deadline - time.monotonic())
            else:
                self.__logger.info("Connection established.")
                return
            time.sleep(0.1)

        self.__logger.error("Timed out waiting for connection!")
        raise TimeoutError("Timed out waiting for connection!")

    def _extract_ssh_key(self):
        """Return the SSH public key to authorize.

        If a key file was given with :meth:`set_usesshkey` its stripped
        contents are returned. Otherwise the initrd of the default oglive
        client is scanned for the first entry whose path is one of
        ``self.key_paths``.

        Returns:
            str | None: The public key, or None when the initrd contains
            none of the known key paths.

        Raises:
            SystemExit: (status 2) the initrd image does not exist.
        """
        if self.usesshkey:
            # No need to query oglivecli when the key comes from a file.
            with open(self.usesshkey, "r", encoding="utf-8") as f:
                return f.read().strip()

        INITRD = "oginitrd.img"
        tftp_dir = os.path.join(self.base_path, "tftpboot")
        default_num = self.oglive.get_default()
        default_client = self.oglive.get_clients()[default_num]
        client_initrd_path = os.path.join(tftp_dir, default_client, INITRD)

        if not os.path.isfile(client_initrd_path):
            print(f"No se encuentra la imagen de initrd {client_initrd_path}")
            sys.exit(2)

        self.__logger.debug("Descomprimiendo %s", client_initrd_path)
        with libarchive.file_reader(client_initrd_path) as initrd:
            for entry in initrd:
                self.__logger.debug("Archivo: %s", entry)

                pathname = entry.pathname
                if pathname.startswith("./"):
                    pathname = pathname[2:]

                if pathname in self.key_paths_dict:
                    data = b"".join(entry.get_blocks())
                    return data.decode('utf-8').strip()

        return None

    def _collect_public_keys(self):
        """Return the public keys of ``self.ssh_key_users``, one per line."""
        keys = []
        for username in self.ssh_key_users:
            self.__logger.debug("Looking for keys in user %s", username)
            try:
                homedir = pwd.getpwnam(username).pw_dir
            except KeyError:
                self.__logger.warning("User %s does not exist, skipping", username)
                continue

            for key in self.key_names:
                key_path = os.path.join(homedir, ".ssh", key)
                self.__logger.debug("Checking if we have %s...", key_path)

                if os.path.exists(key_path):
                    with open(key_path, "r", encoding='utf-8') as public_key_file:
                        self.__logger.info("Adding %s to authorized_keys", key_path)
                        public_key = public_key_file.read().strip()
                    if public_key:
                        keys.append(public_key)

        return "".join(key + "\n" for key in keys)

    def _symlink_duplicate_files(self, tree_root):
        """Replace files with identical content under *tree_root* by relative symlinks.

        Only regular, non-empty files are considered. Existing symlinks
        (including broken ones) and special files are left alone.
        """
        # sha256 digest -> path (relative to tree_root) of the first file seen
        file_hashes = {}

        for root, _subdirs, files in os.walk(tree_root):
            relpath = pathlib.PurePosixPath(root).relative_to(tree_root)

            for file in files:
                rel_file = os.path.join(relpath, file)
                abs_file = os.path.join(root, file)

                if os.path.islink(abs_file) or not os.path.isfile(abs_file):
                    continue
                if os.path.getsize(abs_file) == 0:
                    continue

                with open(abs_file, "rb") as in_file:
                    digest = hashlib.file_digest(in_file, "sha256").hexdigest()

                if digest in file_hashes:
                    target_path = pathlib.Path(file_hashes[digest])
                    link_path = target_path.relative_to(relpath, walk_up=True)

                    self.__logger.debug("%s was a duplicate of %s, linking to %s", rel_file, file_hashes[digest], link_path)

                    os.unlink(abs_file)
                    os.symlink(link_path, abs_file)
                else:
                    file_hashes[digest] = rel_file

    def set_ssh_key(self, client_num=None):
        """Install an SSH key pair and authorized_keys into an oglive initrd.

        The initrd is unpacked into a private temporary directory, a client
        key is generated if the image has none, ``authorized_keys`` is
        rewritten with the public keys of ``self.ssh_key_users``, and the
        image is repacked (cpio newc, zstd). The previous image and checksum
        are kept with an ``.old`` suffix and a new sha256 ``.sum`` file is
        written.

        Args:
            client_num (int | str | None): oglive client number, or None
                for the default client.
        """
        INITRD = "oginitrd.img"

        tftp_dir = os.path.join(self.base_path, "tftpboot")

        if client_num is None:
            self.__logger.info("Will modify default client")
            client_num = self.oglive.get_default()

        # get_clients() is keyed by strings; accept an integer as well.
        ogclient = self.oglive.get_clients()[str(client_num)]
        client_initrd_path = os.path.join(tftp_dir, ogclient, INITRD)
        client_initrd_path_new = client_initrd_path + ".new"

        self.__logger.info("initrd path is %s", client_initrd_path)

        previous_cwd = os.getcwd()
        with tempfile.TemporaryDirectory(prefix="oglive-initrd-", ignore_cleanup_errors=True) as temp_dir_path:
            try:
                self.__logger.info("Uncompressing initrd %s into %s", client_initrd_path, temp_dir_path)
                # libarchive extracts into, and packs relative to, the
                # current working directory.
                os.chdir(temp_dir_path)
                libarchive.extract_file(client_initrd_path, flags=EXTRACT_UNLINK | EXTRACT_OWNER | EXTRACT_PERM | EXTRACT_FFLAGS | EXTRACT_TIME)

                ssh_key_dir = os.path.join(temp_dir_path, "scripts", "ssl")
                client_key_path = os.path.join(ssh_key_dir, "id_ed25519")
                authorized_keys_path = os.path.join(ssh_key_dir, "authorized_keys")

                pathlib.Path(ssh_key_dir).mkdir(parents=True, exist_ok=True)

                if os.path.exists(client_key_path):
                    self.__logger.debug("Creating SSH key not necessary, it already is in the initrd")
                else:
                    self.__logger.debug("Writing new SSH key into %s", client_key_path)
                    subprocess.run(["/usr/bin/ssh-keygen", "-t", "ed25519", "-N", "", "-f", client_key_path], check=True)

                public_keys = self._collect_public_keys()

                self.__logger.debug("Writing %s", authorized_keys_path)
                with open(authorized_keys_path, "w", encoding='utf-8') as auth_keys:
                    auth_keys.write(public_keys)

                # Hardlinks in the source package are not correctly packaged
                # back as hardlinks. Taking the easy option of turning
                # duplicates into symlinks for now.
                self._symlink_duplicate_files(temp_dir_path)

                with libarchive.file_writer(client_initrd_path_new, "cpio_newc", "zstd") as writer:
                    writer.add_files(".", recursive=True)
            finally:
                # Leave the temporary directory before it is deleted.
                os.chdir(previous_cwd)

        os.rename(client_initrd_path, client_initrd_path + ".old")

        if os.path.exists(client_initrd_path + ".sum"):
            os.rename(client_initrd_path + ".sum", client_initrd_path + ".sum.old")

        os.rename(client_initrd_path_new, client_initrd_path)

        with open(client_initrd_path, "rb") as initrd_file:
            hexdigest = hashlib.file_digest(initrd_file, "sha256").hexdigest()

        with open(client_initrd_path + ".sum", "w", encoding="utf-8") as digest_file:
            digest_file.write(hexdigest)

        self.__logger.info("Updated initrd %s", client_initrd_path)

    def install(self):
        """Run the base installation.

        Steps:
            * Install dependencies
            * Configure authorized_keys (unless the SSH key is ignored)
            * Configure sshd
            * Restrict the ``opengnsys`` user to git-shell

        Raises:
            RequirementException: Not run as root
            RequirementException: Not run on Debian or Ubuntu
            RequirementException: Python < 3.8
            RequirementException: Public key missing
            RequirementException: git-shell not found
        """
        self.__logger.info("install()")

        os.environ["PATH"] += os.pathsep + os.path.join(self.base_path, "bin")
        tftp_dir = os.path.join(self.base_path, "tftpboot")
        INITRD = "oginitrd.img"
        self.temp_dir = self._get_tempdir()
        SSHUSER = "opengnsys"

        # Basic sanity checks.
        self.__logger.debug("Comprobando euid")
        if os.geteuid() != 0:
            raise RequirementException("Sólo ejecutable por root")

        if not os.path.exists("/etc/debian_version"):
            raise RequirementException("Instalación sólo soportada en Debian y Ubuntu")

        MIN_PYTHON = (3, 8)
        if sys.version_info < MIN_PYTHON:
            raise RequirementException("Python %s.%s mínimo requerido.\n" % MIN_PYTHON)

        self.__logger.debug("Instalando dependencias")
        subprocess.run(["apt-get", "install", "-y", "git"], check=True)

        # The opengnsys user authenticates with the public key of the
        # ogLive clients. This requires every ogLive to carry the same
        # public key (use setsslkey). The key is taken from the default
        # client.
        default_num = self.oglive.get_default()
        default_client = self.oglive.get_clients()[default_num]

        client_initrd_path = os.path.join(tftp_dir, default_client, INITRD)
        self.__logger.debug("Ruta de initrd: %s", client_initrd_path)

        if not self.ignoresshkey:
            public_key = self._extract_ssh_key()

            # Without a public key the installation cannot continue.
            if not public_key:
                raise RequirementException(f"No se encuentra clave pública dentro del ogLive en {self.temp_dir}, imagen {client_initrd_path}. Rutas buscadas: {self.key_paths}\n" +
                                           "Los oglive deben tener la misma clave pública (utilizar setsslkey)")

            ssh_dir = os.path.join(self.ssh_homedir, ".ssh")
            authorized_keys_file = os.path.join(ssh_dir, "authorized_keys")

            self.__logger.debug("Configurando ssh: Agregando clave %s a %s", public_key, authorized_keys_file)
            self.__logger.debug("Key: %s", public_key)
            os.makedirs(ssh_dir, exist_ok=True)
            self._add_line_to_file(authorized_keys_file, public_key)

            os.chmod(authorized_keys_file, 0o600)
            os.chown(ssh_dir, uid=self.ssh_uid, gid=self.ssh_gid)
            os.chown(authorized_keys_file, uid=self.ssh_uid, gid=self.ssh_gid)

        # Make the ssh service accept public-key authentication.
        self.__logger.info(" Configuramos el servicio ssh para que permita el acceso con clave pública.")
        with open("/etc/ssh/sshd_config", "r") as f:
            sshd_config = f.read()

        updated_config = sshd_config.replace("PubkeyAuthentication no", "PubkeyAuthentication yes")
        if updated_config != sshd_config:
            with open("/etc/ssh/sshd_config", "w") as f:
                f.write(updated_config)

        # Best effort, as before: the exit status is not checked.
        subprocess.run(["systemctl", "reload", "ssh"], check=False)

        # git was already installed non-interactively above, so no second
        # "apt install git" is needed here.

        # Let the user run only git (no interactive ssh).
        git_shell = shutil.which("git-shell")
        if git_shell is None:
            raise RequirementException("No se encuentra git-shell")
        subprocess.run(["usermod", "-s", git_shell, SSHUSER], check=False)

    def _install_template(self, template, destination, keysvalues):
        """Render *template* into *destination*.

        Every occurrence of ``{key}`` is replaced by ``keysvalues[key]``;
        other text, including unknown placeholders, is copied verbatim.

        Args:
            template (str): Path of the template file.
            destination (str): Path of the file to write.
            keysvalues (dict[str, str]): Placeholder names and values.
        """
        self.__logger.info("Writing template %s into %s", template, destination)

        with open(template, "r", encoding="utf-8") as template_file:
            data = template_file.read()

        for key, value in keysvalues.items():
            data = data.replace("{" + key + "}", value)

        with open(destination, "w+", encoding="utf-8") as out_file:
            out_file.write(data)

    def _runcmd(self, cmd):
        """Run *cmd* and return its stripped stdout.

        Raises:
            subprocess.CalledProcessError: The command exited with a
                non-zero status.
        """
        self.__logger.debug("Running: %s", cmd)

        ret = subprocess.run(cmd, check=True, capture_output=True, encoding='utf-8')
        return ret.stdout.strip()

    def install_forgejo(self):
        """Install and configure Forgejo from scratch.

        Downloads the binary if missing, WIPES any previous Forgejo
        configuration, database, repositories and LFS data, renders the
        configuration and systemd unit from the templates next to this
        script, starts the service, creates the admin user, stores an API
        token in ``<base_path>/etc/ogGitApiToken.cfg`` and registers the
        oglive SSH key.

        Raises:
            subprocess.CalledProcessError: A systemctl or secret-generation
                command failed.
            TimeoutError: Forgejo did not start listening in time.
            RuntimeError: A Forgejo admin command failed.
        """
        self.__logger.info("Installing Forgejo")

        bin_path = os.path.join(self.base_path, "bin", "forgejo")
        conf_dir_path = os.path.join(self.base_path, "etc", "forgejo")

        lfs_dir_path = os.path.join(self.base_path, "images", "git-lfs")
        git_dir_path = os.path.join(self.base_path, "images", "git")

        forgejo_work_dir_path = os.path.join(self.base_path, "var", "lib", "forgejo/work")
        forgejo_db_dir_path = os.path.join(self.base_path, "var", "lib", "forgejo/db")
        forgejo_data_dir_path = os.path.join(self.base_path, "var", "lib", "forgejo/data")

        forgejo_db_path = os.path.join(forgejo_db_dir_path, "forgejo.db")

        forgejo_log_dir_path = os.path.join(self.base_path, "log", "forgejo")

        conf_path = os.path.join(conf_dir_path, "app.ini")

        self.__logger.debug("Stopping opengnsys-forgejo service")
        subprocess.run(["systemctl", "stop", "opengnsys-forgejo"], check=False)

        if not os.path.exists(bin_path):
            self.__logger.debug("Downloading from %s into %s", FORGEJO_URL, bin_path)
            # Download next to the final path and rename afterwards, so an
            # interrupted download is never mistaken for a valid binary.
            partial_path = bin_path + ".part"
            urllib.request.urlretrieve(FORGEJO_URL, partial_path)
            os.replace(partial_path, bin_path)
        os.chmod(bin_path, 0o755)

        if os.path.exists(forgejo_db_path):
            self.__logger.debug("Removing old configuration")
            os.unlink(forgejo_db_path)
        else:
            self.__logger.debug("Old configuration not present, ok.")

        self.__logger.debug("Wiping old data")
        for old_dir in [conf_dir_path, git_dir_path, lfs_dir_path, forgejo_work_dir_path, forgejo_data_dir_path, forgejo_db_dir_path]:
            if os.path.exists(old_dir):
                self.__logger.debug("Removing %s", old_dir)
                shutil.rmtree(old_dir)

        self.__logger.debug("Creating directories")

        # Directories handed over to the service user.
        service_dirs = [
            lfs_dir_path,
            git_dir_path,
            forgejo_data_dir_path,
            forgejo_work_dir_path,
            forgejo_db_dir_path,
            forgejo_log_dir_path,
        ]
        for new_dir in [conf_dir_path] + service_dirs:
            pathlib.Path(new_dir).mkdir(parents=True, exist_ok=True)
        for owned_dir in service_dirs:
            os.chown(owned_dir, self.ssh_uid, self.ssh_gid)

        data = {
            "forgejo_user": self.ssh_user,
            "forgejo_group": self.ssh_group,
            "forgejo_port": str(self.forgejo_port),
            "forgejo_bin": bin_path,
            "forgejo_app_ini": conf_path,
            "forgejo_work_path": forgejo_work_dir_path,
            "forgejo_data_path": forgejo_data_dir_path,
            "forgejo_db_path": forgejo_db_path,
            "forgejo_repository_root": git_dir_path,
            "forgejo_lfs_path": lfs_dir_path,
            "forgejo_log_path": forgejo_log_dir_path,
            "forgejo_hostname": self._runcmd("hostname"),
            "forgejo_lfs_jwt_secret": self._runcmd([bin_path, "generate", "secret", "LFS_JWT_SECRET"]),
            "forgejo_jwt_secret": self._runcmd([bin_path, "generate", "secret", "JWT_SECRET"]),
            "forgejo_internal_token": self._runcmd([bin_path, "generate", "secret", "INTERNAL_TOKEN"]),
            "forgejo_secret_key": self._runcmd([bin_path, "generate", "secret", "SECRET_KEY"]),
        }

        self._install_template(os.path.join(self.script_path, "forgejo-app.ini"), conf_path, data)
        self._install_template(os.path.join(self.script_path, "forgejo.service"), "/etc/systemd/system/opengnsys-forgejo.service", data)

        self.__logger.debug("Reloading systemd and starting service")
        subprocess.run(["systemctl", "daemon-reload"], check=True)
        subprocess.run(["systemctl", "enable", "opengnsys-forgejo"], check=True)
        subprocess.run(["systemctl", "restart", "opengnsys-forgejo"], check=True)

        self.__logger.info("Waiting for forgejo to start")
        self._wait_for_port("localhost", self.forgejo_port)

        self.__logger.info("Configuring forgejo")

        def run_forge_cmd(args):
            """Run the forgejo binary as the service user; return stripped stdout."""
            cmd = [bin_path, "--config", conf_path] + args
            self.__logger.debug("Running command: %s", cmd)

            ret = subprocess.run(cmd, check=False, capture_output=True, encoding='utf-8', user=self.ssh_user)
            if ret.returncode == 0:
                return ret.stdout.strip()

            self.__logger.error("Failed to run command: %s, return code %i", cmd, ret.returncode)
            self.__logger.error("stdout: %s", ret.stdout)
            self.__logger.error("stderr: %s", ret.stderr)
            raise RuntimeError("Failed to run necessary command")

        run_forge_cmd(["admin", "doctor", "check"])

        run_forge_cmd(["admin", "user", "create", "--username", self.forgejo_user, "--password", self.forgejo_password, "--email", self.email])

        token = run_forge_cmd(["admin", "user", "generate-access-token", "--username", self.forgejo_user, "-t", "gitapi", "--scopes", "all", "--raw"])

        # NOTE(review): the token grants all scopes and is written with the
        # process' default file permissions; consider restricting them.
        with open(self._api_token_path(), "w+", encoding='utf-8') as token_file:
            token_file.write(token)

        ssh_key = self._extract_ssh_key()
        if ssh_key:
            self.add_forgejo_sshkey(ssh_key, "Default key")
        else:
            self.__logger.warning("No SSH public key found in the oglive, none added to Forgejo")

    def _api_token_path(self):
        """Return the path of the file holding the Forgejo API token."""
        return os.path.join(self.base_path, "etc", "ogGitApiToken.cfg")

    def _forgejo_post(self, endpoint, payload):
        """POST *payload* as JSON to the local Forgejo API using the stored token.

        The response status and content are logged. HTTP errors are logged
        but not raised.

        Args:
            endpoint (str): Path below ``/api/v1/``.
            payload (dict): JSON body.

        Returns:
            requests.Response: The response object.
        """
        with open(self._api_token_path(), "r", encoding='utf-8') as token_file:
            token = token_file.read().strip()

        r = requests.post(
            f"http://localhost:{self.forgejo_port}/api/v1/{endpoint}",
            json=payload,
            headers={
                'Authorization': f"token {token}"
            },
            timeout=60
        )

        self.__logger.info("Request status was %i, content %s", r.status_code, r.content)
        if r.status_code >= 400:
            self.__logger.error("Forgejo API request to %s failed with status %i", endpoint, r.status_code)
        return r

    def add_forgejo_repo(self, repository_name, description=""):
        """Create a public repository for the token's user through the Forgejo API.

        Args:
            repository_name (str): Name of the repository.
            description (str): Repository description.
        """
        self.__logger.info("Adding repository %s for Forgejo", repository_name)

        self._forgejo_post("user/repos", {
            "auto_init": False,
            "default_branch": "main",
            "description": description,
            "name": repository_name,
            "private": False
        })

    def add_forgejo_sshkey(self, pubkey, description=""):
        """Register an SSH public key for the token's user through the Forgejo API.

        Args:
            pubkey (str): Public key text.
            description (str): Title given to the key.
        """
        self.__logger.info("Adding SSH key to Forgejo: %s", pubkey)

        self._forgejo_post("user/keys", {
            "key": pubkey,
            "read_only": False,
            "title": description
        })

    def add_forgejo_organization(self, pubkey, description=""):
        """Post an SSH public key to the Forgejo API.

        NOTE(review): despite its name this method does exactly what
        add_forgejo_sshkey() does (it posts to ``user/keys``); it looks
        like an unfinished copy. Behaviour is kept as-is because fixing it
        would change the meaning of its parameters.

        Args:
            pubkey (str): Public key text.
            description (str): Title given to the key.
        """
        self.__logger.info("Adding SSH key to Forgejo: %s", pubkey)

        self._forgejo_post("user/keys", {
            "key": pubkey,
            "read_only": False,
            "title": description
        })
if __name__ == '__main__':
    # Command-line entry point: parse the options, configure the installer
    # and run the selected action (the full installation by default).
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)20s - [%(levelname)5s] - %(message)s')
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    logger.info("Inicio del programa")

    parser = argparse.ArgumentParser(
        prog="OpenGnsys Installer",
        description="Script para la instalación del repositorio git",
    )
    parser.add_argument('--forgejo-only', action='store_true', help="Solo instalar forgejo")
    parser.add_argument('--forgejo-addrepos', action='store_true', help="Solo agregar repositorios forgejo")

    parser.add_argument('--testmode', action='store_true', help="Modo de prueba")
    parser.add_argument('--ignoresshkey', action='store_true', help="Ignorar clave de SSH")
    parser.add_argument('--usesshkey', type=str, help="Usar clave SSH especificada")
    parser.add_argument('--test-createuser', action='store_true')
    parser.add_argument('--extract-ssh-key', action='store_true', help="Extract SSH key from oglive")
    parser.add_argument('--set-ssh-key', action='store_true', help="Configure SSH key in oglive")
    parser.add_argument('--oglive', type=int, metavar='NUM', help="Do SSH key manipulation on this oglive")

    args = parser.parse_args()

    installer = OpengnsysGitInstaller()
    installer.set_testmode(args.testmode)
    installer.set_ignoresshkey(args.ignoresshkey)
    installer.set_usesshkey(args.usesshkey)

    # oglive clients are keyed by strings; None selects the default client.
    selected_oglive = None if args.oglive is None else str(args.oglive)

    logger.debug("Inicio de instalación")

    try:
        if args.forgejo_only:
            installer.install_forgejo()
        elif args.forgejo_addrepos:
            installer.add_forgejo_repo("linux")
        elif args.test_createuser:
            installer.set_ssh_user_group("oggit2", "oggit2")
        elif args.extract_ssh_key:
            key = installer._extract_ssh_key()
            print(f"Key: {key}")
        elif args.set_ssh_key:
            # Honour --oglive instead of always modifying the default client.
            installer.set_ssh_key(selected_oglive)
        else:
            installer.install()
            installer.install_forgejo()

            installer.add_forgejo_repo("windows", "Windows")
            installer.add_forgejo_repo("linux", "Linux")
            installer.add_forgejo_repo("mac", "Mac")

    except RequirementException as req:
        show_error(f"Requisito para la instalación no satisfecho: {req.message}")
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. under python -S).
        sys.exit(1)