oginstaller/non_graf_installer/python-installer/ogupdater-v1.py

170 lines
5.7 KiB
Python

#!/usr/bin/env python3
import subprocess
import requests
import json
import sys
import os
import npyscreen
# Configuración general
REPO_BASE_URL = "http://ognproject.evlt.uma.es/debian-opengnsys/opengnsys-devel"
RELEASES_URL = "https://ognproject.evlt.uma.es/debian-opengnsys/versions-dev.json"
APT_LIST_PATH = "/etc/apt/sources.list.d/opengnsys.list"
PACKAGES = ["ogrepository", "ogcore", "oggui", "ogclient", "ogboot", "ogdhcp"]
# === Sección npyscreen ===
class ServerURLForm(npyscreen.Form):
def create(self):
self.server_url = self.add(npyscreen.TitleText, name="Servidor de validación (URL completa):")
def afterEditing(self):
self.parentApp.server_url = self.server_url.value
self.parentApp.setNextForm("RELEASE")
class ReleaseSelectorForm(npyscreen.FormBaseNew):
def create(self):
self.releases = self.parentApp.releases
self.listbox = self.add(npyscreen.TitleSelectOne,
name="Releases disponibles",
values=self.releases,
scroll_exit=True,
max_height=len(self.releases)+4)
def afterEditing(self):
selected_index = self.listbox.value[0] if self.listbox.value else None
self.parentApp.selected = self.releases[selected_index] if selected_index is not None else None
self.parentApp.setNextForm(None)
class ReleaseSelectorApp(npyscreen.NPSAppManaged):
def __init__(self, releases):
self.releases = releases
self.selected = None
self.server_url = None
super().__init__()
def onStart(self):
self.addForm("MAIN", ServerURLForm, name="Configuración inicial")
self.addForm("RELEASE", ReleaseSelectorForm, name="Selecciona una release", releases=self.releases)
def choose_release_and_server(releases):
app = ReleaseSelectorApp(releases)
app.run()
return app.selected, app.server_url
# === Funciones principales ===
def get_installed_packages():
installed = []
for pkg in PACKAGES:
try:
subprocess.run(
["dpkg-query", "-W", "-f=${Status}", pkg],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
check=True
)
installed.append(pkg)
except subprocess.CalledProcessError:
continue
return installed
def get_installed_versions(packages):
versions = {}
for pkg in packages:
try:
result = subprocess.run(
["dpkg-query", "-W", "-f=${Version}", pkg],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
check=True
)
versions[pkg] = result.stdout.strip()
except subprocess.CalledProcessError:
versions[pkg] = None
return versions
def fetch_available_releases():
try:
response = requests.get(RELEASES_URL)
response.raise_for_status()
data = response.json()
return data.get("versions", [])
except requests.RequestException as e:
print(f"[ERROR] No se pudo obtener la lista de releases: {e}")
sys.exit(1)
def update_repo_file(selected_release):
line = f"deb {REPO_BASE_URL}/{selected_release} noble main\n"
print(f"[INFO] Escribiendo nueva línea en {APT_LIST_PATH}:\n{line.strip()}")
try:
with open(APT_LIST_PATH, "w") as f:
f.write(line)
except PermissionError:
print("[ERROR] No tienes permisos para escribir en el archivo del repositorio. Ejecuta el script como root.")
sys.exit(1)
def check_compatibility(server_url, installed_versions, selected_release):
payload = {
"installed_versions": installed_versions,
"target_release": selected_release
}
try:
response = requests.post(server_url, json=payload, timeout=5)
response.raise_for_status()
result = response.json()
return result.get("compatible", False), result.get("message", "")
except requests.RequestException as e:
print(f"[ERROR] No se pudo contactar con el servidor de validación: {e}")
return False, str(e)
def update_and_install(installed_packages):
try:
subprocess.run(["sudo", "apt", "update"], check=True)
subprocess.run(["sudo", "apt", "install", "-y"] + installed_packages, check=True)
print("[INFO] Paquetes actualizados correctamente.")
except subprocess.CalledProcessError as e:
print(f"[ERROR] Error al instalar paquetes: {e}")
sys.exit(1)
# === Entrada principal ===
def main():
print("[INFO] Detectando paquetes instalados...")
installed = get_installed_packages()
if not installed:
print("[WARN] No hay paquetes opengnsys instalados.")
sys.exit(0)
print(f"[INFO] Paquetes detectados: {', '.join(installed)}")
releases = fetch_available_releases()
if not releases:
print("[ERROR] No se encontraron releases disponibles.")
sys.exit(1)
selected, server_url = choose_release_and_server(releases)
if not selected or not server_url:
print("[WARN] No se seleccionó release o URL del servidor. Cancelando.")
sys.exit(0)
print(f"[INFO] Validando compatibilidad con {server_url}...")
installed_versions = get_installed_versions(installed)
compatible, message = check_compatibility(server_url, installed_versions, selected)
if not compatible:
print(f"[ERROR] El servidor indica que la actualización no es compatible: {message}")
sys.exit(1)
else:
print(f"[INFO] Compatibilidad validada: {message}")
update_repo_file(selected)
update_and_install(installed)
if __name__ == "__main__":
main()