import npyscreen import os from git import Repo import subprocess # Importar el módulo subprocess import requests # Importar el módulo requests import time # Importar time para simular el progreso import threading # Importar threading para leer el log en tiempo real import socket import sys # Importar sys para leer los argumentos del script import logging # Importar el módulo logging import shutil # Importar para verificar el tamaño del terminal import json # Importar para manejar JSON CONFIGS_DIR = "/tmp/oginstall" LOGS_DIR = "/var/log/oginstaller" REPO_URL = "https://ognproject.evlt.uma.es/gitea/opengnsys/ogcore.git" # Configurar logging y directorio configuración try: if not os.path.exists(LOGS_DIR): os.makedirs(LOGS_DIR, exist_ok=True) LOG_FILE = os.path.join(LOGS_DIR, "oginstall.log") if not os.path.exists(CONFIGS_DIR): os.makedirs(CONFIGS_DIR, exist_ok=True) logging.basicConfig( filename=LOG_FILE, level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s", ) logging.debug("Inicio del programa") # Mensaje inicial para verificar que el log se crea except Exception as e: print(f"[ERROR] No se pudo configurar el logging: {e}") print(f"[ERROR] Verifica los permisos del directorio: {LOGS_DIR}") exit(1) # Salir si no se puede configurar el logging def get_network_interfaces(): """Obtiene los nombres de las interfaces de red disponibles en el servidor.""" try: # Listar las interfaces de red desde /sys/class/net interfaces = os.listdir('/sys/class/net') # Filtrar interfaces válidas (excluyendo interfaces virtuales como 'lo') valid_interfaces = [iface for iface in interfaces if not iface.startswith('lo')] return ','.join(valid_interfaces) # Devuelve las interfaces separadas por comas except Exception as e: # En caso de error, devolver un valor por defecto print(f"Error al obtener las interfaces de red: {e}") return "eth0" # Valor por defecto def get_available_versions(): """Obtiene la lista de versiones desde el archivo JSON remoto.""" try: # Validar si se pasa un argumento if len(sys.argv) > 1: arg = sys.argv[1].strip().lower() logging.debug(f"Argumento recibido: {arg}") # Usar logging en lugar de print # Validar explícitamente los valores permitidos if arg == "devel": url = "https://ognproject.evlt.uma.es/debian-opengnsys/versions-dev.json" elif arg == "nightly": logging.debug("No hay versiones disponibles para nightly. Usando 'latest'.") return ["latest"] # Devolver solo la opción 'latest' else: logging.debug(f"Argumento no reconocido: {arg}. Usando versiones de producción.") url = "https://ognproject.evlt.uma.es/debian-opengnsys/versions-prod.json" else: logging.debug("No se pasó ningún argumento. Usando versiones de producción.") url = "https://ognproject.evlt.uma.es/debian-opengnsys/versions-prod.json" # Realizar la solicitud HTTP logging.debug(f"Realizando solicitud HTTP a: {url}") response = requests.get(url, timeout=10) response.raise_for_status() # Lanza una excepción si la respuesta no es 200 OK # Registrar el contenido de la respuesta para depuración logging.debug(f"Contenido de la respuesta: {response.text}") # Intentar analizar el JSON try: data = response.json() except ValueError as e: logging.error(f"Error al analizar el JSON: {e}") raise RuntimeError(f"El contenido de la respuesta no es un JSON válido: {e}") # Validar que el JSON contiene la clave "versions" versions = data.get("versions", []) if not versions: logging.warning("La lista de versiones está vacía.") logging.debug(f"Versiones obtenidas: {versions}") return versions except (requests.RequestException, ValueError, RuntimeError) as e: logging.error(f"No se pudo obtener la lista de versiones: {e}") raise RuntimeError(f"Error crítico: {e}") # Lanzar una excepción crítica def get_default_ip(): """Obtiene la IP asociada a la interfaz por defecto del servidor.""" logging.debug("Obteniendo la IP por defecto del servidor.") try: # Obtener la interfaz asociada a la ruta por defecto result = subprocess.run( ["ip", "route", "get", "1.1.1.1"], capture_output=True, text=True, check=True ) # Extraer la interfaz de la salida interface = next((line.split()[-1] for line in result.stdout.splitlines() if "dev" in line), None) if not interface: logging.error("No se pudo determinar la interfaz por defecto.") raise ValueError("No se pudo determinar la interfaz por defecto.") logging.debug(f"Interfaz por defecto: {interface}") # Obtener la IP de la interfaz with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.connect(("8.8.8.8", 80)) ip_address = s.getsockname()[0] return ip_address except Exception as e: logging.error(f"Error al obtener la IP por defecto: {e}") return "192.168.2.2" # Valor por defecto def get_oglive_list(): """Obtiene la lista de valores de oglives desde la URL.""" try: # Realizar la solicitud HTTP response = requests.get("https://ognproject.evlt.uma.es/oglive/", timeout=10) response.raise_for_status() # Lanza una excepción si la respuesta no es 200 OK # Registrar el contenido de la respuesta para depuración # Extraer los enlaces del contenido HTML from bs4 import BeautifulSoup soup = BeautifulSoup(response.text, "html.parser") links = [a["href"] for a in soup.find_all("a", href=True) if "ogLive" in a["href"]] # Ordenar los enlaces por la parte después del guion bajo sorted_links = sorted(links, key=lambda x: x.split("_")[1] if "_" in x else x, reverse=True) return sorted_links except Exception as e: logging.error(f"Error al obtener la lista de oglives: {e}") return [] # Devolver una lista vacía en caso de error # Variable global para la IP por defecto DEFAULT_IP = get_default_ip() class InstallationTypeForm(npyscreen.ActionForm): """Formulario para seleccionar el tipo de instalación.""" def create(self): self.installation_type = self.add( npyscreen.TitleSelectOne, name="Selecciona el tipo de instalación:", values=["Mononodo", "Multinodo"], scroll_exit=False, max_height=10 ) def on_ok(self): """Guardar la selección y pasar al formulario correspondiente.""" logging.debug(f"Entrando en InstallationTypeForm") if self.installation_type.value is None: npyscreen.notify_confirm("Debes seleccionar un tipo de instalación.", title="Error") return if self.installation_type.value == [0]: # Mononodo logging.debug("Instalación mononodo seleccionada.") self.parentApp.installation_type = "mononodo" self.parentApp.setNextForm("MONONODO_CONFIG") elif self.installation_type.value == [1]: # Multinodo logging.debug("Instalación multinodo seleccionada.") self.parentApp.installation_type = "multinodo" self.parentApp.setNextForm("MULTINODO_CONFIG") def on_cancel(self): """Salir de la aplicación.""" if npyscreen.notify_yes_no("¿Estás seguro de que deseas salir?", title="Confirmación"): self.parentApp.setNextForm(None) class MononodoConfigForm(npyscreen.ActionForm): """Formulario para configurar Mononodo.""" def create(self): self.server_ip = self.add( npyscreen.TitleText, name="IP del servidor (mononodo):", value=get_default_ip() ) self.samba_user = self.add( npyscreen.TitleText, name="Usuario Samba:", value="opengnsys" ) self.samba_pass = self.add( npyscreen.TitlePassword, name="Contraseña Samba:", value="og", scroll_exit=True ) def on_ok(self): """Guardar la configuración y pasar al siguiente formulario.""" logging.debug(f"Entrando en MononodoConfigForm") self.parentApp.server_ip = self.server_ip.value self.parentApp.samba_user = self.samba_user.value self.parentApp.samba_pass = self.samba_pass.value self.parentApp.setNextForm("MAIN") def on_cancel(self): """Volver al formulario de selección de tipo de instalación.""" self.parentApp.setNextForm("INSTALLATION_TYPE") class MultinodoConfigForm(npyscreen.ActionForm): """Formulario para configurar Multinodo.""" def create(self): self.repo_ip = self.add( npyscreen.TitleText, name="IP del servidor Repository:", value=get_default_ip() ) self.dhcp_ip = self.add( npyscreen.TitleText, name="IP del servidor DHCP:", value=get_default_ip() ) self.core_ip = self.add( npyscreen.TitleText, name="IP del servidor Core:", value=get_default_ip() ) self.boot_ip = self.add( npyscreen.TitleText, name="IP del servidor Boot:", value=get_default_ip() ) self.samba_user = self.add( npyscreen.TitleText, name="Usuario Samba:", value="opengnsys" ) self.samba_pass = self.add( npyscreen.TitlePassword, name="Contraseña Samba:", value="og", scroll_exit=True ) def on_ok(self): """Guardar la configuración y pasar al siguiente formulario.""" self.parentApp.repo_ip = self.repo_ip.value self.parentApp.dhcp_ip = self.dhcp_ip.value self.parentApp.core_ip = self.core_ip.value self.parentApp.boot_ip = self.boot_ip.value self.parentApp.samba_user = self.samba_user.value self.parentApp.samba_pass = self.samba_pass.value self.parentApp.setNextForm("MAIN") def on_cancel(self): """Volver al formulario de selección de tipo de instalación.""" self.parentApp.setNextForm("INSTALLATION_TYPE") class ComponentSelectionForm(npyscreen.ActionForm): def create(self): self.components = self.add( npyscreen.TitleMultiSelect, max_height=6, name="Selecciona los componentes", values=["ogCore", "ogGui", "ogDhcp", "ogBoot", "ogRepository"], scroll_exit=True ) self.versions = get_available_versions() # Obtener las versiones desde el archivo JSON # Si no hay versiones disponibles, usar "latest" como opción por defecto if not self.versions: self.versions = ["latest"] self.tag = self.add( npyscreen.TitleSelectOne, max_height=10, name="Selecciona la versión", values=self.versions, scroll_exit=True ) self.tag.value = [0] # Marcar "latest" (o la primera opción) por defecto # Mostrar la IP del servidor si es mononodo if self.parentApp.installation_type == "mononodo": self.server_ip = self.add( npyscreen.TitleText, name="IP del servidor (mononodo):", value=self.parentApp.server_ip, editable=False ) # Agregar un cuadro de texto para mostrar el log self.log_box = self.add( npyscreen.BoxTitle, name="Log de depuración", max_height=10, scroll_exit=True ) def beforeEditing(self): """Configurar los valores iniciales de los componentes según el tipo de instalación.""" if self.parentApp.installation_type == "mononodo": # Seleccionar todos los componentes por defecto self.components.value = list(range(len(self.components.values))) else: # No seleccionar ningún componente por defecto self.components.value = [] self.display() def on_ok(self): # Validar selección obligatoria de componentes y versión if not self.components.value or len(self.components.value) == 0: npyscreen.notify_confirm("Debes seleccionar al menos un componente.", title="Error") return if not self.tag.value or len(self.tag.value) == 0: npyscreen.notify_confirm("Debes seleccionar una versión.", title="Error") return if self.parentApp.installation_type == "mononodo": self.handle_mononodo() else: self.handle_multinodo() def handle_mononodo(self): npyscreen.blank_terminal() selected_components = [self.components.values[i].lower() for i in self.components.value] selected_tag = self.versions[self.tag.value[0]] self.parentApp.selected_components = selected_components self.parentApp.selected_tag = selected_tag self.parentApp.current_component_index = 0 self.parentApp.configurations = {} self.parentApp.switchForm(selected_components[0]) def handle_multinodo(self): selected_components = [self.components.values[i].lower() for i in self.components.value] selected_tag = self.versions[self.tag.value[0]] self.parentApp.selected_components = selected_components self.parentApp.selected_tag = selected_tag self.parentApp.current_component_index = 0 self.parentApp.configurations = {} self.parentApp.switchForm(selected_components[0]) class ComponentForm(npyscreen.ActionForm): component_name = None def create(self): # Agregar un título dinámico basado en el componente en la primera línea self.title = self.add( npyscreen.FixedText, value="", editable=False, color="STANDOUT", rely=0 # Forzar que el título esté en la primera línea ) self.fields = {} def beforeEditing(self): npyscreen.blank_terminal() # Actualizar el valor del título dinámico basado en el componente self.title.value = f"Configuración del componente: {self.component_name.upper()}" self.title.display() self._recreate_form() def _recreate_form(self): """Limpia y recrea los widgets del formulario, excepto el título.""" # No eliminar el título al recrear los widgets self._widgets__ = [self.title] self._widgets_by_id__ = {id(self.title): self.title} self._contained_widgets = [self.title] self.configure_fields() def configure_fields(self): """Método para definir los campos de configuración para cada componente""" pass def on_ok(self): npyscreen.blank_terminal() component_config = {} for key, field_data in self.fields.items(): component_config[key] = field_data["widget"].value self.parentApp.configurations[self.component_name] = component_config self.parentApp.current_component_index += 1 if self.parentApp.current_component_index < len(self.parentApp.selected_components): next_component = self.parentApp.selected_components[self.parentApp.current_component_index] self.parentApp.switchForm(next_component) else: self.parentApp.generate_debconf() self.parentApp.setNextForm(None) def on_cancel(self): if npyscreen.notify_yes_no("¿Estás seguro de que deseas salir?", title="Confirmación"): self.parentApp.setNextForm(None) class OgCoreForm(ComponentForm): component_name = "ogcore" def configure_fields(self): self.add(npyscreen.FixedText, value="Usuario Administrador: ", editable=False, rely=2, relx=2, color="SAFE" , highlighted=True) self.fields["adminUser"] = { "widget": self.add( npyscreen.Textfield, value="ogadmin", rely=3, # Línea siguiente relx=18, highlighted=True ) } self.fields["adminPass"] = { "widget": self.add( npyscreen.TitlePassword, name="Contraseña Administrador:", value="12345678", rely=6 , # Ajustar la posición vertical highlighted=True , scroll_exit=True ) } class OgGuiForm(ComponentForm): component_name = "oggui" def configure_fields(self): """Configura los campos del formulario según el tipo de instalación.""" if self.parentApp.installation_type == "mononodo": self.server_ip = self.parentApp.server_ip elif self.parentApp.installation_type == "multinodo": self.server_ip = self.parentApp.core_ip self.add(npyscreen.FixedText, value="URL del servidor Core:", editable=False, rely=2, relx=2, color="SAFE" , highlighted=True) self.fields["ogcoreUrl"] = { "widget": self.add( npyscreen.Textfield, value=f"https://{self.server_ip}:8443", rely=3 , relx=18, highlighted=True# Ajustar la posición vertical ) } self.add(npyscreen.FixedText, value="URL del servidor Mercure:", editable=False, rely=4, relx=2, color="SAFE" , highlighted=True) self.fields["ogmercureUrl"] = { "widget": self.add( npyscreen.Textfield, value=f"https://{self.server_ip}:3000/.well-known/mercure", rely=6, relx=18# Ajustar la posición vertical ) } def on_ok(self): """Guarda la configuración y pasa al siguiente formulario.""" # Obtener la configuración del formulario component_config = { "ogcoreUrl": self.fields["ogcoreUrl"]["widget"].value, "ogmercureUrl": self.fields["ogmercureUrl"]["widget"].value, } # Guardar la configuración en el diccionario global self.parentApp.configurations[self.component_name] = component_config # Continuar con el siguiente formulario self.parentApp.current_component_index += 1 if self.parentApp.current_component_index < len(self.parentApp.selected_components): next_component = self.parentApp.selected_components[self.parentApp.current_component_index] self.parentApp.switchForm(next_component) else: self.parentApp.generate_debconf() self.parentApp.setNextForm(None) class OgDhcpForm(ComponentForm): component_name = "ogdhcp" def get_dhcp_ip(self): """Obtiene la IP del servidor DHCP.""" if self.parentApp.installation_type == "mononodo": return self.parentApp.server_ip elif self.parentApp.installation_type == "multinodo": return self.parentApp.dhcp_ip def get_boot_ip(self): """Obtiene la IP del servidor Boot.""" if self.parentApp.installation_type == "mononodo": return self.parentApp.server_ip elif self.parentApp.installation_type == "multinodo": return self.parentApp.boot_ip def configure_fields(self): # Obtener las interfaces de red disponibles available_interfaces = get_network_interfaces().split(",") # Selector de interfaces con altura ajustada self.fields["dhcp_interfaces"] = { "widget": self.add( npyscreen.TitleMultiSelect, name="Selecciona la interfaz de DHCP:", values=available_interfaces, scroll_exit=True, rely=2, max_height=5 # Reducir la altura para dejar espacio ) } # Campo para la IP del servidor DHCP self.fields["ip"] = { "widget": self.add( npyscreen.TitleText, name="IP del servidor DHCP:", value=self.get_dhcp_ip(), rely=8 # Ajustar la posición vertical ) } # Campo para la IP del servidor Boot self.fields["ogbootIP"] = { "widget": self.add( npyscreen.TitleText, name="IP del servidor Boot:", value=self.get_boot_ip(), rely=10 # Ajustar la posición vertical ) } def on_ok(self): available_interfaces = self.fields["dhcp_interfaces"]["widget"].values selected_indices = self.fields["dhcp_interfaces"]["widget"].value # Validar que al menos un interfaz esté seleccionado if not selected_indices or len(selected_indices) == 0: npyscreen.notify_confirm("Debes seleccionar al menos una interfaz de red para DHCP.", title="Error") return try: for i in selected_indices: if i < 0 or i >= len(available_interfaces): raise IndexError("Índice fuera de rango") selected_interfaces = [available_interfaces[i] for i in selected_indices] logging.debug(f"Interfaces seleccionadas: {selected_interfaces}") interfaces_string = ",".join(selected_interfaces) logging.debug(f"Interfaces seleccionadas: {interfaces_string}") except (IndexError, ValueError): npyscreen.notify_confirm("Selección inválida. Por favor, revisa los índices ingresados.", title="Error") return # Guardar las configuraciones self.parentApp.configurations[self.component_name] = { "interfaces": interfaces_string, "ip": self.fields["ip"]["widget"].value, "ogbootIP": self.fields["ogbootIP"]["widget"].value, } # Continuar con el siguiente formulario self.parentApp.current_component_index += 1 if self.parentApp.current_component_index < len(self.parentApp.selected_components): next_component = self.parentApp.selected_components[self.parentApp.current_component_index] self.parentApp.switchForm(next_component) else: self.parentApp.generate_debconf() self.parentApp.setNextForm(None) class OgBootForm(ComponentForm): component_name = "ogboot" download_url = "https://ognproject.evlt.uma.es/oglive/" def get_boot_ip(self): """Obtiene la IP del servidor Boot.""" if self.parentApp.installation_type == "mononodo": return self.parentApp.server_ip elif self.parentApp.installation_type == "multinodo": return self.parentApp.boot_ip def get_core_ip(self): """Obtiene la IP del servidor Core.""" if self.parentApp.installation_type == "mononodo": return self.parentApp.server_ip elif self.parentApp.installation_type == "multinodo": return self.parentApp.core_ip def get_samba_user(self): """Obtiene el usuario Samba.""" if self.parentApp.installation_type == "mononodo": return self.parentApp.samba_user elif self.parentApp.installation_type == "multinodo": return self.parentApp.samba_user def get_samba_user_pass(self): """Obtiene la contraseña del usuario Samba.""" if self.parentApp.installation_type == "mononodo": return self.parentApp.samba_pass elif self.parentApp.installation_type == "multinodo": return self.parentApp.samba_pass def configure_fields(self): # Obtener la lista de oglives oglives = get_oglive_list() if not oglives: oglives = ["https://ognproject.evlt.uma.es/oglive/ogLive-noble-6.8.0-31-generic-amd64-r20250116.538e3fa_20250120.iso"] npyscreen.notify_confirm("No se pudo obtener la lista de oglives. Usando un valor por defecto.", title="Error") # Campo para seleccionar un oglive self.fields["ogliveUrl"] = { "widget": self.add( npyscreen.TitleSelectOne, name="Selecciona un OgLive:", values=oglives, scroll_exit=True, max_height=6, rely=2 # Ajustar la posición vertical ) } # Otros campos #self.fields["ip"] = {"widget": self.add(npyscreen.TitleText, name="IP del servidor Boot:", value=self.get_boot_ip(), rely=14)} self.add(npyscreen.FixedText, value="IP del servidor Boot:", editable=False, rely=10, relx=2, color="SAFE" , highlighted=True) self.fields["ip"] = { "widget": self.add( npyscreen.Textfield, value=self.get_boot_ip(), rely=11, # Línea siguiente relx=18, highlighted=True ) } # self.fields["ogcoreUrl"] = {"widget": self.add(npyscreen.TitleText, name="URL OgCore:", value=f"https://{self.get_core_ip()}:8443", rely=18)} self.add(npyscreen.FixedText, value="IP del servidor Core:", editable=False, rely=12, relx=2, color="SAFE" , highlighted=True) self.fields["ogcoreUrl"] = { "widget": self.add( npyscreen.Textfield, value=f"https://{self.get_core_ip()}:8443", rely=13, # Línea siguiente relx=18, highlighted=True ) } # # self.fields["sambaUser"] = {"widget": self.add(npyscreen.TitleText, name="Usuario Samba:", value="opengnsys", rely=20)} # self.add(npyscreen.FixedText, value="Usuario Samba:", editable=False, rely=14, relx=2, color="SAFE" , highlighted=True) # self.fields["sambaUser"] = { # "widget": self.add( # npyscreen.Textfield, # value="opengnsys", # rely=15, # Línea siguiente # relx=18, # highlighted=True # ) # } # #self.fields["sambaUserPass"] = {"widget": self.add(npyscreen.TitlePassword, name="Contraseña Samba:", value="og", rely=22)} # self.fields["sambaUserPass"] = { # "widget": self.add( # npyscreen.TitlePassword, # name="Contraseña Samba:", # value="og", # rely=16, # Línea siguiente # ) # } #self.fields["port"] = {"widget": self.add(npyscreen.TitleText, name="Puerto Boot:", value="8082",hidden=True, rely=16)} self.add(npyscreen.FixedText, value="Puerto Boot:", editable=False, rely=12, relx=2, color="SAFE" , highlighted=True,hidden=True) self.fields["port"] = { "widget": self.add( npyscreen.Textfield, value="8082", rely=13, # Línea siguiente relx=18, hidden=True ) } def on_ok(self): # Obtener el oglive seleccionado selected_oglive_index = self.fields["ogliveUrl"]["widget"].value if not selected_oglive_index or len(selected_oglive_index) == 0: npyscreen.notify_confirm("Debes seleccionar una imagen OgLive.", title="Error") return selected_oglive = self.fields["ogliveUrl"]["widget"].values[selected_oglive_index[0]] # Guardar las configuraciones self.parentApp.configurations[self.component_name] = { "ogliveUrl": self.download_url + selected_oglive, "ip": self.fields["ip"]["widget"].value, "port": self.fields["port"]["widget"].value, "ogcoreUrl": self.fields["ogcoreUrl"]["widget"].value, "sambaUser": self.get_samba_user(), "sambaUserPass": self.get_samba_user_pass(), # "sambaUser": self.fields["sambaUser"]["widget"].value, # "sambaUserPass": self.fields["sambaUserPass"]["widget"].value, } # Continuar con el siguiente formulario self.parentApp.current_component_index += 1 if self.parentApp.current_component_index < len(self.parentApp.selected_components): next_component = self.parentApp.selected_components[self.parentApp.current_component_index] self.parentApp.switchForm(next_component) else: self.parentApp.generate_debconf() self.parentApp.setNextForm(None) class OgRepositoryForm(ComponentForm): component_name = "ogrepository" def get_repo_ip(self): """Obtiene la IP del servidor Repository.""" if self.parentApp.installation_type == "mononodo": return self.parentApp.server_ip elif self.parentApp.installation_type == "multinodo": return self.parentApp.repo_ip def get_core_ip(self): """Obtiene la IP del servidor Core.""" if self.parentApp.installation_type == "mononodo": return self.parentApp.server_ip elif self.parentApp.installation_type == "multinodo": return self.parentApp.core_ip def get_samba_user(self): """Obtiene el usuario Samba.""" if self.parentApp.installation_type == "mononodo": return self.parentApp.samba_user elif self.parentApp.installation_type == "multinodo": return self.parentApp.samba_user def get_samba_user_pass(self): """Obtiene la contraseña del usuario Samba.""" if self.parentApp.installation_type == "mononodo": return self.parentApp.samba_pass elif self.parentApp.installation_type == "multinodo": return self.parentApp.samba_pass def configure_fields(self): # Campo para la IP del Repositorio self.add(npyscreen.FixedText, value="IP del Repositorio:", editable=False, rely=2, relx=2, color="SAFE" , highlighted=True) self.fields["ogrepoIp"] = { "widget": self.add( npyscreen.Textfield, value=self.get_repo_ip(), rely=3, # Línea siguiente relx=18, highlighted=True ) } # Campo para la IP de OgCore self.add(npyscreen.FixedText, value="IP de OgCore:", editable=False, rely=5, relx=2, color="SAFE" , highlighted=True) self.fields["ogcoreIp"] = { "widget": self.add( npyscreen.Textfield, value=self.get_core_ip(), rely=6, # Línea siguiente relx=18, ) } # # Campo para el Usuario Samba # self.add(npyscreen.FixedText, value="Usuario Samba:", editable=False, rely=8, relx=2, color="SAFE" , highlighted=True) # self.fields["sambaUser"] = { # "widget": self.add( # npyscreen.Textfield, # value="opengnsys", # rely=9, # Línea siguiente # relx=18 , # ) # } # # Campo para la Contraseña Samba # self.fields["sambaUserPass"] = { # "widget": self.add( # npyscreen.TitlePassword, # name="Contraseña Samba:", # value="og", # rely=11 # Mantener el uso de TitlePassword # ) # } class InstallationProgressForm(npyscreen.Form): """Formulario para mostrar el progreso de instalación y el log en tiempo real.""" def create(self): # Ajustar alturas para evitar problemas de espacio self.progress_box = self.add( npyscreen.BoxTitle, name="Progreso de instalación", max_height=6, rely=1, relx=2, scroll_exit=True ) self.log_box = self.add( npyscreen.BoxTitle, name="Log de instalación", max_height=20, rely=10, relx=2, scroll_exit=True ) def update_progress(self, message, current=0, total=0): """Actualiza el progreso de instalación en la parte superior.""" if total > 0: # Crear una barra de progreso personalizada progress_percentage = int((current / total) * 100) bar_length = 30 # Longitud de la barra filled_length = int(bar_length * current // total) bar = f"[{'=' * filled_length}{' ' * (bar_length - filled_length)}] {progress_percentage}%" self.progress_box.values = [bar, message] else: self.progress_box.values = [message] self.progress_box.display() def update_log(self, new_line): """Actualiza el log en tiempo real en la parte inferior.""" max_lines = 14 # Número máximo de líneas a mostrar self.log_box.values.append(new_line.strip()) # Agregar la nueva línea self.log_box.values = self.log_box.values[-max_lines:] # Mantener solo las últimas `max_lines` líneas self.log_box.display() def install_components_with_ui(form, components, selected_tag): """Instala los componentes seleccionados mostrando el progreso y el log en tiempo real.""" log_file_path = os.path.join(LOGS_DIR, "installation.log") installed_packages = [] failed_packages = [] start_time = time.time() try: with open(log_file_path, "w", buffering=1) as log_file: total_packages = len(components) def tail_log(): try: with open(log_file_path, "r") as log_reader: log_reader.seek(0, os.SEEK_END) while True: line = log_reader.readline() if line: form.update_log(line) except Exception as e: # Mostrar error en log si ocurre try: form.update_log(f"[ERROR] {e}") except Exception: pass log_thread = threading.Thread(target=tail_log, daemon=True) log_thread.start() for index, package in enumerate(components, start=1): form.update_progress(f"Instalando paquete {index}/{total_packages}: {package}", current=index, total=total_packages) install_command = f"DEBIAN_FRONTEND=noninteractive apt-get install -y {package}" process = subprocess.Popen( install_command, shell=True, text=True, stdout=log_file, stderr=log_file, bufsize=1 ) process.wait() log_file.flush() if process.returncode != 0: error_message = f"Error al instalar el paquete {package}. Consulta el archivo de registro: {log_file_path}" form.update_progress(error_message) failed_packages.append(package) else: form.update_progress(f"Paquete {package} instalado correctamente.") installed_packages.append(package) if package == "ogboot": form.update_progress("Instalando paquete adicional: ogclient") install_command = "DEBIAN_FRONTEND=noninteractive apt-get install -y ogclient" process = subprocess.Popen( install_command, shell=True, text=True, stdout=log_file, stderr=log_file, bufsize=1 ) process.wait() log_file.flush() if process.returncode != 0: error_message = f"Error al instalar el paquete ogclient. Consulta el archivo de registro: {log_file_path}" form.update_progress(error_message) failed_packages.append("ogclient") else: form.update_progress("Paquete ogclient instalado correctamente.") installed_packages.append("ogclient") # Si hemos instalado ogcore, configuramos las variables del archivo env.json if "ogcore" in installed_packages: form.update_progress("Configurando variables de entorno para ogcore...") print ("Configurando variables de entorno para ogcore...") logging.debug("Configurando variables de entorno para ogcore...") ogDhcpIp = form.parentApp.server_ip if form.parentApp.installation_type == "mononodo" else form.parentApp.dhcp_ip ogCoreIp = form.parentApp.server_ip if form.parentApp.installation_type == "mononodo" else form.parentApp.core_ip ogBootIp = form.parentApp.server_ip if form.parentApp.installation_type == "mononodo" else form.parentApp.boot_ip env_path = "/opt/opengnsys/ogcore/api/env.json" shutil.copy(env_path, env_path + ".bak") # Hacer una copia de seguridad del archivo original with open(env_path, "r") as env_file: data = json.load(env_file) # Modificar los valores data["vars"]["OG_DHCP_API_URL"] = f"{ogDhcpIp}:8081" data["vars"]["OG_CORE_IP"] = ogCoreIp data["vars"]["OG_BOOT_IP"] = ogBootIp with open(env_path, "w", encoding="utf-8") as f: json.dump(data, f, indent=4) form.update_progress("Variables de entorno para ogcore configuradas correctamente.") ## Si la password de samba no es la por defecto o el usuario samba no es por defecto, cambiarla en los oglives y en el sistema con el comando smbpasswd ## Si el paquete ogboot está instalado se cambia la password en los oglives. Si el paquete ogrepository está instalado se cambia la password en el sistema. samba_user = form.parentApp.samba_user samba_pass = form.parentApp.samba_pass is_ogrepository_installed = "ogrepository" in installed_packages is_ogboot_installed = "ogboot" in installed_packages passwd_changed = (samba_pass != "og" or samba_user != "opengnsys") if passwd_changed and (is_ogrepository_installed or is_ogboot_installed): form.update_progress("Cambiando la contraseña del usuario Samba...") change_pass_command = f"echo -e '{samba_pass}\n{samba_pass}' | smbpasswd -s -a {samba_user}" change_pass_process = subprocess.Popen( change_pass_command, shell=True, text=True, stdout=log_file, stderr=log_file, bufsize=1 ) change_pass_process.wait() log_file.flush() form.update_progress("Cambiando la contraseña del usuario de Samba en los oglves ...") if change_pass_process.returncode != 0: error_message = f"Error al cambiar la contraseña del usuario Samba. Consulta el archivo de registro: {log_file_path}" form.update_progress(error_message) failed_packages.append("Cambio de contraseña Samba") else: form.update_progress("Contraseña del usuario Samba cambiada correctamente.") if passwd_changed and is_ogboot_installed: form.update_progress("Cambiando la contraseña del usuario Samba en los oglives...") change_pass_command = "/opt/opengnsys/ogboot/bin/setsmbpass" env= os.environ.copy() env['SAMBAUSER'] = samba_user env['SAMBAPASS'] = samba_pass change_pass_process = subprocess.Popen( change_pass_command, shell=True, text=True, stdout=log_file, stderr=log_file, bufsize=1 , env=env ) change_pass_process.wait() log_file.flush() if change_pass_process.returncode != 0: error_message = f"Error al cambiar la contraseña del usuario Samba en los oglives. Consulta el archivo de registro: {log_file_path}" form.update_progress(error_message) failed_packages.append("Cambio de contraseña Samba en oglives") else: form.update_progress("Contraseña del usuario Samba en oglives cambiada correctamente.") except Exception as e: try: form.update_progress(f"Error durante la instalación: {e}") except Exception: print(f"[ERROR] {e}") failed_packages.append("Error general durante la instalación") end_time = time.time() duration = end_time - start_time summary = "\n--- Resumen de la instalación ---\n" summary += f"Tiempo total de instalación: {duration:.2f} segundos\n" summary += f"Paquetes instalados correctamente: {len(installed_packages)}\n" for pkg in installed_packages: summary += f" - {pkg}\n" if failed_packages: summary += f"\nPaquetes que fallaron: {len(failed_packages)}\n" for pkg in failed_packages: summary += f" - {pkg}\n" else: summary += "\nTodos los paquetes se instalaron correctamente.\n" summary += f"\nConsulta el archivo de registro para más detalles: {log_file_path}" # Mostrar el resumen y salir del formulario try: npyscreen.notify_confirm(summary, title="Resumen de la instalación", wide=True) except Exception as e: print(summary) print(f"[ERROR] {e}") # Forzar la salida de la aplicación después del resumen import sys sys.exit(0) class MyApp(npyscreen.NPSAppManaged): def onStart(self): # Inicializar variables globales self.installation_type = None # Tipo de instalación seleccionado (mononodo o multinodo) self.server_ip = None # IP del servidor para mononodo self.repo_ip = None # IP del servidor Repository self.dhcp_ip = None # IP del servidor DHCP self.core_ip = None # IP del servidor Core self.boot_ip = None # IP del servidor Boot dpkg self.selected_components = [] # Componentes seleccionados self.selected_tag = None # Versión seleccionada self.configurations = {} # Configuraciones de los componentes # Registrar los formularios self.addForm("INSTALLATION_TYPE", InstallationTypeForm) self.addForm("MONONODO_CONFIG", MononodoConfigForm) self.addForm("MULTINODO_CONFIG", MultinodoConfigForm) self.addForm("MAIN", ComponentSelectionForm) self.addForm("ogcore", OgCoreForm) self.addForm("oggui", OgGuiForm) self.addForm("ogdhcp", OgDhcpForm) self.addForm("ogboot", OgBootForm) self.addForm("ogrepository", OgRepositoryForm) self.addForm("INSTALLATION_PROGRESS", InstallationProgressForm) # Configurar el formulario inicial self.setNextForm("INSTALLATION_TYPE") def generate_debconf(self): # Comprobar si la clave pública ya existe logging.debug("Entrando en generate_debconf") key_path = "/etc/apt/trusted.gpg.d/opengnsys.gpg" if os.path.exists(key_path): # Silenciar este mensaje pass else: # Añadir la clave pública try: logging.debug("Añadiendo la clave pública") subprocess.run( 'curl -k -L https://ognproject.evlt.uma.es/debian-opengnsys/public.key | gpg --dearmour -o /etc/apt/trusted.gpg.d/opengnsys.gpg', shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True ) except subprocess.CalledProcessError: logging.error("Error al añadir la clave pública") return # Añadir el repositorio try: logging.debug("Añadiendo el repositorio") selected_tag = self.selected_tag # Obtener el tag seleccionado # Determinar el valor de repo_line según el argumento recibido if len(sys.argv) > 1 and sys.argv[1].lower() == "devel": repo_line = f'deb http://ognproject.evlt.uma.es/debian-opengnsys/opengnsys-devel/{selected_tag} noble main' elif len(sys.argv) > 1 and sys.argv[1].lower() == "nightly": repo_line = f'deb http://ognproject.evlt.uma.es/debian-opengnsys/nightly/main noble main' else: repo_line = f'deb http://ognproject.evlt.uma.es/debian-opengnsys/opengnsys/{selected_tag} noble main' with open('/etc/apt/sources.list.d/opengnsys.list', 'w') as repo_file: repo_file.write(repo_line + '\n') except Exception: logging.error("Error al añadir el repositorio") print("Error al añadir el repositorio") return # Crear el archivo de versión instalada try: logging.debug("Creando el archivo de versión instalada") os.makedirs("/opt/opengnsys", exist_ok=True) with open("/opt/opengnsys/release", "w") as release_file: release_file.write(f"Versión instalada: {selected_tag}\n") except Exception as e: print(f"Error al crear el archivo de versión: {e}") # Actualizar los repositorios try: logging.debug("Actualizando los repositorios") subprocess.run('apt-get update', shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) except subprocess.CalledProcessError: # Silenciar errores return # Generar configuraciones para debconf output_file = os.path.join(CONFIGS_DIR, "configurations.txt") try: with open(output_file, "w") as f: f.write("\n--- Configuraciones para debconf-set-selections ---\n") for component, config in self.configurations.items(): for key, value in config.items(): field_type = "password" if "Pass" in key else "string" line = f'echo "{component} opengnsys/{component}_{key} {field_type} {value}" | debconf-set-selections\n' f.write(line) # Ejecutar la línea directamente y redirigir la salida a /dev/null subprocess.run( line, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) logging.debug(f"Configuraciones guardadas en: {output_file}") except Exception: # Silenciar errores logging.error(f"Error al guardar configuraciones en {output_file}") print(f"Error al guardar configuraciones en {output_file}") # Silenciar el mensaje de configuraciones guardadas # print(f"\nConfiguraciones guardadas en: {output_file}") # Llamar al formulario de progreso form = self.getForm("INSTALLATION_PROGRESS") self.switchForm("INSTALLATION_PROGRESS") install_components_with_ui(form, self.selected_components, self.selected_tag) def check_terminal_size(min_width=80, min_height=32): """Verifica si el tamaño del terminal es suficiente.""" terminal_size = shutil.get_terminal_size() if terminal_size.columns < min_width or terminal_size.lines < min_height: print(f"[ERROR] El tamaño del terminal es demasiado pequeño. Se requiere al menos {min_width}x{min_height}.") exit(1) if __name__ == "__main__": try: # Verificar el tamaño del terminal antes de iniciar la aplicación check_terminal_size() logging.debug("Ejecutando la aplicación principal") MyApp().run() except RuntimeError as e: print(f"[ERROR] {e}") logging.error(f"[ERROR] {e}") exit(1) # Salir con un código de error except KeyboardInterrupt: logging.warning("El programa fue interrumpido por el usuario (Ctrl-C)") except Exception as e: logging.error(f"[ERROR] Ocurrió un error inesperado: {e}") print(f"[ERROR] {e}") finally: # Asegurarse de que todos los mensajes de log se escriban en el archivo logging.debug("Finalizando el programa y cerrando el log") logging.shutdown() # Restaurar el terminal al estado normal npyscreen.wrapper_basic(lambda stdscr: None)