#!/usr/bin/env python3
"""Interactive updater for the OpenGnSys Debian packages.

The script reads the currently installed release, lets the user pick a target
release and a validation server through an npyscreen UI, asks the server
whether the upgrade is compatible, points the apt source list at the chosen
release and installs the packages that have a newer candidate version.
"""
import os
import re
import shutil
import subprocess
import sys

import npyscreen
import requests

# General configuration
REPO_BASE_URL = "http://ognproject.evlt.uma.es/debian-opengnsys/opengnsys-devel"
RELEASES_URL = "https://ognproject.evlt.uma.es/debian-opengnsys/versions-dev.json"
APT_LIST_PATH = "/etc/apt/sources.list.d/opengnsys.list"
PACKAGES = ["ogrepository", "ogcore", "oggui", "ogclient", "ogboot", "ogdhcp"]
RELEASE_FILE = "/opt/opengnsys/release"

# Timeout (seconds) applied to every HTTP request so the script cannot hang
# forever on an unresponsive server.
HTTP_TIMEOUT_SECONDS = 5

# Backups taken during THIS run, mapping original path -> backup path.
# Rollback only touches these, so a ".bak" left behind by an earlier run is
# never restored over the current files by mistake.
_session_backups = {}


# === npyscreen section ===

class ServerURLForm(npyscreen.Form):
    """First form: asks for the full URL of the validation server."""

    def create(self):
        """Add the URL text field, pre-filled with a localhost default."""
        self.server_url = self.add(
            npyscreen.TitleText,
            name="Servidor de validación (URL completa):",
            value="http://localhost:5000/validar",
        )

    def afterEditing(self):
        """Store the entered URL on the app and switch to the release form."""
        self.parentApp.server_url = self.server_url.value
        self.parentApp.setNextForm("RELEASE")


class ReleaseSelectorForm(npyscreen.ActionForm):
    """Second form: single-choice list of the available releases."""

    def create(self):
        """Build the release list widget from the releases held by the app."""
        self.releases = self.parentApp.releases
        self.listbox = self.add(
            npyscreen.TitleSelectOne,
            name="Releases disponibles",
            values=self.releases,
            scroll_exit=True,
            max_height=len(self.releases) + 4,
        )

    def on_ok(self):
        """Record the chosen release, or warn when nothing is selected.

        When nothing is selected the next form is left untouched, so the
        selector is shown again instead of closing the application.
        """
        selected_index = self.listbox.value[0] if self.listbox.value else None
        if selected_index is None:
            npyscreen.notify_confirm(
                "Debes seleccionar una release antes de continuar.",
                title="Error",
            )
        else:
            self.parentApp.selected = self.releases[selected_index]
            self.parentApp.setNextForm(None)

    def on_cancel(self):
        """Tell the user the operation was cancelled and close the app."""
        npyscreen.notify_confirm(
            "Operación cancelada. Saliendo del formulario.",
            title="Cancelado",
        )
        self.parentApp.setNextForm(None)


class ReleaseSelectorApp(npyscreen.NPSAppManaged):
    """npyscreen application chaining the server-URL and release forms.

    After ``run()`` returns, ``selected`` holds the chosen release (or None)
    and ``server_url`` holds the validation server URL (or None).
    """

    def __init__(self, releases):
        # Set before super().__init__() so the attributes exist by the time
        # the forms are created in onStart().
        self.releases = releases
        self.selected = None
        self.server_url = None
        super().__init__()

    def onStart(self):
        """Register both forms; "MAIN" is the one shown first."""
        self.addForm("MAIN", ServerURLForm, name="Configuración inicial")
        self.addForm(
            "RELEASE",
            ReleaseSelectorForm,
            name="Selecciona una release",
            releases=self.releases,
        )


def choose_release_and_server(releases):
    """Run the UI and return ``(selected_release, server_url)``.

    Either element is None when the user did not provide it.
    """
    app = ReleaseSelectorApp(releases)
    app.run()
    return app.selected, app.server_url


# === Main functions ===

def backup_file(filepath):
    """Copy ``filepath`` to ``<filepath>.bak`` and return the backup path.

    The original file is left in place (it is copied, not moved). The backup
    path is returned even when the file does not exist or the copy fails; in
    those cases no backup is registered for rollback.
    """
    backup_path = f"{filepath}.bak"
    if os.path.exists(filepath):
        try:
            shutil.copy2(filepath, backup_path)
        except OSError as e:
            print(f"[ERROR] No se pudo crear la copia de seguridad de {filepath}: {e}")
        else:
            _session_backups[filepath] = backup_path
            print(f"[INFO] Copia de seguridad creada: {backup_path}")
    return backup_path


def restore_file(backup_path, original_path):
    """Move ``backup_path`` back over ``original_path`` if the backup exists."""
    if os.path.exists(backup_path):
        try:
            os.replace(backup_path, original_path)
            print(f"[INFO] Archivo restaurado: {original_path}")
        except OSError as e:
            print(f"[ERROR] No se pudo restaurar el archivo {original_path}: {e}")
    # The backup has been consumed (or is unusable); stop tracking it.
    _session_backups.pop(original_path, None)


