test-ogclone/client/lib/engine/bin/InventoryLib.py

237 lines
8.5 KiB
Python

import glob
import platform
import sys
import os
import subprocess
import tempfile
import re
import json
import shutil
import sqlite3
import SystemLib
import ogGlobals
import FileLib
import RegistryLib
import FileSystemLib
# Message text, Spanish for "Hardware inventory of the machine".
# NOTE(review): not referenced in the visible part of this module; confirm
# it is used elsewhere before removing or changing it.
MSG_HARDWAREINVENTORY = "Inventario de hardware de la máquina"
def ogGetArch():
    """Print the architecture label of the running system.

    When the first command-line argument is ``help``, the usage text is
    shown through ``SystemLib.ogHelp`` and nothing else is printed.

    Otherwise prints ``x86_64`` if ``platform.machine()`` ends with ``64``
    and ``i386`` in every other case.

    Returns:
        None
    """
    help_requested = len(sys.argv) > 1 and sys.argv[1] == "help"
    if help_requested:
        SystemLib.ogHelp(sys.argv[0], sys.argv[0], sys.argv[0] + " => x86_64")
        return

    arch_label = "x86_64" if platform.machine().endswith("64") else "i386"
    print(arch_label)
def ogGetOsType(disk=None, part=None):
    """Return the operating-system family installed on a partition.

    The family is the text before the first ``:`` of the string returned
    by ``ogGetOsVersion`` (for example ``"Linux: Debian 12"`` gives
    ``"Linux"``).

    Args:
        disk: Disk number. When either ``disk`` or ``part`` is omitted the
            function runs in command-line mode and reads both values from
            ``sys.argv[1]`` and ``sys.argv[2]``.
        part: Partition number (see ``disk``).

    Returns:
        str | None: The OS family, or ``None`` when help was shown, the
        command-line arguments are missing, or the version could not be
        determined.
    """
    if disk is None or part is None:
        # Command-line mode: show help if requested.
        if len(sys.argv) > 1 and sys.argv[1] == "help":
            SystemLib.ogHelp(sys.argv[0], sys.argv[0] + " int_ndisk int_npartition", sys.argv[0] + " 1 2 => Linux")
            return None
        # Both the disk and the partition are required.
        if len(sys.argv) < 3:
            SystemLib.ogRaiseError(ogGlobals.OG_ERR_FORMAT)
            return None
        disk, part = sys.argv[1], sys.argv[2]

    version = ogGetOsVersion(disk, part)
    if not version:
        # ogGetOsVersion signals failure with None; there is nothing to split.
        return None
    return version.split(":")[0]
def ogGetOsUuid():
    """Print the unique identifier of the OS installed on a partition.

    Command-line arguments (``sys.argv``):
        1. disk number
        2. filesystem (partition) number

    For Linux the filesystem UUID reported by ``findmnt`` is printed; if
    that is unavailable, the content of ``etc/machine-id`` inside the
    mounted filesystem is used instead. For Windows the ``MachineGuid``
    registry value is printed.

    Returns:
        None. Nothing is printed when help is shown, the arguments are
        wrong, the partition cannot be mounted, or no identifier is found.
    """
    # Show help if requested.
    if len(sys.argv) > 1 and sys.argv[1] == "help":
        SystemLib.ogHelp(sys.argv[0], sys.argv[0] + " int_ndisk int_nfilesys", sys.argv[0] + " 1 2 => 540e47c6-8e78-4178-aa46-042e4803fb16")
        return
    # Exactly two parameters are required.
    if len(sys.argv) != 3:
        SystemLib.ogRaiseError(ogGlobals.OG_ERR_FORMAT)
        return
    disk, part = sys.argv[1], sys.argv[2]

    # Mount the partition if it was not mounted already.
    mnt_dir = FileSystemLib.ogMount(disk, part)
    if not mnt_dir:
        return

    # The OS family is the prefix of the version string ("Linux: ..." -> "Linux").
    version = ogGetOsVersion(disk, part)
    os_type = version.split(":")[0] if version else None

    if os_type == "Linux":
        uuid = ""
        # First choice: UUID of the mounted root filesystem. A failing or
        # missing findmnt must not abort; it only triggers the fallback.
        try:
            found = subprocess.run(
                ["findmnt", "-no", "UUID", mnt_dir],
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                check=False,
            )
            if found.returncode == 0:
                uuid = found.stdout.decode().strip()
        except OSError:
            uuid = ""
        # Fallback: the machine identifier file.
        if not uuid:
            try:
                with open(os.path.join(mnt_dir, "etc", "machine-id")) as id_file:
                    uuid = id_file.read().strip()
            except OSError:
                uuid = ""
        if uuid:
            print(uuid)
    elif os_type == "Windows":
        # NOTE(review): the ogGetOsVersion visible in this file only yields
        # "Linux: ..." strings; confirm how a Windows type is produced.
        # Read the identifier from the registry key.
        uuid = RegistryLib.ogGetRegistryValue(mnt_dir, "SOFTWARE", "\\Microsoft\\Cryptography\\MachineGuid")
        print(uuid)
