ogclone-engine/client/lib/engine/bin/NetLib.py

316 lines
9.5 KiB
Python

#/**
#@file NetLib.py
#@brief Librería o clase Net
#@class Net
#@brief Funciones básicas de red.
#@warning License: GNU GPLv3+
#*/
import subprocess
import sys
import os
import json
import re
import ogGlobals
import SystemLib
import FileLib
def _ogConnect_options():
    ## Port of the original bash, which does: eval $(grep "OPTIONS=" /scripts/ogfunctions)
    ## SECURITY: exec() runs whatever follows "OPTIONS=" in that file as Python code,
    ## inside this module's global namespace. This is highly insecure; it is kept
    ## on purpose for now.
    grep_cmd = ['grep', '-o', 'OPTIONS=.*', '/scripts/ogfunctions']
    assignment = subprocess.run (grep_cmd, capture_output=True, text=True).stdout.strip()
    exec (assignment, globals())
    ## OPTIONS is a module global that only exists once the exec() above has assigned it
    return OPTIONS
## defined in /scripts/ogfunctions. We can't tackle that yet.
## this is a quick implementation just to unblock development
def _ogConnect (server, protocol, src, dst, options, readonly):
if 'smb' != protocol: return None ## only supported value right now
write_option = ',ro' if readonly else ',rw'
options += write_option
return not subprocess.run (['mount.cifs', f'//{server}/{src}', dst] + options.split()).returncode
#/**
#         ogChangeRepo IPREPO [ OgUnit ]
#@brief   Changes the repository that backs the remote "images" resource.
#@param   ip_repo  address of the new repository; the literal "REPO" (any case)
#                  resolves to the repository that is currently mounted
#@param   og_unit  organizational unit abbreviation; no longer supported, any
#                  truthy value is reported as an error
#@return  True if the requested repository was already mounted or the change
#         succeeded, False if mounting the new repository failed, None if
#         og_unit was given or an exception was caught.
#*/
def ogChangeRepo(ip_repo, og_unit=None):
    ## .get() so that an unset variable falls back to 'smb' just like an empty one
    ## (indexing os.environ directly raised KeyError before the default could apply)
    ogprotocol = os.environ.get ('ogprotocol') or 'smb'

    if og_unit:
        SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_GENERIC, 'the og_unit parameter became unsupported')
        return None

    try:
        ## remember whether the images share is mounted read-only, so that the
        ## same access mode is requested for the new mount
        mount = subprocess.run (['mount'], capture_output=True, text=True).stdout
        ro = any (re.search (r'ogimages.*\bro,', line) for line in mount.splitlines())

        current_repo = ogGetRepoIp()
        new_repo = current_repo if ip_repo.upper() == "REPO" else ip_repo
        if new_repo == current_repo:
            return True

        subprocess.run(["umount", ogGlobals.OGIMG], check=True)

        SystemLib.ogEcho (['session', 'log'], None, f'{ogGlobals.lang.MSG_HELP_ogChangeRepo} {new_repo}')
        options = _ogConnect_options()
        if not _ogConnect (new_repo, ogprotocol, 'ogimages', ogGlobals.OGIMG, options, ro):
            ## the new repository could not be mounted: try to put the previous one back
            _ogConnect (current_repo, ogprotocol, 'ogimages', ogGlobals.OGIMG, options, ro)
            SystemLib.ogRaiseError(
                "session",
                ogGlobals.OG_ERR_REPO,
                f"Error connecting to the new repository: {new_repo}",
            )
            return False

        SystemLib.ogEcho(
            ["session", "log"],
            None,
            f"Repository successfully changed to {new_repo}".strip(),
        )
        return True

    except Exception as e:
        SystemLib.ogRaiseError(
            "session",
            ogGlobals.OG_ERR_GENERIC,
            f"Error executing ogChangeRepo: {e}",
        )
        return None
#/**
#         ogGetGroupDir [ str_repo ]
#@brief   Returns the path of the directory for the client's group.
#@param   repo  image repository (optional); "REPO" is used when omitted or empty
#@return  Path of the group directory, or None if the client has no group, the
#         path cannot be resolved, it is not a directory, or an error occurred.
#@note    repo = { REPO, CACHE }, REPO by default
#@exception OG_ERR_FORMAT is reported through ogRaiseError on any exception.
#*/
def ogGetGroupDir(repo=None):
    try:
        target_repo = repo if repo else "REPO"

        group_name = ogGetGroupName()
        if not group_name:
            return None

        candidate = FileLib.ogGetPath(target_repo, f"/groups/{group_name}")
        if not candidate:
            return None
        ## only hand the path back when it really is a directory
        return candidate if os.path.isdir(candidate) else None

    except Exception as err:
        SystemLib.ogRaiseError(
            "session",
            ogGlobals.OG_ERR_FORMAT,
            f"Error in ogGetGroupDir: {err}"
        )
        return None
