ogclone-engine/client/lib/engine/bin/InventoryLib.py

232 lines
8.4 KiB
Python

import platform
import sys
import os
import subprocess
import tempfile
import re
import json
import shutil
import SystemLib
import FileSystemLib
import ogGlobals
# User-facing label, in Spanish ("Hardware inventory of the machine").
# NOTE(review): no function visible in this file reads this constant;
# confirm it is consumed elsewhere before changing or removing it.
MSG_HARDWAREINVENTORY = "Inventario de hardware de la máquina"
def ogGetArch():
    """Print the architecture family of the running system.

    Prints ``x86_64`` when ``platform.machine()`` ends in ``64`` and
    ``i386`` otherwise.  When the first command-line argument is ``help``
    the usage text is shown through ``SystemLib.ogHelp`` and nothing else
    is printed.
    """
    help_requested = len(sys.argv) > 1 and sys.argv[1] == "help"
    if help_requested:
        SystemLib.ogHelp(sys.argv[0], sys.argv[0], sys.argv[0] + " => x86_64")
        return

    arch = "x86_64" if platform.machine().endswith("64") else "i386"
    print(arch)
def ogGetOsType(disk, partition):
    """Return the OS family installed on a partition.

    The family is the text that precedes the first ``:`` in the string
    returned by ``ogGetOsVersion(disk, partition)``.

    Returns ``"Unknown"`` when that string is empty or ``None``, and also
    when any exception is raised while obtaining it (the error is printed).
    """
    try:
        version = ogGetOsVersion(disk, partition)
        if not version:
            return "Unknown"
        family, _sep, _rest = version.partition(":")
        return family
    except Exception as e:
        print(f"Error en ogGetOsType: {e}")
        return "Unknown"
def ogGetOsUuid():
    """Print the unique identifier of the OS installed on a filesystem.

    Driven by the command line: ``sys.argv[1]`` is the disk number and
    ``sys.argv[2]`` the filesystem number.  If ``sys.argv[1]`` is ``help``
    the usage text is shown instead.

    For Linux the UUID reported by ``findmnt`` for the mount point is
    printed; when ``findmnt`` fails or reports nothing, the contents of
    ``etc/machine-id`` inside the mounted filesystem are printed instead.
    For Windows the ``MachineGuid`` registry value is printed.

    Returns ``None`` in every case.  A wrong number of arguments is
    reported through ``SystemLib.ogRaiseError``.
    """
    # Show help when requested.
    if len(sys.argv) > 1 and sys.argv[1] == "help":
        SystemLib.ogHelp(sys.argv[0], sys.argv[0] + " int_ndisk int_nfilesys", sys.argv[0] + " 1 2 => 540e47c6-8e78-4178-aa46-042e4803fb16")
        return

    # Exactly two parameters are required.
    if len(sys.argv) != 3:
        # The error code lives in ogGlobals; a bare OG_ERR_FORMAT is undefined.
        SystemLib.ogRaiseError(ogGlobals.OG_ERR_FORMAT)
        return

    disk, filesystem = sys.argv[1], sys.argv[2]

    # Mount the partition if it was not mounted already.
    mnt_dir = FileSystemLib.ogMount(disk, filesystem)
    if not mnt_dir:
        return

    # The source of the identifier depends on the OS type.
    os_type = ogGetOsType(disk, filesystem)
    if os_type == "Linux":
        try:
            uuid = subprocess.check_output(
                ["findmnt", "-no", "UUID", mnt_dir],
                stderr=subprocess.DEVNULL,
            ).decode().strip()
        except (subprocess.CalledProcessError, OSError):
            # check_output raises on a non-zero exit or a missing binary;
            # treat both as "no UUID" so the machine-id fallback is reached.
            uuid = ""
        if not uuid:
            with open(os.path.join(mnt_dir, "etc", "machine-id")) as f:
                uuid = f.read().strip()
        print(uuid)
    elif os_type == "Windows":
        # Read the identifier from the registry.
        # NOTE(review): `Registry` is not imported in the visible part of
        # this file; confirm which module provides ogGetRegistryValue.
        uuid = Registry.ogGetRegistryValue(mnt_dir, "SOFTWARE", "\\Microsoft\\Cryptography\\MachineGuid")
        print(uuid)
def ogGetSerialNumber():
    """Print the system serial number reported by ``dmidecode``.

    Placeholder texts (``not specified``, ``to be filled``,
    ``invalid entry``, ``default string``, case-insensitive) and spaces are
    removed, and the result is cut to at most 25 characters.  Nothing is
    printed when the cleaned value is empty.

    Returns ``0`` after querying the serial number, or ``None`` when only
    the help text was requested (``sys.argv[1] == "help"``).
    """
    # Show help when requested.
    if len(sys.argv) > 1 and sys.argv[1] == "help":
        SystemLib.ogHelp(sys.argv[0], sys.argv[0], sys.argv[0] + " => 123456")
        return

    # Query the serial number and drop the "unset" placeholders.
    raw = subprocess.check_output(["dmidecode", "-s", "system-serial-number"])
    serial = raw.decode().strip()
    serial = re.sub(r"(not specified|to be filled|invalid entry|default string)", "", serial, flags=re.IGNORECASE)
    # Slicing already leaves shorter strings untouched.
    serial = serial.replace(" ", "")[:25]

    if serial:
        print(serial)
    return 0