def ogGetSerialNumber():
    """Print the system serial number reported by ``dmidecode``.

    Placeholder values are ignored as a whole: any output line that is
    only zeros/spaces or that contains "not specified", "to be filled",
    "invalid entry" or "default string" (case-insensitive) is dropped, so
    no leftover fragment of a placeholder is printed. Spaces are removed
    from what remains and the result is truncated to 25 characters.

    If ``dmidecode`` is missing or exits with an error the serial number is
    treated as unknown and nothing is printed.

    Returns:
        int | None: ``0`` after querying, or ``None`` when help was shown.
    """
    # Show help if requested.
    if len(sys.argv) > 1 and sys.argv[1] == "help":
        SystemLib.ogHelp(sys.argv[0], sys.argv[0], sys.argv[0] + " => 123456")
        return

    # Query the serial number; an unavailable tool means "no serial".
    try:
        raw = subprocess.check_output(["dmidecode", "-s", "system-serial-number"]).decode()
    except (OSError, subprocess.CalledProcessError):
        raw = ""

    # Discard whole lines that hold a placeholder instead of a real serial.
    placeholder = re.compile(
        r"^[ 0]+$|not specified|to be filled|invalid entry|default string",
        flags=re.IGNORECASE,
    )
    kept = [line for line in raw.splitlines() if not placeholder.search(line)]

    # Remove spaces and keep at most 25 characters.
    serial = "\n".join(kept).strip().replace(" ", "")[:25]
    if serial:
        print(serial)
    return 0
def ogIsEfiActive():
    """Report whether the ``/sys/firmware/efi`` directory exists.

    Returns:
        bool: ``True`` when the directory is present, ``False`` otherwise.
    """
    efi_sysfs_dir = "/sys/firmware/efi"
    return os.path.isdir(efi_sysfs_dir)