#/**
#         ogGetGroupName
#@brief   Returns the name of the group the client belongs to.
#@return  Group name, or None when no non-empty group is defined.
#*/
def ogGetGroupName():
    ## A module-level "group" variable still takes precedence, as before. Nothing
    ## visible in this file ever assigns one, so the lookup used to yield None
    ## every time; fall back to the "group" environment variable, the same way
    ## the other getters here read their settings (DEVICE, HOSTNAME, ogcore...).
    ## NOTE(review): presumably the boot environment exports "group" -- confirm.
    group = globals().get("group") or os.environ.get("group")
    return group if group else None
#/**
#         ogGetHostname
#@brief   Returns the client's host name.
#@return  Host name, or None if none of the sources provides one.
#@note    Sources are tried in order: the HOSTNAME environment variable, the
#         DHCP leases file, the "hostname=" kernel parameter.
#*/
def ogGetHostname():
    ## 1. the HOSTNAME environment variable, if it holds anything but whitespace
    env_name = os.getenv("HOSTNAME", "").strip()
    if env_name:
        return env_name

    ## 2. the first "option host-name" entry of the DHCP leases file
    leases_path = "/var/lib/dhcp3/dhclient.leases"
    if os.path.exists(leases_path):
        with open(leases_path, "r") as leases:
            for lease_line in leases:
                if 'option host-name' not in lease_line:
                    continue
                ## the value is the text between the first pair of double quotes
                return lease_line.split('"')[1].strip(";")

    ## 3. the "hostname=" parameter of the kernel command line
    cmdline_path = "/proc/cmdline"
    if os.path.exists(cmdline_path):
        with open(cmdline_path, "r") as cmdline:
            kernel_args = cmdline.read().split()
        for arg in kernel_args:
            if arg.startswith("hostname="):
                return arg.split("=")[1].strip()

    return None
#/**
#         ogGetIpAddress
#@brief   Returns the IP address of the system.
#@return  When IPV4ADDR is set: its value as a string, without any "/prefix"
#         suffix. Otherwise: a dict with the keys 'local' and 'prefixlen'
#         describing the single IPv4 address reported by "ip".
#@exception Exception if "ip" reports no IPv4 address, or more than one.
#@note    Uses the variables set by the initrd ("/etc/net-ethX.conf").
#*/
def ogGetIpAddress():
    if "IPV4ADDR" in os.environ:
        ## keep only the address part of an "address/prefix" value
        return os.environ["IPV4ADDR"].split ('/')[0]

    ip_cmd = ['ip', '-json', 'address', 'show', 'up']
    if "DEVICE" in os.environ:
        ip_cmd += [ 'dev', os.environ["DEVICE"] ]
    ipas = subprocess.run (ip_cmd, capture_output=True, text=True).stdout

    ## collect every IPv4 address of every interface other than loopback
    addresses = []
    for iface in json.loads (ipas):
        if 'lo' == iface['ifname']: continue
        if 'addr_info' not in iface: continue
        for addr in iface['addr_info']:
            if 'inet' != addr['family']: continue
            addresses.append ({ 'local': addr['local'], 'prefixlen': addr['prefixlen'] })

    ## zero addresses used to be reported as "more than one"; tell the cases apart
    if not addresses:
        raise Exception ('no local IP address found')
    if len (addresses) > 1:
        raise Exception ('more than one local IP address found')

    ## NOTE(review): this path returns a dict while the IPV4ADDR path returns a
    ## plain string -- left unchanged here; confirm what callers expect.
    return addresses[0]