def ogIsEfiActive():
    """Return ``True`` when the directory ``/sys/firmware/efi`` exists."""
    efi_dir = "/sys/firmware/efi"
    return os.path.isdir(efi_dir)
def parse_lshw_output():
    """Run ``lshw -json`` and summarise it as ``key=value`` lines.

    The summary contains, when present, the root node's ``product``,
    ``vendor`` and ``configuration.memory`` values, followed by one line per
    direct child of class ``processor`` (``cpu=``), ``memory`` with a
    ``size`` (``total_memory=``) or ``disk`` (``disk=``).

    Returns:
        The lines joined with newlines, or the fixed text
        ``"Error al obtener información de hardware"`` when ``lshw`` cannot
        be run or its output cannot be parsed (the error is printed).
    """
    try:
        # JSON output is requested so it can be parsed directly.
        lshw_output = subprocess.check_output(["lshw", "-json"], text=True)
        lshw_data = json.loads(lshw_output)
    except (subprocess.CalledProcessError, OSError, ValueError) as e:
        # OSError covers a missing/non-executable lshw binary; ValueError
        # covers invalid JSON (json.JSONDecodeError) and undecodable output.
        print(f"Error al ejecutar lshw: {e}")
        return "Error al obtener información de hardware"

    # Some lshw versions wrap the root node in a JSON array; use its first
    # element so the dictionary lookups below keep working.
    if isinstance(lshw_data, list):
        lshw_data = lshw_data[0] if lshw_data else {}
    if not isinstance(lshw_data, dict):
        lshw_data = {}

    parsed_output = []

    # Attributes of the root node.
    for key in ("product", "vendor"):
        if key in lshw_data:
            parsed_output.append(f"{key}={lshw_data[key]}")
    configuration = lshw_data.get("configuration")
    if isinstance(configuration, dict) and "memory" in configuration:
        parsed_output.append(f"memory={configuration['memory']}")

    # Direct children only.
    # NOTE(review): lshw usually nests CPUs and disks deeper in the tree
    # (below a "core" bus node); confirm whether a recursive walk is wanted.
    for item in lshw_data.get("children", []):
        if not isinstance(item, dict):
            continue
        # Not every node is guaranteed to carry a "class" key.
        item_class = item.get("class")
        if item_class == "processor":
            parsed_output.append(f"cpu={item.get('product', 'Unknown')}")
        elif item_class == "memory" and "size" in item:
            parsed_output.append(f"total_memory={item['size']}")
        elif item_class == "disk":
            parsed_output.append(f"disk={item.get('product', 'Unknown')}")

    return "\n".join(parsed_output)
def ogListHardwareInfo():
    """Return a multi-line hardware summary and print the firmware type.

    The result has three newline-separated parts:

    * the chassis line produced by the ``dmidecode`` shell pipeline, or
      ``cha=Unknown`` when that pipeline exits with a non-zero status;
    * ``boo=UEFI`` or ``boo=BIOS`` depending on whether
      ``/sys/firmware/efi`` exists (this line is also printed);
    * the text returned by ``parse_lshw_output()``.
    """
    # Chassis type; the pipeline fails when grep filters every line out.
    chassis_cmd = 'echo "cha=$(dmidecode -s chassis-type)" | grep -v "Other"'
    try:
        chassis = subprocess.check_output(chassis_cmd, shell=True).decode().strip()
    except subprocess.CalledProcessError:
        chassis = "cha=Unknown"

    # BIOS or UEFI boot.
    if os.path.isdir("/sys/firmware/efi"):
        boot_mode = "boo=UEFI"
    else:
        boot_mode = "boo=BIOS"
    print(boot_mode)

    # Remaining details come from lshw.
    hardware = parse_lshw_output()

    return "\n".join((chassis, boot_mode, hardware))
