import npyscreen import os from git import Repo import subprocess # Importar el módulo subprocess import requests # Importar el módulo requests from tqdm import tqdm # Importar tqdm para la barra de progreso import time # Importar time para simular el progreso import threading # Importar threading para leer el log en tiempo real import socket CONFIGS_DIR = "/tmp/oginstall" os.makedirs(CONFIGS_DIR, exist_ok=True) REPO_URL = "https://ognproject.evlt.uma.es/gitea/opengnsys/ogcore.git" def get_network_interfaces(): """Obtiene los nombres de las interfaces de red disponibles en el servidor.""" try: # Listar las interfaces de red desde /sys/class/net interfaces = os.listdir('/sys/class/net') # Filtrar interfaces válidas (excluyendo interfaces virtuales como 'lo') valid_interfaces = [iface for iface in interfaces if not iface.startswith('lo')] return ','.join(valid_interfaces) # Devuelve las interfaces separadas por comas except Exception as e: # En caso de error, devolver un valor por defecto print(f"Error al obtener las interfaces de red: {e}") return "eth0" # Valor por defecto def get_available_versions(): """Obtiene la lista de versiones desde el archivo JSON remoto.""" try: url = "https://ognproject.evlt.uma.es/debian-opengnsys/versions.json" # Redirigir la salida de la descarga a /dev/null response = requests.get(url, timeout=10) response.raise_for_status() # Lanza una excepción si la respuesta no es 200 OK data = response.json() return data.get("versions", []) except requests.RequestException: # Silenciar errores y devolver una lista vacía return [] def get_default_ip(): """Obtiene la IP asociada a la interfaz por defecto del servidor.""" try: # Obtener la interfaz asociada a la ruta por defecto result = subprocess.run( ["ip", "route", "get", "1.1.1.1"], capture_output=True, text=True, check=True ) # Extraer la interfaz de la salida interface = next((line.split()[-1] for line in result.stdout.splitlines() if "dev" in line), None) if not interface: raise ValueError("No se pudo determinar la interfaz por defecto.") # Obtener la IP de la interfaz with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.connect(("8.8.8.8", 80)) ip_address = s.getsockname()[0] return ip_address except Exception as e: print(f"Error al obtener la IP por defecto: {e}") return "192.168.2.2" # Valor por defecto # Variable global para la IP por defecto DEFAULT_IP = get_default_ip() class ComponentSelectionForm(npyscreen.ActionForm): def create(self): self.components = self.add(npyscreen.TitleMultiSelect, max_height=6, name="Selecciona los componentes", values=["ogCore", "ogGui", "ogDhcp", "ogBoot", "ogRepository"], scroll_exit=True) self.versions = get_available_versions() # Obtener las versiones desde el archivo JSON self.tag = self.add(npyscreen.TitleSelectOne, max_height=10, name="Selecciona la versión", values=self.versions, scroll_exit=True) def on_ok(self): npyscreen.blank_terminal() selected_components = [self.components.values[i].lower() for i in self.components.value] # Convertir a minúsculas if not selected_components: npyscreen.notify_confirm("Debes seleccionar al menos un componente.", title="Error") return if not self.tag.value: npyscreen.notify_confirm("Debes seleccionar una versión.", title="Error") return selected_tag = self.versions[self.tag.value[0]] # Usar la versión seleccionada self.parentApp.selected_components = selected_components self.parentApp.selected_tag = selected_tag self.parentApp.current_component_index = 0 self.parentApp.configurations = {} # Almacena los valores configurados self.parentApp.switchForm(selected_components[0]) # Ya están en minúsculas class ComponentForm(npyscreen.ActionForm): component_name = None def create(self): self.fields = {} def beforeEditing(self): npyscreen.blank_terminal() self.fields.clear() self._recreate_form() def _recreate_form(self): """Limpia y recrea los widgets del formulario.""" self._clear_widgets() self.configure_fields() def configure_fields(self): """Método para definir los campos de configuración para cada componente""" pass def _clear_widgets(self): """Limpia todos los widgets del formulario.""" self._widgets__ = [] self._widgets_by_id__ = {} self._contained_widgets = [] def on_ok(self): npyscreen.blank_terminal() component_config = {} for key, field_data in self.fields.items(): component_config[key] = field_data["widget"].value self.parentApp.configurations[self.component_name] = component_config self.parentApp.current_component_index += 1 if self.parentApp.current_component_index < len(self.parentApp.selected_components): next_component = self.parentApp.selected_components[self.parentApp.current_component_index] self.parentApp.switchForm(next_component) else: self.parentApp.generate_debconf() self.parentApp.setNextForm(None) def on_cancel(self): if npyscreen.notify_yes_no("¿Estás seguro de que deseas salir?", title="Confirmación"): self.parentApp.setNextForm(None) class OgCoreForm(ComponentForm): component_name = "ogcore" def configure_fields(self): self.fields["adminUser"] = {"widget": self.add(npyscreen.TitleText, name="Usuario administrador:", value="ogadmin")} self.fields["adminPass"] = {"widget": self.add(npyscreen.TitlePassword, name="Contraseña:", value="12345678")} class OgGuiForm(ComponentForm): component_name = "oggui" def configure_fields(self): self.fields["ogcoreUrl"] = {"widget": self.add(npyscreen.TitleText, name="URL API OgCore:", value="https://{}:8443".format(DEFAULT_IP))} self.fields["ogmercureUrl"] = {"widget": self.add(npyscreen.TitleText, name="Mercure URL:", value="https://{} gr:3000/.well-known/mercure".format(DEFAULT_IP))} class OgDhcpForm(ComponentForm): component_name = "ogdhcp" def configure_fields(self): # Obtener las interfaces de red disponibles available_interfaces = get_network_interfaces().split(",") # Mostrar las interfaces en una sola línea interfaces_display = ", ".join(f"{i}:{iface}" for i, iface in enumerate(available_interfaces)) # Campo para mostrar las interfaces disponibles self.fields["interfaces_display"] = { "widget": self.add( npyscreen.TitleText, name="Interfaces disponibles (índice:nombre):", value=interfaces_display, editable=False # Solo para mostrar, no editable ) } # Campo para que el usuario seleccione los índices de las interfaces self.fields["interfaces"] = { "widget": self.add( npyscreen.TitleText, name="Selecciona los índices separados por comas:", value="" ) } # Otros campos self.fields["ip"] = {"widget": self.add(npyscreen.TitleText, name="IP del servidor DHCP:", value=DEFAULT_IP)} self.fields["ogbootIP"] = {"widget": self.add(npyscreen.TitleText, name="IP del servidor Boot:", value=DEFAULT_IP)} def on_ok(self): # Obtener los índices seleccionados por el usuario selected_indices = self.fields["interfaces"]["widget"].value.split(",") try: # Convertir los índices en nombres de interfaces available_interfaces = get_network_interfaces().split(",") selected_interfaces = [available_interfaces[int(i.strip())] for i in selected_indices if i.strip().isdigit()] interfaces_string = ",".join(selected_interfaces) except (IndexError, ValueError): npyscreen.notify_confirm("Selección inválida. Por favor, revisa los índices ingresados.", title="Error") return # Guardar las configuraciones self.parentApp.configurations[self.component_name] = { "interfaces": interfaces_string, "ip": self.fields["ip"]["widget"].value, "ogbootIP": self.fields["ogbootIP"]["widget"].value, } # Continuar con el siguiente formulario self.parentApp.current_component_index += 1 if self.parentApp.current_component_index < len(self.parentApp.selected_components): next_component = self.parentApp.selected_components[self.parentApp.current_component_index] self.parentApp.switchForm(next_component) else: self.parentApp.generate_debconf() self.parentApp.setNextForm(None) class OgBootForm(ComponentForm): component_name = "ogboot" def configure_fields(self): self.fields["ip"] = {"widget": self.add(npyscreen.TitleText, name="IP del servidor Boot:", value=DEFAULT_IP)} self.fields["port"] = {"widget": self.add(npyscreen.TitleText, name="Puerto Boot:", value="8082")} self.fields["ogcoreUrl"] = {"widget": self.add(npyscreen.TitleText, name="URL OgCore:", value=f"https://{DEFAULT_IP}:8443")} self.fields["ogliveUrl"] = {"widget": self.add(npyscreen.TitleText, name="URL OgLive:", value="https://ognproject.evlt.uma.es/oglive/ogLive-noble-6.8.0-31-generic-amd64-r20250116.538e3fa_20250120.iso")} self.fields["sambaUser"] = {"widget": self.add(npyscreen.TitleText, name="Usuario Samba:", value="opengnsys")} self.fields["sambaUserPass"] = {"widget": self.add(npyscreen.TitlePassword, name="Contraseña Samba:", value="og")} class OgRepositoryForm(ComponentForm): component_name = "ogrepository" def configure_fields(self): self.fields["ogrepoIp"] = {"widget": self.add(npyscreen.TitleText, name="IP del Repositorio:", value=DEFAULT_IP)} self.fields["ogcoreIp"] = {"widget": self.add(npyscreen.TitleText, name="IP de OgCore:", value=DEFAULT_IP)} self.fields["sambaUser"] = {"widget": self.add(npyscreen.TitleText, name="Usuario Samba:", value="opengnsys")} self.fields["sambaUserPass"] = {"widget": self.add(npyscreen.TitlePassword, name="Contraseña Samba:", value="og")} def install_components(components, selected_tag): """Instala los componentes seleccionados usando el tag especificado.""" log_file_path = os.path.join(CONFIGS_DIR, "installation.log") installed_packages = [] # Lista de paquetes instalados correctamente failed_packages = [] # Lista de paquetes que fallaron # Registrar el tiempo de inicio start_time = time.time() try: with open(log_file_path, "w") as log_file: total_packages = len(components) for index, package in enumerate(components, start=1): # Mostrar solo el progreso en la terminal print(f"Instalando paquete {index}/{total_packages}: {package}") log_file.write(f"\n--- Instalando paquete {index}/{total_packages}: {package} ---\n") # Crear una barra de progreso para el paquete with tqdm(total=100, desc=f"Instalando {package}", unit="%", ncols=80) as progress_bar: # Ejecutar el comando y redirigir toda la salida al archivo de registro install_command = f"DEBIAN_FRONTEND=noninteractive apt-get install -y {package}" process = subprocess.Popen( install_command, shell=True, text=True, stdout=log_file, stderr=log_file ) # Función para leer el log en tiempo real y actualizar la barra de progreso def monitor_log(): with open(log_file_path, "r") as log_reader: log_reader.seek(0, os.SEEK_END) # Ir al final del archivo while process.poll() is None: line = log_reader.readline() if not line: time.sleep(0.1) # Esperar si no hay nuevas líneas continue # Actualizar la barra de progreso según las cadenas detectadas if "Se necesita descargar" in line: progress_bar.n = 10 elif "Preparando para desempaquetar" in line: progress_bar.n = 30 elif "Configurando" in line and package in line: progress_bar.n = 40 progress_bar.refresh() # Iniciar el hilo para monitorear el log log_thread = threading.Thread(target=monitor_log, daemon=True) log_thread.start() # Esperar a que el proceso de instalación termine process.wait() # Completar la barra de progreso progress_bar.n = 100 progress_bar.refresh() # Registrar errores en el archivo de registro if process.returncode != 0: error_message = f"Error al instalar el paquete {package}. Consulta el archivo de registro: {log_file_path}" print(error_message) log_file.write(f"\n{error_message}\n") failed_packages.append(package) # Agregar a la lista de fallos else: log_file.write(f"Paquete {package} instalado correctamente.\n") installed_packages.append(package) # Agregar a la lista de éxitos except Exception as e: with open(log_file_path, "a") as log_file: log_file.write(f"\nError durante la instalación de los paquetes: {e}\n") failed_packages.append("Error general durante la instalación") # Registrar el tiempo de finalización end_time = time.time() duration = end_time - start_time # Calcular la duración en segundos # Generar el resumen summary = "\n--- Resumen de la instalación ---\n" summary += f"Tiempo total de instalación: {duration:.2f} segundos\n" summary += f"Paquetes instalados correctamente: {len(installed_packages)}\n" for pkg in installed_packages: summary += f" - {pkg}\n" if failed_packages: summary += f"\nPaquetes que fallaron: {len(failed_packages)}\n" for pkg in failed_packages: summary += f" - {pkg}\n" else: summary += "\nTodos los paquetes se instalaron correctamente.\n" summary += f"\nConsulta el archivo de registro para más detalles: {log_file_path}" # Mostrar el resumen en una ventana emergente npyscreen.notify_confirm(summary, title="Resumen de la instalación", wide=True) class MyApp(npyscreen.NPSAppManaged): def onStart(self): self.addForm("MAIN", ComponentSelectionForm) self.addForm("ogcore", OgCoreForm) self.addForm("oggui", OgGuiForm) self.addForm("ogdhcp", OgDhcpForm) self.addForm("ogboot", OgBootForm) self.addForm("ogrepository", OgRepositoryForm) def generate_debconf(self): # Comprobar si la clave pública ya existe key_path = "/etc/apt/trusted.gpg.d/opengnsys.gpg" if os.path.exists(key_path): # Silenciar este mensaje pass else: # Añadir la clave pública try: subprocess.run( 'curl -k -L https://ognproject.evlt.uma.es/debian-opengnsys/public.key | gpg --dearmour -o /etc/apt/trusted.gpg.d/opengnsys.gpg', shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True ) except subprocess.CalledProcessError: # Silenciar errores return # Añadir el repositorio try: selected_tag = self.selected_tag # Obtener el tag seleccionado repo_line = f'deb http://ognproject.evlt.uma.es/debian-opengnsys/opengnsys-devel/{selected_tag} noble main' with open('/etc/apt/sources.list.d/opengnsys.list', 'w') as repo_file: repo_file.write(repo_line + '\n') except Exception: # Silenciar errores return # Actualizar los repositorios try: subprocess.run('apt-get update', shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) except subprocess.CalledProcessError: # Silenciar errores return # Generar configuraciones para debconf output_file = os.path.join(CONFIGS_DIR, "configurations.txt") try: with open(output_file, "w") as f: f.write("\n--- Configuraciones para debconf-set-selections ---\n") for component, config in self.configurations.items(): for key, value in config.items(): field_type = "password" if "Pass" in key else "string" line = f'echo "{component} opengnsys/{component}_{key} {field_type} {value}" | debconf-set-selections\n' f.write(line) # Ejecutar la línea directamente y redirigir la salida a /dev/null subprocess.run( line, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) except Exception: # Silenciar errores pass # Silenciar el mensaje de configuraciones guardadas # print(f"\nConfiguraciones guardadas en: {output_file}") # Llamar a la función de instalación después de generar las configuraciones install_components(self.selected_components, self.selected_tag) if __name__ == "__main__": MyApp().run()