oggit/installer/opengnsys_git_installer.py

885 lines
32 KiB
Python

#!/usr/bin/env python3
"""Script para la instalación del repositorio git"""
import os
import sys
sys.path.insert(0, "/usr/share/opengnsys-modules/python3/dist-packages")
import shutil
import argparse
import tempfile
import logging
import subprocess
import sys
import pwd
import grp
from termcolor import cprint
import git
import libarchive
from libarchive.extract import *
#from libarchive.entry import FileType
import urllib.request
import pathlib
import socket
import time
import requests
import tempfile
import hashlib
import datetime
#FORGEJO_VERSION="8.0.3"
FORGEJO_VERSION="9.0.0"
FORGEJO_URL=f"https://codeberg.org/forgejo/forgejo/releases/download/v{FORGEJO_VERSION}/forgejo-{FORGEJO_VERSION}-linux-amd64"
def show_error(*args):
"""
Imprime un mensaje de error
Args:
*args: Argumentos igual que a la función print
Returns:
None
"""
cprint(*args, "red", attrs = ["bold"], file=sys.stderr)
class RequirementException(Exception):
"""Excepción que indica que nos falta algún requisito
Attributes:
message (str): Mensaje de error mostrado al usuario
"""
def __init__(self, message):
"""Inicializar RequirementException.
Args:
message (str): Mensaje de error mostrado al usuario
"""
super().__init__(message)
self.message = message
class FakeTemporaryDirectory:
"""Imitación de TemporaryDirectory para depuración"""
def __init__(self, dirname):
self.name = dirname
os.makedirs(dirname, exist_ok=True)
def __str__(self):
return self.name
class Oglive:
"""Interfaz a utilidad oglivecli
Esto es probablemente temporal hasta que se haga una conversión de oglivecli
"""
def __init__(self):
self.__logger = logging.getLogger("Oglive")
self.binary = "/opt/opengnsys/bin/oglivecli"
self.__logger.debug("Inicializando")
def _cmd(self, args):
cmd = [self.binary] + args
self.__logger.debug("comando: %s", cmd)
proc = subprocess.run(cmd, shell=False, check=True, capture_output=True)
out_text = proc.stdout.decode('utf-8').strip()
self.__logger.debug("salida: %s", out_text)
return out_text
def get_default(self):
"""Devuelve el cliente por defecto"""
self.__logger.debug("get_default()")
return self._cmd(["get-default"])
def get_clients(self):
"""Devuelve la lista de clientes en un dict"""
self.__logger.debug("get_clients()")
lines = self._cmd(["list"]).splitlines()
clients = {}
for line in lines:
(number, name) = line.split()
clients[number] = name
self.__logger.debug("Clientes: %s", clients)
return clients
class OpengnsysGitInstaller:
"""Instalador de OpenGnsys"""
def __init__(self):
"""Inicializar clase"""
self.__logger = logging.getLogger("OpengnsysGitInstaller")
self.__logger.setLevel(logging.DEBUG)
self.__logger.debug("Inicializando")
self.testmode = False
self.base_path = "/opt/opengnsys"
self.git_basedir = "base.git"
self.email = "OpenGnsys@opengnsys.com"
self.forgejo_user = "oggit"
self.forgejo_password = "opengnsys"
self.forgejo_organization = "opengnsys"
self.forgejo_port = 3000
self.set_ssh_user_group("oggit", "oggit")
self.temp_dir = None
self.script_path = os.path.realpath(os.path.dirname(__file__))
# Possible names for SSH public keys
self.ssh_key_users = ["root", "opengnsys"]
self.key_names = ["id_rsa.pub", "id_ed25519.pub", "id_ecdsa.pub", "id_ed25519_sk.pub", "id_ecdsa_sk.pub"]
# Possible names for SSH key in oglive
self.key_paths = ["scripts/ssl/id_rsa.pub", "scripts/ssl/id_ed25519.pub", "scripts/ssl/id_ecdsa.pub", "scripts/ssl/id_ed25519_sk.pub", "scripts/ssl/id_ecdsa_sk.pub"]
self.key_paths_dict = {}
for kp in self.key_paths:
self.key_paths_dict[kp] = 1
self.oglive = Oglive()
def set_testmode(self, value):
"""Establece el modo de prueba"""
self.testmode = value
def set_ignoresshkey(self, value):
"""Ignorar requisito de clave de ssh para el instalador"""
self.ignoresshkey = value
def set_usesshkey(self, value):
"""Usar clave de ssh especificada"""
self.usesshkey = value
def set_basepath(self, value):
"""Establece ruta base de OpenGnsys
Valor por defecto: /opt/opengnsys
"""
self.base_path = value
def _get_tempdir(self):
"""Obtiene el directorio temporal"""
if self.testmode:
dirname = "/tmp/ogtemp"
if os.path.exists(dirname):
shutil.rmtree(dirname)
dir=FakeTemporaryDirectory(dirname)
self.__logger.debug("Modo de prueba, temp=/tmp/ogtemp")
return dir
else:
dir = tempfile.TemporaryDirectory()
self.__logger.debug("Temp = %s", dir)
return dir
def _cleanup(self):
"""Limpia el directorio temporal"""
if self.temp_dir:
shutil.rmtree(self.temp_dir, ignore_errors=True)
def set_ssh_user_group(self, username, groupname):
self.ssh_group = groupname
self.ssh_user = username
try:
self.ssh_gid = grp.getgrnam(self.ssh_group).gr_gid
self.__logger.info("Group %s exists with gid %i", self.ssh_group, self.ssh_gid)
except KeyError:
self.__logger.info("Need to create group %s", self.ssh_group)
subprocess.run(["/usr/sbin/groupadd", "--system", self.ssh_group], check=True)
self.ssh_gid = grp.getgrnam(groupname).gr_gid
try:
self.ssh_uid = pwd.getpwnam(self.ssh_user).pw_uid
self.__logger.info("User %s exists with gid %i", self.ssh_user, self.ssh_uid)
except KeyError:
self.__logger.info("Need to create user %s", self.ssh_user)
subprocess.run(["/usr/sbin/useradd", "--gid", str(self.ssh_gid), "-m", "--system", self.ssh_user], check=True)
self.ssh_uid = pwd.getpwnam(username).pw_uid
self.ssh_homedir = pwd.getpwnam(username).pw_dir
def init_git_repo(self, reponame):
"""Inicializa un repositorio Git"""
# Creamos repositorio
ogdir_images = os.path.join(self.base_path, "images")
self.__logger.info("Creando repositorio de GIT %s", reponame)
os.makedirs(os.path.join(ogdir_images, self.git_basedir), exist_ok=True)
repo_path=os.path.join(ogdir_images, reponame)
shutil.rmtree(repo_path, ignore_errors=True)
# Marcar como directorio seguro
# Nota: no usar GitPython. Config global falla, aunque hay indicaciones de que
# git.Repo(path=None) es valido. Posiblemente bug de GitPython.
subprocess.run(["git", "config", "--global", "add" "safe.directory", repo_path])
self.__logger.debug("Inicializando repositorio: " + repo_path)
repo = git.Repo.init(repo_path, bare = True)
self.__logger.info("Configurando repositorio de GIT")
repo.config_writer().set_value("user", "name", "OpenGnsys").release()
repo.config_writer().set_value("user", "email", self.email).release()
self._recursive_chown(repo_path, ouid=self.ssh_uid, ogid=self.ssh_gid)
def _add_line_to_file(self, filename, new_line):
"""Agrega una línea a un archivo"""
found = False
self.__logger.debug("Agregando linea: %s a %s", new_line, filename)
with open(filename, "a+", encoding="utf-8") as f:
f.seek(0)
for line in f:
if line.strip() == new_line.strip():
found = True
if not found:
self.__logger.debug("Agregando linea: %s", new_line)
f.write(new_line + "\n")
else:
self.__logger.debug("Linea ya presente")
def _recursive_chown(self, path, ouid, ogid):
"""Cambia el propietario y grupo de forma recursiva"""
for dirpath, _, filenames in os.walk(path):
os.chown(dirpath, uid=ouid, gid=ogid)
for filename in filenames:
os.chown(os.path.join(dirpath, filename), uid=ouid, gid=ogid)
def _wait_for_port(self, host, port):
self.__logger.info("Waiting for %s:%i to be up", host, port)
timeout = 60
start_time = time.time()
ready = False
while not ready and (time.time() - start_time) < 60:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((host, port))
ready = True
s.close()
except TimeoutError:
self.__logger.debug("Timed out, no connection yet.")
except OSError as oserr:
self.__logger.debug("%s, no connection yet. %.1f seconds left.", oserr.strerror, timeout - (time.time() - start_time))
time.sleep(0.1)
if ready:
self.__logger.info("Connection established.")
else:
self.__logger.error("Timed out waiting for connection!")
raise TimeoutError("Timed out waiting for connection!")
def add_ssh_key_from_squashfs(self, oglive_num = None):
if oglive_num is None:
self.__logger.info("Using default oglive")
oglive_num = int(self.oglive.get_default())
else:
self.__logger.info("Using oglive %i", oglive_num)
oglive_client = self.oglive.get_clients()[str(oglive_num)]
self.__logger.info("Oglive is %s", oglive_client)
keys = installer.extract_ssh_keys(oglive_num = oglive_num)
for k in keys:
timestamp = '{:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now())
installer.add_forgejo_sshkey(k, f"Key for {oglive_client} ({timestamp})")
def extract_ssh_keys(self, oglive_num = None):
public_keys = []
squashfs = "ogclient.sqfs"
tftp_dir = os.path.join(self.base_path, "tftpboot")
if oglive_num is None:
self.__logger.info("Reading from default oglive")
oglive_num = self.oglive.get_default()
else:
self.__logger.info("Reading from oglive %i", oglive_num)
oglive_client = self.oglive.get_clients()[str(oglive_num)]
self.__logger.info("Oglive is %s", oglive_client)
client_squashfs_path = os.path.join(tftp_dir, oglive_client, squashfs)
self.__logger.info("Mounting %s", client_squashfs_path)
mount_tempdir = tempfile.TemporaryDirectory()
ssh_keys_dir = os.path.join(mount_tempdir.name, "root", ".ssh")
subprocess.run(["mount", client_squashfs_path, mount_tempdir.name], check=True)
for file in os.listdir(ssh_keys_dir):
full_path = os.path.join(ssh_keys_dir, file)
if file.endswith(".pub"):
self.__logger.info("Found public key: %s", full_path)
with open(full_path, "r", encoding="utf-8") as keyfile:
keydata = keyfile.read().strip()
public_keys = public_keys + [keydata]
subprocess.run(["umount", mount_tempdir.name], check=True)
return public_keys
def _extract_ssh_key_from_initrd(self):
public_key=""
INITRD = "oginitrd.img"
tftp_dir = os.path.join(self.base_path, "tftpboot")
default_num = self.oglive.get_default()
default_client = self.oglive.get_clients()[default_num]
client_initrd_path = os.path.join(tftp_dir, default_client, INITRD)
#self.temp_dir = self._get_tempdir()
if self.usesshkey:
with open(self.usesshkey, 'r') as f:
public_key = f.read().strip()
else:
if os.path.isfile(client_initrd_path):
#os.makedirs(temp_dir, exist_ok=True)
#os.chdir(self.temp_dir.name)
self.__logger.debug("Descomprimiendo %s", client_initrd_path)
public_key = None
with libarchive.file_reader(client_initrd_path) as initrd:
for file in initrd:
self.__logger.debug("Archivo: %s", file)
pathname = file.pathname;
if pathname.startswith("./"):
pathname = pathname[2:]
if pathname in self.key_paths_dict:
data = bytearray()
for block in file.get_blocks():
data = data + block
public_key = data.decode('utf-8').strip()
break
else:
print(f"No se encuentra la imagen de initrd {client_initrd_path}")
exit(2)
return public_key
def set_ssh_key_in_initrd(self, client_num = None):
INITRD = "oginitrd.img"
tftp_dir = os.path.join(self.base_path, "tftpboot")
if client_num is None:
self.__logger.info("Will modify default client")
client_num = self.oglive.get_default()
ogclient = self.oglive.get_clients()[client_num]
client_initrd_path = os.path.join(tftp_dir, ogclient, INITRD)
client_initrd_path_new = client_initrd_path + ".new"
self.__logger.debug("initrd path for ogclient %s is %s", ogclient, client_initrd_path)
temp_dir = tempfile.TemporaryDirectory()
temp_dir_path = temp_dir.name
#temp_dir_path = "/tmp/extracted"
if os.path.exists(temp_dir_path):
shutil.rmtree(temp_dir_path)
pathlib.Path(temp_dir_path).mkdir(parents=True, exist_ok = True)
self.__logger.debug("Uncompressing initrd %s into %s", client_initrd_path, temp_dir_path)
os.chdir(temp_dir_path)
libarchive.extract_file(client_initrd_path, flags = EXTRACT_UNLINK | EXTRACT_OWNER | EXTRACT_PERM | EXTRACT_FFLAGS | EXTRACT_TIME)
ssh_key_dir = os.path.join(temp_dir_path, "scripts", "ssl")
client_key_path = os.path.join(ssh_key_dir, "id_ed25519")
authorized_keys_path = os.path.join(ssh_key_dir, "authorized_keys")
oglive_public_key = ""
# Create a SSH key on the oglive, if needed
pathlib.Path(ssh_key_dir).mkdir(parents=True, exist_ok=True)
if os.path.exists(client_key_path):
self.__logger.info("Creating SSH key not necessary, it already is in the initrd")
else:
self.__logger.info("Writing new SSH key into %s", client_key_path)
subprocess.run(["/usr/bin/ssh-keygen", "-t", "ed25519", "-N", "", "-f", client_key_path], check=True)
with open(client_key_path + ".pub", "r", encoding="utf-8") as pubkey:
oglive_public_key = pubkey.read()
# Add our public keys to the oglive, so that we can log in
public_keys = ""
for username in self.ssh_key_users:
self.__logger.debug("Looking for keys in user %s", username)
homedir = pwd.getpwnam(username).pw_dir
for key in self.key_names:
key_path = os.path.join(homedir, ".ssh", key)
self.__logger.debug("Checking if we have %s...", key_path)
if os.path.exists(key_path):
with open(key_path, "r", encoding='utf-8') as public_key_file:
self.__logger.info("Adding %s to authorized_keys", key_path)
public_key = public_key_file.read()
public_keys = public_keys + public_key + "\n"
self.__logger.debug("Writing %s", authorized_keys_path)
with open(authorized_keys_path, "w", encoding='utf-8') as auth_keys:
auth_keys.write(public_keys)
# hardlinks in the source package are not correctly packaged back as hardlinks.
# Taking the easy option of turning them into symlinks for now.
file_hashes = {}
with libarchive.file_writer(client_initrd_path_new, "cpio_newc", "zstd") as writer:
file_list = []
for root, subdirs, files in os.walk(temp_dir_path):
proot = pathlib.PurePosixPath(root)
relpath = proot.relative_to(temp_dir_path)
for file in files:
abs_path = os.path.join(root, file)
full_path = os.path.join(relpath, file)
#self.__logger.debug("%s", abs_path)
digest = None
if os.path.islink(abs_path):
self.__logger.debug("%s is a symlink", abs_path)
continue
if not os.path.exists(abs_path):
self.__logger.debug("%s does not exist", abs_path)
continue
stat_data = os.stat(abs_path)
with open(full_path, "rb") as in_file:
digest = hashlib.file_digest(in_file, "sha256").hexdigest()
if stat_data.st_size > 0 and not os.path.islink(full_path):
if digest in file_hashes:
target_path = pathlib.Path(file_hashes[digest])
link_path = target_path.relative_to(relpath, walk_up=True)
self.__logger.debug("%s was a duplicate of %s, linking to %s", full_path, file_hashes[digest], link_path)
os.unlink(full_path)
#os.link(file_hashes[digest], full_path)
os.symlink(link_path, full_path)
else:
file_hashes[digest] = full_path
writer.add_files(".", recursive=True )
os.rename(client_initrd_path, client_initrd_path + ".old")
if os.path.exists(client_initrd_path + ".sum"):
os.rename(client_initrd_path + ".sum", client_initrd_path + ".sum.old")
os.rename(client_initrd_path_new, client_initrd_path)
with open(client_initrd_path, "rb") as initrd_file:
hexdigest = hashlib.file_digest(initrd_file, "sha256").hexdigest()
with open(client_initrd_path + ".sum", "w", encoding="utf-8") as digest_file:
digest_file.write(hexdigest + "\n")
self.__logger.info("Updated initrd %s", client_initrd_path)
timestamp = '{:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now())
self.add_forgejo_sshkey(oglive_public_key, f"Key for {ogclient} ({timestamp})")
def install(self):
"""Instalar
Ejecuta todo el proceso de instalación incluyendo:
* Dependencias
* Configuración de authorized_keys
* Configuración de ssh
* Creación de repositorio
Raises:
RequirementException: No ejecutado por usuario root
RequirementException: No ejecutado en Debian o Ubuntu
RequirementException: Falta clave pública
RequirementException: Python < 3.8
"""
self.__logger.info("install()")
ogdir_images = os.path.join(self.base_path, "images")
ENGINECFG = os.path.join(self.base_path, "client/etc/engine.cfg")
os.environ["PATH"] += os.pathsep + os.path.join(self.base_path, "bin")
tftp_dir = os.path.join(self.base_path, "tftpboot")
INITRD = "oginitrd.img"
self.temp_dir = self._get_tempdir()
SSHUSER = "opengnsys"
# Control básico de errores.
self.__logger.debug("Comprobando euid")
if os.geteuid() != 0:
raise RequirementException("Sólo ejecutable por root")
if not os.path.exists("/etc/debian_version"):
raise RequirementException("Instalación sólo soportada en Debian y Ubuntu")
MIN_PYTHON = (3, 8)
if sys.version_info < MIN_PYTHON:
raise RequirementException(f"Python %s.%s mínimo requerido.\n" % MIN_PYTHON)
self.__logger.debug("Instalando dependencias")
subprocess.run(["apt-get", "install", "-y", "git"], check=True)
def _install_template(self, template, destination, keysvalues):
self.__logger.info("Writing template %s into %s", template, destination)
data = ""
with open(template, "r", encoding="utf-8") as template_file:
data = template_file.read()
for key in keysvalues.keys():
data = data.replace("{" + key + "}", keysvalues[key])
with open(destination, "w+", encoding="utf-8") as out_file:
out_file.write(data)
def _runcmd(self, cmd):
self.__logger.debug("Running: %s", cmd)
ret = subprocess.run(cmd, check=True,capture_output=True, encoding='utf-8')
return ret.stdout.strip()
def install_forgejo(self):
self.__logger.info("Installing Forgejo")
bin_path = os.path.join(self.base_path, "bin", "forgejo")
conf_dir_path = os.path.join(self.base_path, "etc", "forgejo")
lfs_dir_path = os.path.join(self.base_path, "images", "git-lfs")
git_dir_path = os.path.join(self.base_path, "images", "git")
forgejo_work_dir_path = os.path.join(self.base_path, "var", "lib", "forgejo/work")
forgejo_db_dir_path = os.path.join(self.base_path, "var", "lib", "forgejo/db")
forgejo_data_dir_path = os.path.join(self.base_path, "var", "lib", "forgejo/data")
forgejo_db_path = os.path.join(forgejo_db_dir_path, "forgejo.db")
forgejo_log_dir_path = os.path.join(self.base_path, "log", "forgejo")
conf_path = os.path.join(conf_dir_path, "app.ini")
self.__logger.debug("Stopping opengnsys-forgejo service")
subprocess.run(["systemctl", "stop", "opengnsys-forgejo"], check=False)
self.__logger.debug("Downloading from %s into %s", FORGEJO_URL, bin_path)
urllib.request.urlretrieve(FORGEJO_URL, bin_path)
os.chmod(bin_path, 0o755)
if os.path.exists(forgejo_db_path):
self.__logger.debug("Removing old configuration")
os.unlink(forgejo_db_path)
else:
self.__logger.debug("Old configuration not present, ok.")
self.__logger.debug("Wiping old data")
for dir in [conf_dir_path, git_dir_path, lfs_dir_path, forgejo_work_dir_path, forgejo_data_dir_path, forgejo_db_dir_path]:
if os.path.exists(dir):
self.__logger.debug("Removing %s", dir)
shutil.rmtree(dir)
self.__logger.debug("Creating directories")
pathlib.Path(conf_dir_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(git_dir_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(lfs_dir_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(forgejo_work_dir_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(forgejo_data_dir_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(forgejo_db_dir_path).mkdir(parents=True, exist_ok=True)
pathlib.Path(forgejo_log_dir_path).mkdir(parents=True, exist_ok=True)
os.chown(lfs_dir_path, self.ssh_uid, self.ssh_gid)
os.chown(git_dir_path, self.ssh_uid, self.ssh_gid)
os.chown(forgejo_data_dir_path, self.ssh_uid, self.ssh_gid)
os.chown(forgejo_work_dir_path, self.ssh_uid, self.ssh_gid)
os.chown(forgejo_db_dir_path, self.ssh_uid, self.ssh_gid)
os.chown(forgejo_log_dir_path, self.ssh_uid, self.ssh_gid)
data = {
"forgejo_user" : self.ssh_user,
"forgejo_group" : self.ssh_group,
"forgejo_port" : str(self.forgejo_port),
"forgejo_bin" : bin_path,
"forgejo_app_ini" : conf_path,
"forgejo_work_path" : forgejo_work_dir_path,
"forgejo_data_path" : forgejo_data_dir_path,
"forgejo_db_path" : forgejo_db_path,
"forgejo_repository_root" : git_dir_path,
"forgejo_lfs_path" : lfs_dir_path,
"forgejo_log_path" : forgejo_log_dir_path,
"forgejo_hostname" : self._runcmd("hostname"),
"forgejo_lfs_jwt_secret" : self._runcmd([bin_path,"generate", "secret", "LFS_JWT_SECRET"]),
"forgejo_jwt_secret" : self._runcmd([bin_path,"generate", "secret", "JWT_SECRET"]),
"forgejo_internal_token" : self._runcmd([bin_path,"generate", "secret", "INTERNAL_TOKEN"]),
"forgejo_secret_key" : self._runcmd([bin_path,"generate", "secret", "SECRET_KEY"])
}
self._install_template(os.path.join(self.script_path, "forgejo-app.ini"), conf_path, data)
self._install_template(os.path.join(self.script_path, "forgejo.service"), "/etc/systemd/system/opengnsys-forgejo.service", data)
self.__logger.debug("Reloading systemd and starting service")
subprocess.run(["systemctl", "daemon-reload"], check=True)
subprocess.run(["systemctl", "enable", "opengnsys-forgejo"], check=True)
subprocess.run(["systemctl", "restart", "opengnsys-forgejo"], check=True)
self.__logger.info("Waiting for forgejo to start")
self._wait_for_port("localhost", self.forgejo_port)
self.__logger.info("Configuring forgejo")
def run_forge_cmd(args):
cmd = [bin_path, "--config", conf_path] + args
self.__logger.debug("Running command: %s", cmd)
ret = subprocess.run(cmd, check=False, capture_output=True, encoding='utf-8', user=self.ssh_user)
if ret.returncode == 0:
return ret.stdout.strip()
else:
self.__logger.error("Failed to run command: %s, return code %i", cmd, ret.returncode)
self.__logger.error("stdout: %s", ret.stdout)
self.__logger.error("stderr: %s", ret.stderr)
raise RuntimeError("Failed to run necessary command")
run_forge_cmd(["admin", "doctor", "check"])
run_forge_cmd(["admin", "user", "create", "--username", self.forgejo_user, "--password", self.forgejo_password, "--email", self.email])
token = run_forge_cmd(["admin", "user", "generate-access-token", "--username", self.forgejo_user, "-t", "gitapi", "--scopes", "all", "--raw"])
with open(os.path.join(self.base_path, "etc", "ogGitApiToken.cfg"), "w+", encoding='utf-8') as token_file:
token_file.write(token)
ssh_key = self._extract_ssh_key_from_initrd()
self.add_forgejo_sshkey(ssh_key, "Default key")
def add_forgejo_repo(self, repository_name, description = ""):
token = ""
with open(os.path.join(self.base_path, "etc", "ogGitApiToken.cfg"), "r", encoding='utf-8') as token_file:
token = token_file.read().strip()
self.__logger.info("Adding repository %s for Forgejo", repository_name)
r = requests.post(
f"http://localhost:{self.forgejo_port}/api/v1/user/repos",
json={
"auto_init" : False,
"default_branch" : "main",
"description" : description,
"name" : repository_name,
"private" : False
}, headers={
'Authorization' : f"token {token}"
},
timeout = 60
)
self.__logger.info("Request status was %i, content %s", r.status_code, r.content)
def add_forgejo_sshkey(self, pubkey, description = ""):
token = ""
with open(os.path.join(self.base_path, "etc", "ogGitApiToken.cfg"), "r", encoding='utf-8') as token_file:
token = token_file.read().strip()
self.__logger.info("Adding SSH key to Forgejo: %s (%s)", pubkey, description)
r = requests.post(
f"http://localhost:{self.forgejo_port}/api/v1/user/keys",
json={
"key" : pubkey,
"read_only" : False,
"title" : description
}, headers={
'Authorization' : f"token {token}"
},
timeout = 60
)
self.__logger.info("Request status was %i, content %s", r.status_code, r.content)
def add_forgejo_organization(self, pubkey, description = ""):
token = ""
with open(os.path.join(self.base_path, "etc", "ogGitApiToken.cfg"), "r", encoding='utf-8') as token_file:
token = token_file.read().strip()
self.__logger.info("Adding SSH key to Forgejo: %s", pubkey)
r = requests.post(
f"http://localhost:{self.forgejo_port}/api/v1/user/keys",
json={
"key" : pubkey,
"read_only" : False,
"title" : description
}, headers={
'Authorization' : f"token {token}"
},
timeout = 60
)
self.__logger.info("Request status was %i, content %s", r.status_code, r.content)
if __name__ == '__main__':
sys.stdout.reconfigure(encoding='utf-8')
opengnsys_log_dir = "/opt/opengnsys/log"
logger = logging.getLogger(__package__)
logger.setLevel(logging.DEBUG)
streamLog = logging.StreamHandler()
streamLog.setLevel(logging.INFO)
if not os.path.exists(opengnsys_log_dir):
os.mkdir(opengnsys_log_dir)
logFilePath = f"{opengnsys_log_dir}/git_installer.log"
fileLog = logging.FileHandler(logFilePath)
fileLog.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)24s - [%(levelname)5s] - %(message)s')
streamLog.setFormatter(formatter)
fileLog.setFormatter(formatter)
logger.addHandler(streamLog)
logger.addHandler(fileLog)
parser = argparse.ArgumentParser(
prog="OpenGnsys Installer",
description="Script para la instalación del repositorio git",
)
parser.add_argument('--forgejo-only', action='store_true', help="Solo instalar forgejo")
parser.add_argument('--forgejo-addrepos', action='store_true', help="Solo agregar repositorios forgejo")
parser.add_argument('--testmode', action='store_true', help="Modo de prueba")
parser.add_argument('--ignoresshkey', action='store_true', help="Ignorar clave de SSH")
parser.add_argument('--usesshkey', type=str, help="Usar clave SSH especificada")
parser.add_argument('--test-createuser', action='store_true')
parser.add_argument('--extract-ssh-key', action='store_true', help="Extract SSH key from oglive squashfs")
parser.add_argument('--set-ssh-key', action='store_true', help="Read SSH key from oglive squashfs and set it in Forgejo")
parser.add_argument('--extract-ssh-key-from-initrd', action='store_true', help="Extract SSH key from oglive initrd (obsolete)")
parser.add_argument('--set-ssh-key-in-initrd', action='store_true', help="Configure SSH key in oglive (obsolete)")
parser.add_argument('--oglive', type=int, metavar='NUM', help = "Do SSH key manipulation on this oglive")
parser.add_argument('--quiet', action='store_true', help="Quiet console output")
parser.add_argument("-v", "--verbose", action="store_true", help = "Verbose console output")
args = parser.parse_args()
if args.quiet:
streamLog.setLevel(logging.WARNING)
if args.verbose:
streamLog.setLevel(logging.DEBUG)
installer = OpengnsysGitInstaller()
installer.set_testmode(args.testmode)
installer.set_ignoresshkey(args.ignoresshkey)
installer.set_usesshkey(args.usesshkey)
logger.debug("Inicio de instalación")
try:
if args.forgejo_only:
installer.install_forgejo()
elif args.forgejo_addrepos:
installer.add_forgejo_repo("linux")
elif args.test_createuser:
installer.set_ssh_user_group("oggit2", "oggit2")
elif args.extract_ssh_key:
keys = installer.extract_ssh_keys(oglive_num = args.oglive)
print(f"{keys}")
elif args.extract_ssh_key_from_initrd:
key = installer._extract_ssh_key_from_initrd()
print(f"{key}")
elif args.set_ssh_key:
installer.add_ssh_key_from_squashfs(oglive_num=args.oglive)
elif args.set_ssh_key_in_initrd:
installer.set_ssh_key_in_initrd()
else:
installer.install()
installer.install_forgejo()
installer.add_forgejo_repo("windows", "Windows")
installer.add_forgejo_repo("linux", "Linux")
installer.add_forgejo_repo("mac", "Mac")
except RequirementException as req:
show_error(f"Requisito para la instalación no satisfecho: {req.message}")
exit(1)