def _ogDpkgInstalledPackages(status_file):
    """Return ``"name version"`` for each installed package in a dpkg status file."""
    apps = []
    stanza = {}

    def flush():
        # The last word of "Status: <want> <flag> <state>" is the package
        # state; only "installed" counts.  A stanza without a Status field
        # is accepted, as the previous implementation did.
        state = stanza.get("Status", "").split()
        name, version = stanza.get("Package"), stanza.get("Version")
        if name and version and (not state or state[-1] == "installed"):
            apps.append(f"{name} {version}")
        stanza.clear()

    with open(status_file, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            if not line.strip():
                # A blank line ends the stanza: name and version are paired
                # per stanza, so they can never leak into the next package.
                flush()
            elif not line[0].isspace():
                # Lines starting with whitespace continue the previous field.
                key, sep, value = line.partition(":")
                if sep:
                    stanza[key] = value.strip()
    flush()  # the file may not end with a blank line
    return apps


def ogListSoftware(disk, partition):
    """List the software packages installed on a Linux partition.

    Packages are read from the dpkg status file (``var/lib/dpkg/status``)
    and, when ``var/lib/rpm`` exists and the ``rpm`` command is available,
    from the RPM database.  The OS version is printed before returning.

    Args:
        disk: disk identifier passed to ``FileSystemLib.ogMount``.
        partition: partition identifier passed to ``FileSystemLib.ogMount``.

    Returns:
        A sorted list of unique ``"name version"`` strings.  An empty list
        is returned when an argument is ``None``, when the partition cannot
        be mounted, or when the OS type is not ``"Linux"`` (the first and
        last cases are reported through ``SystemLib.ogRaiseError``).
    """
    if disk is None or partition is None:
        SystemLib.ogRaiseError(ogGlobals.OG_ERR_FORMAT)
        return []

    mnt_dir = FileSystemLib.ogMount(disk, partition)
    if not mnt_dir:
        # Without a mount point there is nothing to inspect.
        # NOTE(review): presumably ogMount reports its own error — confirm.
        return []

    os_type = ogGetOsType(disk, partition)
    if os_type != "Linux":
        SystemLib.ogRaiseError(ogGlobals.OG_ERR_NOTOS, disk, partition)
        return []

    apps = []

    # Debian-style package database.
    status_file = os.path.join(mnt_dir, "var/lib/dpkg", "status")
    if os.path.exists(status_file):
        apps.extend(_ogDpkgInstalledPackages(status_file))

    # RPM package database.
    pkg_dir = os.path.join(mnt_dir, "var/lib/rpm")
    if os.path.exists(pkg_dir):
        if shutil.which("rpm"):
            result = subprocess.run(
                ["rpm", "--dbpath", pkg_dir, "-qa", "--qf", "%{NAME} %{VERSION}\n"],
                capture_output=True,
                text=True,
            )
            if result.returncode == 0:
                apps.extend(result.stdout.strip().splitlines())
        else:
            SystemLib.ogEcho("session", "error", "The rpm command is not available.")

    os_version = ogGetOsVersion(disk, partition)
    print(f"Operative System: {os_version}")
    return sorted(set(apps))
def _ogReadReleaseValue(path, key):
    """Return the unquoted value of the first ``key=...`` line in *path*, or ``None``."""
    if not os.path.isfile(path):
        return None
    with open(path, "r") as f:
        for line in f:
            # Requiring the "=" avoids an IndexError on malformed lines.
            if line.startswith(f"{key}="):
                return line.split("=", 1)[1].strip().strip('"')
    return None


def ogGetOsVersion(disk, part):
    """Return a description of the Linux system installed on a partition.

    The version text is taken from the first of these that yields a value,
    all relative to the mount point: ``PRETTY_NAME`` in ``etc/os-release``,
    ``DISTRIB_DESCRIPTION`` in ``etc/lsb-release``, or the first line of
    the first existing ``etc/{redhat,SuSE,mandrake,gentoo}-release`` file.
    `` 64 bits`` is appended to a known version when ``lib64`` exists.

    Args:
        disk: disk identifier passed to ``FileSystemLib.ogMount``.
        part: partition identifier passed to ``FileSystemLib.ogMount``.

    Returns:
        ``"Linux: <version>"``, ``"Linux: Unknown"`` when no version text
        was found, or ``None`` when the partition cannot be mounted or any
        error occurs (the error is printed).
    """
    try:
        mnt_dir = FileSystemLib.ogMount(disk, part)
        if not mnt_dir:
            return None

        version = None
        for rel_path, key in (
            ("etc/os-release", "PRETTY_NAME"),
            ("etc/lsb-release", "DISTRIB_DESCRIPTION"),
        ):
            version = _ogReadReleaseValue(os.path.join(mnt_dir, rel_path), key)
            if version:
                break

        if not version:
            for distrib in ["redhat", "SuSE", "mandrake", "gentoo"]:
                distrib_file = os.path.join(mnt_dir, f"etc/{distrib}-release")
                if os.path.isfile(distrib_file):
                    with open(distrib_file, "r") as f:
                        version = f.readline().strip()
                    break

        if not version:
            # Decide this before the 64-bit suffix is added; otherwise a
            # missing version would be rendered as "None 64 bits".
            return "Linux: Unknown"

        if os.path.exists(os.path.join(mnt_dir, "lib64")):
            version = f"{version} 64 bits"
        return f"Linux: {version}"
    except Exception as e:
        # Callers rely on None (never an exception) to mean "could not tell".
        print(f"Fail to get OS: {e}")
        return None