import npyscreen import os from git import Repo import subprocess # Importar el módulo subprocess import requests # Importar el módulo requests import time # Importar time para simular el progreso import threading # Importar threading para leer el log en tiempo real import socket CONFIGS_DIR = "/tmp/oginstall" os.makedirs(CONFIGS_DIR, exist_ok=True) REPO_URL = "https://ognproject.evlt.uma.es/gitea/opengnsys/ogcore.git" def get_network_interfaces(): """Obtiene los nombres de las interfaces de red disponibles en el servidor.""" try: # Listar las interfaces de red desde /sys/class/net interfaces = os.listdir('/sys/class/net') # Filtrar interfaces válidas (excluyendo interfaces virtuales como 'lo') valid_interfaces = [iface for iface in interfaces if not iface.startswith('lo')] return ','.join(valid_interfaces) # Devuelve las interfaces separadas por comas except Exception as e: # En caso de error, devolver un valor por defecto print(f"Error al obtener las interfaces de red: {e}") return "eth0" # Valor por defecto def get_available_versions(): """Obtiene la lista de versiones desde el archivo JSON remoto.""" try: url = "https://ognproject.evlt.uma.es/debian-opengnsys/versions.json" # Redirigir la salida de la descarga a /dev/null response = requests.get(url, timeout=10) response.raise_for_status() # Lanza una excepción si la respuesta no es 200 OK data = response.json() return data.get("versions", []) except requests.RequestException: # Silenciar errores y devolver una lista vacía return [] def get_default_ip(): """Obtiene la IP asociada a la interfaz por defecto del servidor.""" try: # Obtener la interfaz asociada a la ruta por defecto result = subprocess.run( ["ip", "route", "get", "1.1.1.1"], capture_output=True, text=True, check=True ) # Extraer la interfaz de la salida interface = next((line.split()[-1] for line in result.stdout.splitlines() if "dev" in line), None) if not interface: raise ValueError("No se pudo determinar la interfaz por defecto.") # Obtener la IP de la interfaz with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.connect(("8.8.8.8", 80)) ip_address = s.getsockname()[0] return ip_address except Exception as e: print(f"Error al obtener la IP por defecto: {e}") return "192.168.2.2" # Valor por defecto def get_oglive_list(): """Obtiene la lista de valores de oglives desde la URL.""" try: # Realizar la solicitud HTTP response = requests.get("https://ognproject.evlt.uma.es/oglive/", timeout=10) response.raise_for_status() # Lanza una excepción si la respuesta no es 200 OK # Extraer los enlaces del contenido HTML from bs4 import BeautifulSoup soup = BeautifulSoup(response.text, "html.parser") links = [a["href"] for a in soup.find_all("a", href=True) if "ogLive" in a["href"]] # Ordenar los enlaces por la parte después del guion bajo sorted_links = sorted(links, key=lambda x: x.split("_")[1] if "_" in x else x, reverse=True) return sorted_links except Exception as e: print(f"Error al obtener la lista de oglives: {e}") return [] # Devolver una lista vacía en caso de error # Variable global para la IP por defecto DEFAULT_IP = get_default_ip() class ComponentSelectionForm(npyscreen.ActionForm): def create(self): self.components = self.add(npyscreen.TitleMultiSelect, max_height=6, name="Selecciona los componentes", values=["ogCore", "ogGui", "ogDhcp", "ogBoot", "ogRepository"], scroll_exit=True) self.versions = get_available_versions() # Obtener las versiones desde el archivo JSON self.tag = self.add(npyscreen.TitleSelectOne, max_height=10, name="Selecciona la versión", values=self.versions, scroll_exit=True) def on_ok(self): npyscreen.blank_terminal() selected_components = [self.components.values[i].lower() for i in self.components.value] # Convertir a minúsculas if not selected_components: npyscreen.notify_confirm("Debes seleccionar al menos un componente.", title="Error") return if not self.tag.value: npyscreen.notify_confirm("Debes seleccionar una versión.", title="Error") return selected_tag = self.versions[self.tag.value[0]] # Usar la versión seleccionada self.parentApp.selected_components = selected_components self.parentApp.selected_tag = selected_tag self.parentApp.current_component_index = 0 self.parentApp.configurations = {} # Almacena los valores configurados self.parentApp.switchForm(selected_components[0]) # Ya están en minúsculas class ComponentForm(npyscreen.ActionForm): component_name = None def create(self): self.fields = {} def beforeEditing(self): npyscreen.blank_terminal() self.fields.clear() self._recreate_form() def _recreate_form(self): """Limpia y recrea los widgets del formulario.""" self._clear_widgets() self.configure_fields() def configure_fields(self): """Método para definir los campos de configuración para cada componente""" pass def _clear_widgets(self): """Limpia todos los widgets del formulario.""" self._widgets__ = [] self._widgets_by_id__ = {} self._contained_widgets = [] def on_ok(self): npyscreen.blank_terminal() component_config = {} for key, field_data in self.fields.items(): component_config[key] = field_data["widget"].value self.parentApp.configurations[self.component_name] = component_config self.parentApp.current_component_index += 1 if self.parentApp.current_component_index < len(self.parentApp.selected_components): next_component = self.parentApp.selected_components[self.parentApp.current_component_index] self.parentApp.switchForm(next_component) else: self.parentApp.generate_debconf() self.parentApp.setNextForm(None) def on_cancel(self): if npyscreen.notify_yes_no("¿Estás seguro de que deseas salir?", title="Confirmación"): self.parentApp.setNextForm(None) class OgCoreForm(ComponentForm): component_name = "ogcore" def configure_fields(self): self.fields["adminUser"] = {"widget": self.add(npyscreen.TitleText, name="Usuario administrador:", value="ogadmin")} self.fields["adminPass"] = {"widget": self.add(npyscreen.TitlePassword, name="Contraseña:", value="12345678")} class OgGuiForm(ComponentForm): component_name = "oggui" def configure_fields(self): self.fields["ogcoreUrl"] = {"widget": self.add(npyscreen.TitleText, name="URL API OgCore:", value="https://{}:8443".format(DEFAULT_IP))} self.fields["ogmercureUrl"] = {"widget": self.add(npyscreen.TitleText, name="Mercure URL:", value="https://{}:3000/.well-known/mercure".format(DEFAULT_IP))} class OgDhcpForm(ComponentForm): component_name = "ogdhcp" def configure_fields(self): # Obtener las interfaces de red disponibles available_interfaces = get_network_interfaces().split(",") # Mostrar las interfaces en una sola línea interfaces_display = ", ".join(f"{i}:{iface}" for i, iface in enumerate(available_interfaces)) # Campo para mostrar las interfaces disponibles self.fields["interfaces_display"] = { "widget": self.add( npyscreen.TitleText, name="Interfaces disponibles (índice:nombre):", value=interfaces_display, editable=False # Solo para mostrar, no editable ) } # Campo para que el usuario seleccione los índices de las interfaces self.fields["interfaces"] = { "widget": self.add( npyscreen.TitleText, name="Selecciona los índices separados por comas:", value="" ) } # Otros campos self.fields["ip"] = {"widget": self.add(npyscreen.TitleText, name="IP del servidor DHCP:", value=DEFAULT_IP)} self.fields["ogbootIP"] = {"widget": self.add(npyscreen.TitleText, name="IP del servidor Boot:", value=DEFAULT_IP)} def on_ok(self): # Obtener los índices seleccionados por el usuario selected_indices = self.fields["interfaces"]["widget"].value.split(",") try: # Convertir los índices en nombres de interfaces available_interfaces = get_network_interfaces().split(",") selected_interfaces = [available_interfaces[int(i.strip())] for i in selected_indices if i.strip().isdigit()] interfaces_string = ",".join(selected_interfaces) except (IndexError, ValueError): npyscreen.notify_confirm("Selección inválida. Por favor, revisa los índices ingresados.", title="Error") return # Guardar las configuraciones self.parentApp.configurations[self.component_name] = { "interfaces": interfaces_string, "ip": self.fields["ip"]["widget"].value, "ogbootIP": self.fields["ogbootIP"]["widget"].value, } # Continuar con el siguiente formulario self.parentApp.current_component_index += 1 if self.parentApp.current_component_index < len(self.parentApp.selected_components): next_component = self.parentApp.selected_components[self.parentApp.current_component_index] self.parentApp.switchForm(next_component) else: self.parentApp.generate_debconf() self.parentApp.setNextForm(None) class OgBootForm(ComponentForm): component_name = "ogboot" download_url = "https://ognproject.evlt.uma.es/oglive/" def configure_fields(self): # Obtener la lista de oglives oglives = get_oglive_list() if not oglives: oglives = ["https://ognproject.evlt.uma.es/oglive/ogLive-noble-6.8.0-31-generic-amd64-r20250116.538e3fa_20250120.iso"] npyscreen.notify_confirm("No se pudo obtener la lista de oglives. Usando un valor por defecto.", title="Error") # Campo para seleccionar un oglive self.fields["ogliveUrl"] = { "widget": self.add( npyscreen.TitleSelectOne, name="Selecciona un OgLive:", values=oglives, scroll_exit=True, max_height=10 # Limitar la altura para listas largas ) } # Otros campos self.fields["ip"] = {"widget": self.add(npyscreen.TitleText, name="IP del servidor Boot:", value=DEFAULT_IP)} self.fields["port"] = {"widget": self.add(npyscreen.TitleText, name="Puerto Boot:", value="8082")} self.fields["ogcoreUrl"] = {"widget": self.add(npyscreen.TitleText, name="URL OgCore:", value=f"https://{DEFAULT_IP}:8443")} self.fields["sambaUser"] = {"widget": self.add(npyscreen.TitleText, name="Usuario Samba:", value="opengnsys")} self.fields["sambaUserPass"] = {"widget": self.add(npyscreen.TitlePassword, name="Contraseña Samba:", value="og")} def on_ok(self): # Obtener el oglive seleccionado selected_oglive_index = self.fields["ogliveUrl"]["widget"].value if selected_oglive_index: selected_oglive = self.fields["ogliveUrl"]["widget"].values[selected_oglive_index[0]] else: selected_oglive = None # Guardar las configuraciones self.parentApp.configurations[self.component_name] = { "ogliveUrl": self.download_url + selected_oglive, "ip": self.fields["ip"]["widget"].value, "port": self.fields["port"]["widget"].value, "ogcoreUrl": self.fields["ogcoreUrl"]["widget"].value, "sambaUser": self.fields["sambaUser"]["widget"].value, "sambaUserPass": self.fields["sambaUserPass"]["widget"].value, } # Continuar con el siguiente formulario self.parentApp.current_component_index += 1 if self.parentApp.current_component_index < len(self.parentApp.selected_components): next_component = self.parentApp.selected_components[self.parentApp.current_component_index] self.parentApp.switchForm(next_component) else: self.parentApp.generate_debconf() self.parentApp.setNextForm(None) class OgRepositoryForm(ComponentForm): component_name = "ogrepository" def configure_fields(self): self.fields["ogrepoIp"] = {"widget": self.add(npyscreen.TitleText, name="IP del Repositorio:", value=DEFAULT_IP)} self.fields["ogcoreIp"] = {"widget": self.add(npyscreen.TitleText, name="IP de OgCore:", value=DEFAULT_IP)} self.fields["sambaUser"] = {"widget": self.add(npyscreen.TitleText, name="Usuario Samba:", value="opengnsys")} self.fields["sambaUserPass"] = {"widget": self.add(npyscreen.TitlePassword, name="Contraseña Samba:", value="og")} class InstallationProgressForm(npyscreen.FormBaseNew): """Formulario para mostrar el progreso de instalación y el log en tiempo real.""" def create(self): # Crear la parte superior para el progreso de instalación self.progress_box = self.add( npyscreen.BoxTitle, name="Progreso de instalación", max_height=int(self.lines * 0.5), # Mitad superior scroll_exit=True ) # Crear la parte inferior para el log en tiempo real self.log_box = self.add( npyscreen.BoxTitle, name="Log de instalación", rely=int(self.lines * 0.5), # Mitad inferior scroll_exit=True ) def update_progress(self, message, current=0, total=0): """Actualiza el progreso de instalación en la parte superior.""" if total > 0: # Crear una barra de progreso personalizada progress_percentage = int((current / total) * 100) bar_length = 30 # Longitud de la barra filled_length = int(bar_length * current // total) bar = f"[{'=' * filled_length}{' ' * (bar_length - filled_length)}] {progress_percentage}%" self.progress_box.values.append(bar) self.progress_box.values.append(message) self.progress_box.display() def update_log(self, log_lines): """Actualiza el log en tiempo real en la parte inferior.""" # Limpiar caracteres especiales de las líneas del log cleaned_lines = [self._clean_text(line) for line in log_lines] self.log_box.values = cleaned_lines[-self.log_box.height:] # Mostrar solo las últimas líneas self.log_box.display() def _clean_text(self, text): """Elimina caracteres especiales o no imprimibles del texto.""" return ''.join(c if c.isprintable() else '?' for c in text) def install_components_with_ui(form, components, selected_tag): """Instala los componentes seleccionados mostrando el progreso y el log en tiempo real.""" log_file_path = os.path.join(CONFIGS_DIR, "installation.log") installed_packages = [] # Lista de paquetes instalados correctamente failed_packages = [] # Lista de paquetes que fallaron # Registrar el tiempo de inicio start_time = time.time() try: with open(log_file_path, "w") as log_file: total_packages = len(components) # Hilo para leer el log en tiempo real def tail_log(): with open(log_file_path, "r") as log_reader: log_reader.seek(0, os.SEEK_END) # Ir al final del archivo while True: line = log_reader.readline() if line: form.update_log(log_reader.readlines()) time.sleep(0.1) log_thread = threading.Thread(target=tail_log, daemon=True) log_thread.start() for index, package in enumerate(components, start=1): # Actualizar el progreso en la parte superior con barra de progreso form.update_progress(f"Instalando paquete {index}/{total_packages}: {package}", current=index, total=total_packages) # Crear una barra de progreso para el paquete install_command = f"DEBIAN_FRONTEND=noninteractive apt-get install -y {package}" process = subprocess.Popen( install_command, shell=True, text=True, stdout=log_file, stderr=log_file ) # Esperar a que el proceso de instalación termine process.wait() # Registrar errores en el archivo de registro if process.returncode != 0: error_message = f"Error al instalar el paquete {package}. Consulta el archivo de registro: {log_file_path}" form.update_progress(error_message) failed_packages.append(package) # Agregar a la lista de fallos else: form.update_progress(f"Paquete {package} instalado correctamente.") installed_packages.append(package) # Agregar a la lista de éxitos # Instalar ogclient si se está instalando ogboot if package == "ogboot": form.update_progress("Instalando paquete adicional: ogclient") install_command = "DEBIAN_FRONTEND=noninteractive apt-get install -y ogclient" process = subprocess.Popen( install_command, shell=True, text=True, stdout=log_file, stderr=log_file ) process.wait() if process.returncode != 0: error_message = f"Error al instalar el paquete ogclient. Consulta el archivo de registro: {log_file_path}" form.update_progress(error_message) failed_packages.append("ogclient") else: form.update_progress("Paquete ogclient instalado correctamente.") installed_packages.append("ogclient") except Exception as e: form.update_progress(f"Error durante la instalación: {e}") failed_packages.append("Error general durante la instalación") # Registrar el tiempo de finalización end_time = time.time() duration = end_time - start_time # Calcular la duración en segundos # Generar el resumen summary = "\n--- Resumen de la instalación ---\n" summary += f"Tiempo total de instalación: {duration:.2f} segundos\n" summary += f"Paquetes instalados correctamente: {len(installed_packages)}\n" for pkg in installed_packages: summary += f" - {pkg}\n" if failed_packages: summary += f"\nPaquetes que fallaron: {len(failed_packages)}\n" for pkg in failed_packages: summary += f" - {pkg}\n" else: summary += "\nTodos los paquetes se instalaron correctamente.\n" summary += f"\nConsulta el archivo de registro para más detalles: {log_file_path}" # Mostrar el resumen en una ventana emergente npyscreen.notify_confirm(summary, title="Resumen de la instalación", wide=True) # Mostrar el resumen en la parte superior (opcional, si quieres mantenerlo en el formulario) form.update_progress(summary) class MyApp(npyscreen.NPSAppManaged): def onStart(self): self.addForm("MAIN", ComponentSelectionForm) self.addForm("ogcore", OgCoreForm) self.addForm("oggui", OgGuiForm) self.addForm("ogdhcp", OgDhcpForm) self.addForm("ogboot", OgBootForm) self.addForm("ogrepository", OgRepositoryForm) self.addForm("INSTALLATION_PROGRESS", InstallationProgressForm) def generate_debconf(self): # Comprobar si la clave pública ya existe key_path = "/etc/apt/trusted.gpg.d/opengnsys.gpg" if os.path.exists(key_path): # Silenciar este mensaje pass else: # Añadir la clave pública try: subprocess.run( 'curl -k -L https://ognproject.evlt.uma.es/debian-opengnsys/public.key | gpg --dearmour -o /etc/apt/trusted.gpg.d/opengnsys.gpg', shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True ) except subprocess.CalledProcessError: # Silenciar errores return # Añadir el repositorio try: selected_tag = self.selected_tag # Obtener el tag seleccionado repo_line = f'deb http://ognproject.evlt.uma.es/debian-opengnsys/opengnsys-devel/{selected_tag} noble main' with open('/etc/apt/sources.list.d/opengnsys.list', 'w') as repo_file: repo_file.write(repo_line + '\n') except Exception: # Silenciar errores return # Actualizar los repositorios try: subprocess.run('apt-get update', shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) except subprocess.CalledProcessError: # Silenciar errores return # Generar configuraciones para debconf output_file = os.path.join(CONFIGS_DIR, "configurations.txt") try: with open(output_file, "w") as f: f.write("\n--- Configuraciones para debconf-set-selections ---\n") for component, config in self.configurations.items(): for key, value in config.items(): field_type = "password" if "Pass" in key else "string" line = f'echo "{component} opengnsys/{component}_{key} {field_type} {value}" | debconf-set-selections\n' f.write(line) # Ejecutar la línea directamente y redirigir la salida a /dev/null subprocess.run( line, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) except Exception: # Silenciar errores pass # Silenciar el mensaje de configuraciones guardadas # print(f"\nConfiguraciones guardadas en: {output_file}") # Llamar al formulario de progreso form = self.getForm("INSTALLATION_PROGRESS") self.switchForm("INSTALLATION_PROGRESS") install_components_with_ui(form, self.selected_components, self.selected_tag) if __name__ == "__main__": MyApp().run()