#/** #@file NetLib.py #@brief Librería o clase Net #@class Net #@brief Funciones básicas de red. #@warning License: GNU GPLv3+ #*/ import subprocess import sys import os import json import re import ogGlobals import SystemLib import FileLib def _ogConnect_options(): ## the original bash code does: eval $(grep "OPTIONS=" /scripts/ogfunctions) ## this is highly insecure but we need to keep it for now opt = subprocess.run (['grep', '-o', 'OPTIONS=.*', '/scripts/ogfunctions'], capture_output=True, text=True).stdout.strip() exec (opt, globals()) return OPTIONS ## defined in /scripts/ogfunctions. We can't tackle that yet. ## this is a quick implementation just to unblock development def _ogConnect (server, protocol, src, dst, options, readonly): if 'smb' != protocol: return None ## only supported value right now write_option = ',ro' if readonly else ',rw' options += write_option return not subprocess.run (['mount.cifs', f'//{server}/{src}', dst] + options.split()).returncode #/** # ogChangeRepo IPREPO [ OgUnit ] #@brief Cambia el repositorio para el recurso remoto images. #@param 1 Ip Repositorio #@param 2 Abreviatura Unidad Organizativa #@return Cambio recurso remoto en OGIMG. #*/ def ogChangeRepo (ip_repo): ogprotocol = os.environ.get ('ogprotocol', 'smb') try: mount = subprocess.run (['mount'], capture_output=True, text=True).stdout ro = bool (list (filter (lambda line: re.search (r'ogimages.*\bro,', line), mount.splitlines()))) current_repo = ogGetRepoIp() new_repo = current_repo if ip_repo.upper() == 'REPO' else ip_repo if new_repo == current_repo: return True subprocess.run (['umount', ogGlobals.OGIMG], check=True) SystemLib.ogEcho (['session', 'log'], None, f'{ogGlobals.lang.MSG_HELP_ogChangeRepo} {new_repo}') options = _ogConnect_options() if not _ogConnect (new_repo, ogprotocol, 'ogimages', ogGlobals.OGIMG, options, ro): _ogConnect (current_repo, ogprotocol, 'ogimages', ogGlobals.OGIMG, options, ro) SystemLib.ogRaiseError ('session', ogGlobals.OG_ERR_REPO, f'Error connecting to the new repository: {new_repo}') return False SystemLib.ogEcho (['session', 'log'], None, f'Repository successfully changed to {new_repo}'.strip()) return True except Exception as e: SystemLib.ogRaiseError ('session', ogGlobals.OG_ERR_GENERIC, f'Error executing ogChangeRepo: {e}') return None #/** # ogGetGroupDir [ str_repo ] #@brief Devuelve el camino del directorio para el grupo del cliente. #@param str_repo repositorio de imágenes (opcional) #@return path_dir - Camino al directorio del grupo. #@note repo = { REPO, CACHE } REPO por defecto #@exception OG_ERR_FORMAT formato incorrecto. #*/ def ogGetGroupDir(repo=None): try: repo = repo or "REPO" group = ogGetGroupName() if group: dir_path = FileLib.ogGetPath(repo, f"/groups/{group}") if dir_path and os.path.isdir(dir_path): return dir_path return None except Exception as e: SystemLib.ogRaiseError( "session", ogGlobals.OG_ERR_FORMAT, f"Error in ogGetGroupDir: {e}" ) return None #/** # ogGetGroupName #@brief Devuelve el nombre del grupo al que pertenece el cliente. #@return str_group - Nombre de grupo. #*/ def ogGetGroupName(): try: group = globals().get("group", None) return group if group else None except Exception as e: print(f"Error in ogGetGroupName: {e}") return None #/** # ogGetHostname #@brief Muestra el nombre del cliente. #@return str_host - nombre de máquina #*/ ## def ogGetHostname(): # Tomar nombre de la variable HOSTNAME host = os.getenv("HOSTNAME", "").strip() if host: return host # Si no, tomar del DHCP, opción host-name /* (comentario para Doxygen) dhcp_file = "/var/lib/dhcp3/dhclient.leases" if os.path.exists(dhcp_file): with open(dhcp_file, "r") as f: for line in f: if 'option host-name' in line: return line.split('"')[1].strip(";") # Si no, leer el parámetro del kernel hostname (comentario para Doxygen) */ cmdline_file = "/proc/cmdline" if os.path.exists(cmdline_file): with open(cmdline_file, "r") as f: for entry in f.read().split(): if entry.startswith("hostname="): return entry.split("=")[1].strip() #/** # ogGetIpAddress #@brief Muestra la dirección IP del sistema #@return str_ip - Dirección IP #@note Usa las variables utilizadas por el initrd "/etc/net-ethX.conf #*/ ## def ogGetIpAddress(): return ogGlobals.ogGetIpAddress() #/** # ogGetMacAddress #@brief Muestra la dirección Ethernet del cliente. #@return str_ether - Dirección Ethernet #*/ ## def ogGetMacAddress(): try: if "DEVICE" in os.environ: device = os.environ["DEVICE"] result = subprocess.run( ["ip", "-o", "link", "show", "up", "dev", device], capture_output=True, text=True, check=True ).stdout else: result = subprocess.run( ["ip", "-o", "link", "show", "up"], capture_output=True, text=True, check=True ).stdout mac_addresses = [] for line in result.splitlines(): if "link/ether" in line: parts = line.split() for i, part in enumerate(parts): if part == "link/ether": mac_addresses.append(parts[i + 1].upper()) if mac_addresses: return mac_addresses[0] else: print("No active mac address found") return None except subprocess.CalledProcessError as e: print(f"Error executing ip command: {e.stderr}") return None except Exception as e: print(f"Unexpected error: {str(e)}") return None #/** # ogGetNetInterface #@brief Muestra la interfaz de red del sistema #@return str_interface - interfaz de red #@note Usa las variables utilizadas por el initrd "/etc/net-ethX.conf #*/ ## def ogGetNetInterface(): if len(sys.argv) >= 2 and sys.argv[1] == "help": SystemLib.ogHelp("ogGetNetInterface", "ogGetNetInterface", "ogGetNetInterface => eth0") return if "DEVICE" in os.environ: print(os.environ["DEVICE"]) return 0 #/** # ogGetRepoIp #@brief Muestra la dirección IP del repositorio de datos. #@return str_ip - Dirección IP #*/ ## def ogGetRepoIp(): out = subprocess.run (["findmnt", "--json", "--output", "SOURCE,FSTYPE", ogGlobals.OGIMG], capture_output=True, text=True).stdout try: j = json.loads (out) except json.decoder.JSONDecodeError: return None if 'filesystems' not in j: return None source = j['filesystems'][0]['source'] fstype = j['filesystems'][0]['fstype'] if 'nfs' == fstype: return source.split(":")[0] elif 'cifs' == fstype: return source.split("/")[2] return None #/** # ogGetServerIp #@brief Muestra la dirección IP del Servidor de OpenGnSys. #@return str_ip - Dirección IP #@note Comprobacion segun protocolo de conexion al Repo #*/ ## def ogGetServerIp(): return os.environ['ogcore'] #/** # ogGetServerPort #@brief Muestra el puerto del Servidor de OpenGnSys. #@return str_port - Puerto #*/ ## def ogGetServerPort(): if 'ogcore_port' in os.environ: return os.environ['ogcore_port'] else: return '8443' #/** # ogMakeGroupDir [ str_repo ] #@brief Crea el directorio para el grupo del cliente. #@param str_repo repositorio de imágenes (opcional) #@return (nada) #@note repo = { REPO, CACHE } REPO por defecto #@exception OG_ERR_FORMAT formato incorrecto. #*/ def ogMakeGroupDir(): REPO = "" DIR = "" GROUP = "" if len(sys.argv) < 2: SystemLib.ogHelp("ogMakeGroupDir", "ogMakeGroupDir str_repo", "ogMakeGroupDir", "ogMakeGroupDir REPO") return if len(sys.argv) == 1: REPO = "REPO" else: REPO = sys.argv[1] DIR = FileLib.ogGetPath(REPO, "/groups/" + ogGetGroupName(), stderr=subprocess.DEVNULL) if DIR: subprocess.run(["mkdir", "-p", DIR], stderr=subprocess.DEVNULL) return 0