import subprocess
import os
import sys
def parse_lshw_output(lshw_json=None):
    """Summarise ``lshw -json`` output as ``key=value`` lines.

    Args:
        lshw_json: Optional JSON text previously produced by
            ``lshw -json``. When omitted, ``lshw -json`` is executed.

    Returns:
        str: Newline-separated ``key=value`` lines. From each top-level
        node: ``product``, ``vendor`` and ``memory`` (taken from its
        ``configuration``), when present. From every node below it, at any
        depth and in document order: ``cpu`` for class ``processor``,
        ``total_memory`` for the system-memory node and ``disk`` for class
        ``disk``. If lshw cannot be run or its output is not valid JSON,
        an error is printed and a fixed error string is returned.
    """
    try:
        if lshw_json is None:
            # Run lshw in JSON format for easy processing.
            lshw_json = subprocess.check_output(["lshw", "-json"], text=True)
        lshw_data = json.loads(lshw_json)
    except (OSError, subprocess.CalledProcessError, ValueError) as e:
        # OSError: lshw is not installed; ValueError: output is not JSON.
        print(f"Error al ejecutar lshw: {e}")
        return "Error al obtener información de hardware"

    # Some lshw versions wrap the root node in a JSON array.
    roots = lshw_data if isinstance(lshw_data, list) else [lshw_data]

    parsed_output = []
    for root in roots:
        if not isinstance(root, dict):
            continue
        if "product" in root:
            parsed_output.append(f"product={root['product']}")
        if "vendor" in root:
            parsed_output.append(f"vendor={root['vendor']}")
        configuration = root.get("configuration")
        if isinstance(configuration, dict) and "memory" in configuration:
            parsed_output.append(f"memory={configuration['memory']}")

        # Depth-first walk in document order: processors, memory and disks
        # are nested below intermediate nodes, not direct children of root.
        pending = list(reversed(root.get("children") or []))
        while pending:
            node = pending.pop()
            if not isinstance(node, dict):
                continue
            node_class = node.get("class")
            if node_class == "processor":
                parsed_output.append(f"cpu={node.get('product', 'Unknown')}")
            elif node_class == "memory" and "size" in node:
                # NOTE(review): caches, banks and firmware also carry class
                # "memory"; the system-memory node is presumed to have id
                # "memory" or "memory:N". Confirm against the lshw version
                # in use. Nodes without an id are accepted.
                node_id = str(node.get("id", "memory"))
                if node_id.split(":")[0] == "memory":
                    parsed_output.append(f"total_memory={node['size']}")
            elif node_class == "disk":
                parsed_output.append(f"disk={node.get('product', 'Unknown')}")
            pending.extend(reversed(node.get("children") or []))

    # Return the combined data.
    return "\n".join(parsed_output)
def ogListHardwareInfo():
    """Collect a textual hardware summary of the running machine.

    The summary has three parts joined by newlines:
        * ``cha=<chassis type>`` from ``dmidecode -s chassis-type``;
          ``cha=Unknown`` when the type is empty, contains "Other", or
          dmidecode cannot be run.
        * ``boo=UEFI`` or ``boo=BIOS`` depending on whether
          ``/sys/firmware/efi`` exists.
        * the text returned by ``parse_lshw_output()``.

    The firmware line is also printed to standard output as a side effect.

    Returns:
        str: The combined summary.
    """
    # Ask dmidecode for the chassis type directly (no shell pipeline).
    try:
        chassis = subprocess.check_output(["dmidecode", "-s", "chassis-type"]).decode().strip()
    except (OSError, subprocess.CalledProcessError):
        chassis = ""
    # "Other" carries no information, so it is reported as unknown too.
    if not chassis or "Other" in chassis:
        output = "cha=Unknown"
    else:
        output = f"cha={chassis}"

    # Detect BIOS or UEFI.
    firmware = "boo=UEFI" if os.path.isdir("/sys/firmware/efi") else "boo=BIOS"
    print(firmware)

    # Run and parse lshw.
    lshw_output = parse_lshw_output()

    # Combine and return the results.
    return f"{output}\n{firmware}\n{lshw_output}"
def ogListSoftware(disk, partition):
    """List the software packages installed on a Linux partition.

    Packages are read from the dpkg status file and, if an rpm database
    directory exists, from ``rpm -qa`` run against that database. A line
    ``Operative System: <version>`` is printed before returning.

    Args:
        disk: Disk number.
        partition: Partition number.

    Returns:
        list[str]: Sorted, de-duplicated ``"<name> <version>"`` entries.
        An empty list when an argument is ``None``, the partition cannot
        be mounted, or the OS is not identified as Linux (the latter is
        also reported through ``SystemLib.ogRaiseError``).
    """
    if disk is None or partition is None:
        SystemLib.ogRaiseError(ogGlobals.OG_ERR_FORMAT)
        return []

    mnt_dir = FileSystemLib.ogMount(disk, partition)
    if not mnt_dir:
        # Without a mount point there is nothing to inspect.
        return []

    # The OS family is the prefix of the version string ("Linux: ..." -> "Linux").
    os_version = ogGetOsVersion(disk, partition)
    os_type = os_version.split(":")[0] if os_version else None
    if os_type != "Linux":
        SystemLib.ogRaiseError(ogGlobals.OG_ERR_NOTOS, disk, partition)
        return []

    apps = []
    status_file = os.path.join(mnt_dir, "var/lib/dpkg", "status")
    if os.path.exists(status_file):
        apps.extend(_list_dpkg_packages(status_file))
    rpm_dir = os.path.join(mnt_dir, "var/lib/rpm")
    if os.path.exists(rpm_dir):
        apps.extend(_list_rpm_packages(rpm_dir))

    print(f"Operative System: {os_version}")
    return sorted(set(apps))