def _rollback_session_backups():
    """Restore every file that was backed up during this run."""
    for original_path, backup_path in list(_session_backups.items()):
        restore_file(backup_path, original_path)


def get_installed_packages():
    """Return the entries of PACKAGES that ``dpkg-query -W`` knows about."""
    installed = []
    for pkg in PACKAGES:
        try:
            subprocess.run(
                ["dpkg-query", "-W", pkg],
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError:
            # Non-zero exit: dpkg does not know the package.
            continue
        installed.append(pkg)
    return installed


def get_installed_release():
    """Return the release recorded on the first line of RELEASE_FILE.

    The value is whatever follows the last colon on that line. Returns None
    when the file is missing, unreadable, or has no ``label: value`` line.
    """
    try:
        with open(RELEASE_FILE, "r", encoding="utf-8") as release_file:
            line = release_file.readline().strip()
        match = re.search(r".*:\s*(.+)", line)
        if match:
            return match.group(1).strip()
    except FileNotFoundError:
        print("El archivo de release no existe.")
    except Exception as e:
        print(f"Error al leer el archivo de release: {e}")
    return None


def fetch_available_releases():
    """Download the release index and return its ``versions`` list.

    Exits the script with status 1 when the index cannot be downloaded or is
    not valid JSON. Returns an empty list when the document has no
    ``versions`` key or is not a JSON object.
    """
    try:
        response = requests.get(RELEASES_URL, timeout=HTTP_TIMEOUT_SECONDS)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers an invalid JSON body on requests versions where
        # the JSON error is not a RequestException.
        print(f"[ERROR] No se pudo obtener la lista de releases: {e}")
        sys.exit(1)
    if not isinstance(data, dict):
        return []
    return data.get("versions", [])


def update_repo_file(selected_release):
    """Point the apt source list at ``selected_release`` and run apt update.

    The previous list is backed up first. On a permission error or a failing
    ``apt update`` the files backed up in this run are restored and the
    script exits with status 1.
    """
    backup_file(APT_LIST_PATH)
    line = f"deb {REPO_BASE_URL}/{selected_release} noble main\n"
    print(f"[INFO] Escribiendo nueva línea en {APT_LIST_PATH}:\n{line.strip()}")
    try:
        with open(APT_LIST_PATH, "w") as f:
            f.write(line)
    except PermissionError:
        print("[ERROR] No tienes permisos para escribir en el archivo del repositorio. Ejecuta el script como root.")
        _rollback_session_backups()
        sys.exit(1)

    # Refresh the package index so candidate versions reflect the new source.
    try:
        print("[INFO] Actualizando la información del repositorio con 'apt update'...")
        subprocess.run(["sudo", "apt", "update"], check=True)
    except subprocess.CalledProcessError as e:
        print(f"[ERROR] Error al ejecutar 'apt update': {e}")
        _rollback_session_backups()
        sys.exit(1)


def check_compatibility(server_url, installed_release, selected_release):
    """Ask the validation server whether the upgrade is allowed.

    Posts ``installed_release`` and ``target_release`` as JSON to
    ``server_url``. Returns ``(compatible, message)``; any request failure or
    malformed response is reported as not compatible.
    """
    payload = {
        "installed_release": installed_release,
        "target_release": selected_release,
    }
    try:
        response = requests.post(server_url, json=payload, timeout=HTTP_TIMEOUT_SECONDS)
        response.raise_for_status()
        result = response.json()
    except (requests.RequestException, ValueError) as e:
        print(f"[ERROR] No se pudo contactar con el servidor de validación: {e}")
        return False, str(e)
    if not isinstance(result, dict):
        return False, "Respuesta inesperada del servidor de validación."
    return result.get("compatible", False), result.get("message", "")


def _describe_package_update(pkg):
    """Return ``(needs_update, description)`` for one package via apt-cache."""
    try:
        result = subprocess.run(
            ["apt-cache", "policy", pkg],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            check=True,
            env={"LANG": "C"},  # force English so the field names below match
        )
    except subprocess.CalledProcessError:
        return True, f"{pkg} (error obteniendo versión)"

    installed = None
    candidate = None
    for line in result.stdout.splitlines():
        if "Installed:" in line:
            installed = line.split(":", 1)[1].strip()
        elif "Candidate:" in line:
            candidate = line.split(":", 1)[1].strip()

    if not installed or candidate == "(none)":
        return True, f"{pkg} (no instalado o sin versión candidata)"
    if installed != candidate:
        return True, f"{pkg} ({installed} → {candidate})"
    return False, f"{pkg} ({installed})"


def summarize_updates(installed_packages, selected_release):
    """Show which packages would be updated and ask for confirmation.

    Returns the names of the packages to install. Exits with status 0 when
    nothing needs updating, or when the user declines; in the latter case the
    files backed up in this run are restored first.
    """
    to_update = []            # display lines for packages needing an update
    up_to_date = []           # display lines for packages already current
    packages_to_update = []   # bare package names matching to_update

    for pkg in installed_packages:
        needs_update, description = _describe_package_update(pkg)
        if needs_update:
            to_update.append(description)
            packages_to_update.append(pkg)
        else:
            up_to_date.append(description)

    summary = "\n--- Resumen de actualización ---\n"
    summary += f"Release objetivo: {selected_release}\n\n"
    summary += "Paquetes que se actualizarán:\n"
    summary += "\n".join(f" - {line}" for line in to_update) if to_update else " - Ninguno\n"
    summary += "\nPaquetes que ya están actualizados:\n"
    summary += "\n".join(f" - {line}" for line in up_to_date) if up_to_date else " - Ninguno\n"
    summary += "\n--------------------------------"

    # Show the summary in a pop-up window.
    npyscreen.notify_confirm(summary, title="Resumen de actualización", wide=True)

    if not to_update:
        npyscreen.notify_confirm(
            "[INFO] Todos los paquetes están actualizados. No es necesario continuar.",
            title="Información",
        )
        sys.exit(0)

    if not npyscreen.notify_yes_no("¿Deseas continuar con la actualización?", title="Confirmación"):
        npyscreen.notify_confirm("[INFO] Actualización cancelada por el usuario.", title="Cancelado")
        _rollback_session_backups()
        sys.exit(0)

    return packages_to_update


def show_final_versions(packages):
    """Print the installed version of each package as reported by dpkg."""
    print("\n✅ Resumen final de versiones instaladas:")
    for pkg in packages:
        try:
            result = subprocess.run(
                ["dpkg-query", "-W", "-f=${Version}", pkg],
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                text=True,
                check=True,  # required for the "not installed" branch to fire
            )
        except subprocess.CalledProcessError:
            print(f" - {pkg}: no instalado")
        else:
            version = result.stdout.strip()
            print(f" - {pkg}: {version}")


def update_and_install(packages_to_update, selected_release):
    """Install ``packages_to_update`` and record ``selected_release``.

    The release file is backed up first. If an apt command fails, every file
    backed up in this run (apt list and release file) is restored and the
    script exits with status 1. A failure to write the release file after a
    successful install is reported but does not abort.
    """
    backup_file(RELEASE_FILE)
    try:
        subprocess.run(["sudo", "apt", "update"], check=True)
        subprocess.run(["sudo", "apt", "install", "-y"] + packages_to_update, check=True)
        print("[INFO] Paquetes actualizados correctamente.")

        # Record the newly installed release.
        try:
            os.makedirs(os.path.dirname(RELEASE_FILE), exist_ok=True)
            with open(RELEASE_FILE, "w", encoding="utf-8") as release_file:
                release_file.write(f"Versión instalada: {selected_release}\n")
            print(f"[INFO] Archivo de release actualizado: {selected_release}")
        except Exception as e:
            print(f"[ERROR] No se pudo actualizar el archivo de release: {e}")

        show_final_versions(packages_to_update)
    except subprocess.CalledProcessError as e:
        print(f"[ERROR] Error al instalar paquetes: {e}")
        _rollback_session_backups()
        sys.exit(1)


# === Entry point ===

def main():
    """Drive the whole update: detect, select, validate, then install."""
    print("[INFO] Iniciando actualización de paquetes de OpenGnSys...")

    installed_release = get_installed_release()
    if installed_release:
        print(f"[INFO] Versión instalada: {installed_release}")
    else:
        print("[WARN] No se encontró la versión instalada.")
        sys.exit(1)

    installed = get_installed_packages()
    if not installed:
        print("[ERROR] No se detectaron paquetes OpenGnSys instalados.")
        sys.exit(1)

    releases = fetch_available_releases()
    if not releases:
        # An empty selector can never be confirmed, so stop here.
        print("[ERROR] No hay releases disponibles.")
        sys.exit(1)

    selected, server_url = choose_release_and_server(releases)

    # Nothing has been modified up to this point, so the early exits below
    # have nothing to restore.
    if not selected or not server_url:
        print("[WARN] No se seleccionó release o URL del servidor. No se ha modificado ningún archivo.")
        sys.exit(0)

    print(f"[INFO] Validando compatibilidad con {server_url}...")
    compatible, message = check_compatibility(server_url, installed_release, selected)

    if not compatible:
        print(f"[ERROR] El servidor indica que la actualización no es compatible: {message}")
        sys.exit(1)
    else:
        print(f"[INFO] Compatibilidad validada: {message}")

    try:
        update_repo_file(selected)
        to_update = summarize_updates(installed, selected)
        update_and_install(to_update, selected)
    except Exception as e:
        print(f"[ERROR] Error durante la actualización: {e}")
        _rollback_session_backups()
        sys.exit(1)


if __name__ == "__main__":
    exit_code = 0
    try:
        main()
    except SystemExit as e:
        # Intercepted so the terminal is restored before the process ends;
        # the code is re-raised below so callers still see the real status.
        exit_code = e.code
        if e.code != 0:
            print(f"[INFO] El script terminó con código de salida: {e.code}")
    except Exception as e:
        print(f"[ERROR] Ocurrió un error inesperado: {e}")
        exit_code = 1
    finally:
        # Restore the terminal to its normal state.
        npyscreen.wrapper_basic(lambda stdscr: None)
    sys.exit(exit_code)