#/**
#         ogGetMacAddress
#@brief   Returns the Ethernet address of the client.
#@return  First "link/ether" address reported by "ip" for links that are up,
#         in upper case; None if there is none or the command fails.
#@note    When the DEVICE environment variable is set, only that device is queried.
#*/
def ogGetMacAddress():
    try:
        ip_cmd = ["ip", "-o", "link", "show", "up"]
        if "DEVICE" in os.environ:
            ip_cmd += ["dev", os.environ["DEVICE"]]
        listing = subprocess.run(ip_cmd, capture_output=True, text=True, check=True).stdout

        found = []
        for link_line in listing.splitlines():
            if "link/ether" not in link_line:
                continue
            fields = link_line.split()
            ## the address is the field right after each "link/ether" token
            found.extend(
                fields[pos + 1].upper()
                for pos, field in enumerate(fields)
                if field == "link/ether"
            )

        if not found:
            print("No active mac address found")
            return None

        ## NOTE(review): this looks like a leftover debug trace -- confirm before removing
        print (f'nati: ogGetMacAddress: {found[0]}')
        return found[0]

    except subprocess.CalledProcessError as e:
        print(f"Error executing ip command: {e.stderr}")
        return None
    except Exception as e:
        print(f"Unexpected error: {str(e)}")
        return None
#/**
#         ogGetNetInterface
#@brief   Prints the network interface of the system.
#@return  0 after printing the value of DEVICE (nothing is printed when it is
#         unset); None when only the help text was requested.
#@note    Uses the variables set by the initrd ("/etc/net-ethX.conf").
#*/
def ogGetNetInterface():
    ## "help" as first command-line argument only shows the usage text
    wants_help = len(sys.argv) >= 2 and sys.argv[1] == "help"
    if wants_help:
        SystemLib.ogHelp("ogGetNetInterface", "ogGetNetInterface", "ogGetNetInterface => eth0")
        return

    device = os.environ.get("DEVICE")
    if device is not None:
        print(device)
    return 0
#/**
#         ogGetRepoIp
#@brief   Returns the address of the data repository.
#@return  Server part of the source mounted on ogGlobals.OGIMG for nfs and cifs
#         mounts; None if findmnt gives no usable answer or the filesystem
#         type is anything else.
#*/
def ogGetRepoIp():
    findmnt_cmd = ["findmnt", "--json", "--output", "SOURCE,FSTYPE", ogGlobals.OGIMG]
    out = subprocess.run (findmnt_cmd, capture_output=True, text=True).stdout

    try:
        mount_info = json.loads (out)
    except json.decoder.JSONDecodeError:
        ## output that is not valid JSON is treated as "no repository"
        return None
    if 'filesystems' not in mount_info:
        return None

    first = mount_info['filesystems'][0]
    source, fstype = first['source'], first['fstype']

    if fstype == 'nfs':
        ## "server:/path" -> "server"
        return source.split(":")[0]
    if fstype == 'cifs':
        ## "//server/share" -> "server"
        return source.split("/")[2]
    return None
#/**
#         ogGetServerIp
#@brief   Returns the address of the OpenGnSys server.
#@return  Value of the "ogcore" environment variable.
#@exception KeyError if "ogcore" is not set.
#*/
def ogGetServerIp():
    server_ip = os.environ['ogcore']
    return server_ip
#/**
#         ogGetServerPort
#@brief   Returns the port of the OpenGnSys server.
#@return  Value of the "ogcore_port" environment variable (as a string), or
#         '8443' when the variable is not set.
#*/
def ogGetServerPort():
    return os.environ.get ('ogcore_port', '8443')
#/**
#         ogMakeGroupDir [ str_repo ]
#@brief   Creates the directory for the client's group.
#@param   str_repo  image repository, read from the first command-line
#                   argument (optional)
#@return  0, or None when only the help text was requested.
#@note    repo = { REPO, CACHE }, REPO by default
#*/
def ogMakeGroupDir():
    ## Show the usage text only on an explicit "help" argument, the same way
    ## ogGetNetInterface does. The previous "fewer than 2 arguments" test
    ## returned early and made the default-to-REPO branch unreachable.
    if len(sys.argv) >= 2 and sys.argv[1] == "help":
        SystemLib.ogHelp("ogMakeGroupDir", "ogMakeGroupDir str_repo", "ogMakeGroupDir", "ogMakeGroupDir REPO")
        return

    repo = sys.argv[1] if len(sys.argv) >= 2 else "REPO"

    ## without a group name there is no directory to create (concatenating
    ## None to the path used to raise TypeError)
    group = ogGetGroupName()
    if not group:
        return 0

    ## NOTE(review): ogGetGroupDir calls ogGetPath with two positional arguments
    ## only -- confirm that ogGetPath really accepts a stderr keyword.
    group_dir = FileLib.ogGetPath(repo, "/groups/" + group, stderr=subprocess.DEVNULL)
    if group_dir:
        subprocess.run(["mkdir", "-p", group_dir], stderr=subprocess.DEVNULL)
    return 0