def _list_dpkg_packages(status_file):
    """Return "<name> <version>" for every installed package in a dpkg status file."""
    found = []
    pkg, ver, installed = None, None, False
    with open(status_file, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            if line.startswith("Package:"):
                # A new stanza starts: forget the previous package state.
                pkg, ver, installed = line.split(":", 1)[1].strip(), None, False
            elif line.startswith("Version:"):
                ver = line.split(":", 1)[1].strip()
            elif line.startswith("Status:"):
                # The last word is the package state. A substring test for
                # "install" would also match "deinstall" and "not-installed".
                installed = line.split()[-1] == "installed"
            if pkg and ver and installed:
                found.append(f"{pkg} {ver}")
                pkg, ver, installed = None, None, False
    return found


def _list_rpm_packages(rpm_dir):
    """Return "<name> <version>" for every package in the rpm database at rpm_dir."""
    if not shutil.which("rpm"):
        SystemLib.ogEcho("session", "error", "The rpm command is not available.")
        return []
    result = subprocess.run(
        ["rpm", "--dbpath", rpm_dir, "-qa", "--qf", "%{NAME} %{VERSION}\n"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        return []
    return result.stdout.strip().splitlines()
def ogGetOsVersion(disk, part):
    """Return a description of the Linux system installed on a partition.

    The version text is taken from the first source that provides one:
        1. ``PRETTY_NAME`` in ``etc/os-release``
        2. ``DISTRIB_DESCRIPTION`` in ``etc/lsb-release``
        3. the first line of ``etc/<distro>-release`` for redhat, SuSE,
           mandrake or gentoo
    If a version was found and a ``lib64`` entry exists in the filesystem
    root, " 64 bits" is appended.

    Args:
        disk: Disk number, passed to ``FileSystemLib.ogMount``.
        part: Partition number, passed to ``FileSystemLib.ogMount``.

    Returns:
        str | None: ``"Linux: <version>"``, ``"Linux: Unknown"`` when no
        version text is found, or ``None`` when the partition cannot be
        mounted or any error occurs (the error is printed).
    """
    # Deliberately best-effort: callers treat None as "could not determine",
    # so every failure is reported and converted instead of propagated.
    try:
        mnt_dir = FileSystemLib.ogMount(disk, part)
        if not mnt_dir:
            return None

        version = _read_release_value(os.path.join(mnt_dir, "etc/os-release"), "PRETTY_NAME")
        if not version:
            version = _read_release_value(os.path.join(mnt_dir, "etc/lsb-release"), "DISTRIB_DESCRIPTION")
        if not version:
            for distrib in ("redhat", "SuSE", "mandrake", "gentoo"):
                distrib_file = os.path.join(mnt_dir, f"etc/{distrib}-release")
                if os.path.isfile(distrib_file):
                    with open(distrib_file, "r") as f:
                        version = f.readline().strip()
                    break

        if not version:
            # Do not decorate a missing version (avoids "None 64 bits").
            return "Linux: Unknown"
        if os.path.exists(os.path.join(mnt_dir, "lib64")):
            version = f"{version} 64 bits"
        return f"Linux: {version}"
    except Exception as e:
        print(f"Fail to get OS: {e}")
        return None


def _read_release_value(path, key):
    """Return the unquoted value of the first ``key=...`` line in path, or None."""
    if not os.path.isfile(path):
        return None
    with open(path, "r") as f:
        for line in f:
            if line.startswith(key + "="):
                return line.split("=", 1)[1].strip().strip('"')
    return None