340 lines
11 KiB
Python
340 lines
11 KiB
Python
import subprocess
|
|
import sys
|
|
import os
|
|
import json
|
|
import re
|
|
|
|
import ogGlobals
|
|
import SystemLib
|
|
import FileLib
|
|
|
|
def _ogConnect_options():
|
|
## the original bash code does: eval $(grep "OPTIONS=" /scripts/ogfunctions)
|
|
## this is highly insecure but we need to keep it for now
|
|
opt = subprocess.run (['grep', '-o', 'OPTIONS=.*', '/scripts/ogfunctions'], capture_output=True, text=True).stdout.strip()
|
|
exec (opt, globals())
|
|
return OPTIONS
|
|
|
|
## defined in /scripts/ogfunctions. We can't tackle that yet.
|
|
## this is a quick implementation just to unblock development
|
|
def _ogConnect (server, protocol, src, dst, options, readonly):
|
|
if 'smb' != protocol: return None ## only supported value right now
|
|
write_option = ',ro' if readonly else ',rw'
|
|
options += write_option
|
|
return not subprocess.run (['mount.cifs', f'//{server}/{src}', dst] + options.split()).returncode
|
|
|
|
|
|
#/**
|
|
# ogChangeRepo IPREPO [ OgUnit ]
|
|
#@brief Cambia el repositorio para el recurso remoto images.
|
|
#@param 1 Ip Repositorio
|
|
#@param 2 Abreviatura Unidad Organizativa
|
|
#@return Cambio recurso remoto en OGIMG.
|
|
#*/
|
|
def ogChangeRepo(ip_repo, og_unit=None):
|
|
ogprotocol = os.environ['ogprotocol'] or 'smb'
|
|
|
|
if og_unit:
|
|
SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_GENERIC, 'the og_unit parameter became unsupported')
|
|
return None
|
|
|
|
try:
|
|
mount = subprocess.run (['mount'], capture_output=True, text=True).stdout
|
|
ro = not not list (filter (lambda line: re.search (r'ogimages.*\bro,', line), mount.splitlines()))
|
|
|
|
current_repo = ogGetRepoIp()
|
|
new_repo = current_repo if ip_repo.upper() == "REPO" else ip_repo
|
|
if new_repo == current_repo: return True
|
|
|
|
subprocess.run(["umount", ogGlobals.OGIMG], check=True)
|
|
|
|
SystemLib.ogEcho (['session', 'log'], 'info', f'{ogGlobals.lang.MSG_HELP_ogChangeRepo} {new_repo}')
|
|
options = _ogConnect_options()
|
|
if not _ogConnect (new_repo, ogprotocol, 'ogimages', ogGlobals.OGIMG, options, ro):
|
|
_ogConnect (current_repo, ogprotocol, 'ogimages', ogGlobals.OGIMG, options, ro)
|
|
SystemLib.ogRaiseError(
|
|
"session",
|
|
ogGlobals.OG_ERR_REPO,
|
|
f"Error connecting to the new repository: {new_repo}",
|
|
)
|
|
return False
|
|
|
|
SystemLib.ogEcho(
|
|
["session", "log"],
|
|
'info',
|
|
f"Repository successfully changed to {new_repo}".strip(),
|
|
)
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
SystemLib.ogRaiseError(
|
|
"session",
|
|
ogGlobals.OG_ERR_GENERIC,
|
|
f"Error executing ogChangeRepo: {e}",
|
|
)
|
|
return None
|
|
|
|
#/**
|
|
# ogGetGroupDir [ str_repo ]
|
|
#@brief Devuelve el camino del directorio para el grupo del cliente.
|
|
#@param str_repo repositorio de imágenes (opcional)
|
|
#@return path_dir - Camino al directorio del grupo.
|
|
#@note repo = { REPO, CACHE } REPO por defecto
|
|
#@exception OG_ERR_FORMAT formato incorrecto.
|
|
#*/
|
|
def ogGetGroupDir(repo=None):
|
|
try:
|
|
repo = repo or "REPO"
|
|
|
|
group = ogGetGroupName()
|
|
if group:
|
|
dir_path = FileLib.ogGetPath(repo, f"/groups/{group}")
|
|
if dir_path and os.path.isdir(dir_path):
|
|
return dir_path
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
SystemLib.ogRaiseError(
|
|
"session",
|
|
ogGlobals.OG_ERR_FORMAT,
|
|
f"Error in ogGetGroupDir: {e}"
|
|
)
|
|
return None
|
|
|
|
#/**
|
|
# ogGetGroupName
|
|
#@brief Devuelve el nombre del grupo al que pertenece el cliente.
|
|
#@return str_group - Nombre de grupo.
|
|
#*/
|
|
def ogGetGroupName():
|
|
try:
|
|
group = globals().get("group", None)
|
|
return group if group else None
|
|
except Exception as e:
|
|
print(f"Error in ogGetGroupName: {e}")
|
|
return None
|
|
|
|
#/**
|
|
# ogGetHostname
|
|
#@brief Muestra el nombre del cliente.
|
|
#@return str_host - nombre de máquina
|
|
#*/ ##
|
|
def ogGetHostname():
|
|
try:
|
|
# 1. Get hostname from the HOSTNAME environment variable
|
|
host = os.getenv("HOSTNAME", "").strip()
|
|
|
|
# 2. If not set, read from the DHCP leases file
|
|
if not host:
|
|
dhcp_file = "/var/lib/dhcp3/dhclient.leases"
|
|
if os.path.exists(dhcp_file):
|
|
with open(dhcp_file, "r") as f:
|
|
for line in f:
|
|
if 'option host-name' in line:
|
|
host = line.split('"')[1].strip(";")
|
|
break
|
|
|
|
# 3. If still not set, read from kernel parameters in /proc/cmdline
|
|
if not host:
|
|
cmdline_file = "/proc/cmdline"
|
|
if os.path.exists(cmdline_file):
|
|
with open(cmdline_file, "r") as f:
|
|
for entry in f.read().split():
|
|
if entry.startswith("hostname="):
|
|
host = entry.split("=")[1].strip()
|
|
break
|
|
|
|
# 4. Update HOSTNAME environment variable if it differs
|
|
current_hostname = os.getenv("HOSTNAME", "")
|
|
if host and current_hostname != host:
|
|
os.environ["HOSTNAME"] = host
|
|
|
|
return host if host else None
|
|
|
|
except Exception as e:
|
|
print(f"Error in ogGetHostname: {e}")
|
|
return None
|
|
|
|
#/**
|
|
# ogGetIpAddress
|
|
#@brief Muestra la dirección IP del sistema
|
|
#@return str_ip - Dirección IP
|
|
#@note Usa las variables utilizadas por el initrd "/etc/net-ethX.conf
|
|
#*/ ##
|
|
def ogGetIpAddress():
|
|
IP = ""
|
|
|
|
if len(sys.argv) >= 2 and sys.argv[1] == "help":
|
|
SystemLib.ogHelp("ogGetIpAddress", "ogGetIpAddress", "ogGetIpAddress => 192.168.0.10")
|
|
return
|
|
|
|
if "IPV4ADDR" in os.environ:
|
|
IP = os.environ["IPV4ADDR"]
|
|
else:
|
|
# Obtener direcciones IP.
|
|
if "DEVICE" in os.environ:
|
|
IP = subprocess.run(["ip", "-o", "address", "show", "up", "dev", os.environ["DEVICE"]], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL).stdout.decode().split()
|
|
else:
|
|
IP = subprocess.run(["ip", "-o", "address", "show", "up"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL).stdout.decode().split()
|
|
|
|
IP = [addr.split("/")[0] for addr in IP if "inet" in addr]
|
|
|
|
# Mostrar solo la primera.
|
|
if IP:
|
|
print(IP[0])
|
|
|
|
return 0
|
|
|
|
#/**
|
|
# ogGetMacAddress
|
|
#@brief Muestra la dirección Ethernet del cliente.
|
|
#@return str_ether - Dirección Ethernet
|
|
#*/ ##
|
|
def ogGetMacAddress():
|
|
try:
|
|
if "DEVICE" in os.environ:
|
|
device = os.environ["DEVICE"]
|
|
result = subprocess.run(
|
|
["ip", "-o", "link", "show", "up", "dev", device],
|
|
capture_output=True,
|
|
text=True,
|
|
check=True
|
|
).stdout
|
|
else:
|
|
result = subprocess.run(
|
|
["ip", "-o", "link", "show", "up"],
|
|
capture_output=True,
|
|
text=True,
|
|
check=True
|
|
).stdout
|
|
|
|
mac_addresses = []
|
|
for line in result.splitlines():
|
|
if "link/ether" in line:
|
|
parts = line.split()
|
|
for i, part in enumerate(parts):
|
|
if part == "link/ether":
|
|
mac_addresses.append(parts[i + 1].upper())
|
|
|
|
if mac_addresses:
|
|
print(mac_addresses[0])
|
|
return mac_addresses[0]
|
|
else:
|
|
print("No active mac address found")
|
|
return None
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Error executing ip command: {e.stderr}")
|
|
return None
|
|
except Exception as e:
|
|
print(f"Unexpected error: {str(e)}")
|
|
return None
|
|
|
|
#/**
|
|
# ogGetNetInterface
|
|
#@brief Muestra la interfaz de red del sistema
|
|
#@return str_interface - interfaz de red
|
|
#@note Usa las variables utilizadas por el initrd "/etc/net-ethX.conf
|
|
#*/ ##
|
|
def ogGetNetInterface():
|
|
if len(sys.argv) >= 2 and sys.argv[1] == "help":
|
|
SystemLib.ogHelp("ogGetNetInterface", "ogGetNetInterface", "ogGetNetInterface => eth0")
|
|
return
|
|
|
|
if "DEVICE" in os.environ:
|
|
print(os.environ["DEVICE"])
|
|
|
|
return 0
|
|
|
|
#/**
|
|
# ogGetRepoIp
|
|
#@brief Muestra la dirección IP del repositorio de datos.
|
|
#@return str_ip - Dirección IP
|
|
#*/ ##
|
|
def ogGetRepoIp():
|
|
out = subprocess.run (["findmnt", "--json", "--output", "SOURCE,FSTYPE", ogGlobals.OGIMG], capture_output=True, text=True).stdout
|
|
try:
|
|
j = json.loads (out)
|
|
except json.decoder.JSONDecodeError:
|
|
return None
|
|
|
|
if 'filesystems' not in j: return None
|
|
source = j['filesystems'][0]['source']
|
|
fstype = j['filesystems'][0]['fstype']
|
|
|
|
if 'nfs' == fstype: return source.split(":")[0]
|
|
elif 'cifs' == fstype: return source.split("/")[2]
|
|
|
|
return None
|
|
|
|
#/**
|
|
# ogGetServerIp
|
|
#@brief Muestra la dirección IP del Servidor de OpenGnSys.
|
|
#@return str_ip - Dirección IP
|
|
#@note Comprobacion segun protocolo de conexion al Repo
|
|
#*/ ##
|
|
def ogGetServerIp():
|
|
try:
|
|
output = subprocess.run(
|
|
["findmnt", "--json", "--output", "SOURCE,FSTYPE", ogGlobals.OGIMG],
|
|
capture_output=True,
|
|
text=True,
|
|
check=True
|
|
).stdout
|
|
except subprocess.CalledProcessError as e:
|
|
SystemLib.ogEcho("session", "error", f"Error to run findmnt: {e.stderr}")
|
|
return None
|
|
|
|
try:
|
|
mounts = json.loads(output)
|
|
except json.decoder.JSONDecodeError:
|
|
SystemLib.ogEcho("session", "error", "Error to decode JSON de findmnt.")
|
|
return None
|
|
|
|
if 'filesystems' not in mounts or not isinstance(mounts['filesystems'], list):
|
|
SystemLib.ogEcho("session", "error", "'filesystems' is not present o not valid in JSON.")
|
|
return None
|
|
|
|
for fs in mounts['filesystems']:
|
|
if 'source' in fs and 'fstype' in fs:
|
|
source = fs['source']
|
|
fstype = fs['fstype']
|
|
|
|
if fstype == "nfs":
|
|
return source.split(":")[0]
|
|
elif fstype == "cifs":
|
|
return source.split("/")[2]
|
|
|
|
SystemLib.ogEcho("session", "info", "No valid file system found")
|
|
return None
|
|
|
|
#/**
|
|
# ogMakeGroupDir [ str_repo ]
|
|
#@brief Crea el directorio para el grupo del cliente.
|
|
#@param str_repo repositorio de imágenes (opcional)
|
|
#@return (nada)
|
|
#@note repo = { REPO, CACHE } REPO por defecto
|
|
#@exception OG_ERR_FORMAT formato incorrecto.
|
|
#*/
|
|
def ogMakeGroupDir():
|
|
REPO = ""
|
|
DIR = ""
|
|
GROUP = ""
|
|
|
|
if len(sys.argv) < 2:
|
|
SystemLib.ogHelp("ogMakeGroupDir", "ogMakeGroupDir str_repo", "ogMakeGroupDir", "ogMakeGroupDir REPO")
|
|
return
|
|
|
|
if len(sys.argv) == 1:
|
|
REPO = "REPO"
|
|
else:
|
|
REPO = sys.argv[1]
|
|
|
|
DIR = FileLib.ogGetPath(REPO, "/groups/" + ogGetGroupName(), stderr=subprocess.DEVNULL)
|
|
if DIR:
|
|
subprocess.run(["mkdir", "-p", DIR], stderr=subprocess.DEVNULL)
|
|
|
|
return 0
|