# ogclone-engine/client/lib/engine/bin/InventoryLib.py
#
# 380 lines
# 15 KiB
# Python

import glob
import platform
import sys
import os
import subprocess
import tempfile
import re
import json
import shutil
import sqlite3
from SystemLib import ogMount, ogGetOsType, ogRaiseError, ogHelp, ogGetOsVersion, ogGetHivePath, ogListRegistryKeys, ogGetRegistryValue, ogGetIpAddress
sys.path.append('/opt/opengnsys/lib/engine/bin')
from SystemLib import ogEcho
#from Disklib import ogDiskToDev
#from engine.FileLib import *
#from engine.RegistryLib import *
#from engine.FileSystemLib import *
MSG_HARDWAREINVENTORY = "Inventario de hardware de la máquina"
def ogGetArch():
    """Print the architecture label of the running system.

    Prints ``x86_64`` when ``platform.machine()`` ends in ``64`` and ``i386``
    otherwise.  When the first command-line argument is ``help`` the usage
    text is shown through ``ogHelp`` and nothing else is printed.
    """
    wants_help = len(sys.argv) > 1 and sys.argv[1] == "help"
    if wants_help:
        script = sys.argv[0]
        ogHelp(script, script, script + " => x86_64")
        return

    # Any machine name ending in "64" is reported as x86_64.
    arch = "x86_64" if platform.machine().endswith("64") else "i386"
    print(arch)
def ogGetOsType(*args):
    """Return the operating-system type installed on a filesystem.

    The type is the text before the first ``:`` in the ``TYPE:VERSION`` line
    produced by ``ogGetOsVersion``.

    Args:
        *args: Optional ``(ndisk, nfilesys)`` pair.  When omitted, the values
            are taken from ``sys.argv[1:]`` as before.

    Returns:
        The type string (e.g. ``"Linux"``), an empty string when no
        ``TYPE:VERSION`` line was produced, or ``None`` after showing help.
    """
    import contextlib
    import io

    params = list(args) if args else list(sys.argv[1:])

    # Show help when requested.
    if params and params[0] == "help":
        ogHelp(sys.argv[0], sys.argv[0] + " int_ndisk int_npartition", sys.argv[0] + " 1 2 => Linux")
        return None

    # Missing values are passed as None so that ogGetOsVersion performs the
    # argument validation itself.
    ndisk, nfilesys = (params + [None, None])[:2]

    # ogGetOsVersion prints its result instead of returning it, so its
    # standard output is captured here; a string return value is also accepted.
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        result = ogGetOsVersion(ndisk, nfilesys)
    report = result if isinstance(result, str) else captured.getvalue()

    # Use the last line that has the "TYPE:VERSION" shape.
    for line in reversed(report.splitlines()):
        os_type, sep, _ = line.partition(":")
        if sep:
            return os_type.strip()
    return ""
def ogGetOsUuid():
    """Print the unique identifier of the OS installed on a filesystem.

    The disk and filesystem numbers are read from ``sys.argv[1]`` and
    ``sys.argv[2]``.  For Linux the filesystem UUID reported by ``findmnt`` is
    used, falling back to ``etc/machine-id``; for Windows the ``MachineGuid``
    registry value is used.  Nothing is printed when no identifier is found.
    """
    # Show help when requested.
    if len(sys.argv) > 1 and sys.argv[1] == "help":
        ogHelp(sys.argv[0], sys.argv[0] + " int_ndisk int_nfilesys", sys.argv[0] + " 1 2 => 540e47c6-8e78-4178-aa46-042e4803fb16")
        return

    # Exactly two parameters are required.
    if len(sys.argv) != 3:
        # NOTE(review): OG_ERR_FORMAT is not defined or imported in the visible
        # part of this file -- confirm where it comes from.
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Mount the partition if it was not mounted already.
    mntdir = ogMount(sys.argv[1], sys.argv[2])
    if not mntdir:
        return

    # The identifier source depends on the operating-system type.
    os_type = ogGetOsType(sys.argv[1], sys.argv[2])
    uuid = ""
    if os_type == "Linux":
        # Prefer the UUID of the root filesystem.  check_output raises when
        # findmnt fails, so the fallback must live in the except path.
        try:
            uuid = subprocess.check_output(
                ["findmnt", "-no", "UUID", mntdir], stderr=subprocess.DEVNULL
            ).decode(errors="replace").strip()
        except (subprocess.CalledProcessError, OSError):
            uuid = ""
        if not uuid:
            # Fall back to the machine identifier file.
            try:
                with open(os.path.join(mntdir, "etc", "machine-id")) as handle:
                    uuid = handle.read().strip()
            except OSError:
                uuid = ""
    elif os_type == "Windows":
        # Read the identifier from the registry.
        uuid = ogGetRegistryValue(mntdir, "SOFTWARE", "\\Microsoft\\Cryptography\\MachineGuid")

    if uuid:
        print(uuid)
def ogGetSerialNumber():
    """Print the system serial number reported by ``dmidecode``.

    Output lines that are placeholders ("not specified", "to be filled",
    "invalid entry", "default string") or consist only of zeros and spaces are
    ignored as a whole.  Spaces are removed and the result is cut to 25
    characters.  Nothing is printed when no usable serial number remains or
    when ``dmidecode`` cannot be run.

    Returns:
        ``0`` after a normal run, ``None`` after showing help.
    """
    # Show help when requested.
    if len(sys.argv) > 1 and sys.argv[1] == "help":
        ogHelp(sys.argv[0], sys.argv[0], sys.argv[0] + " => 123456")
        return

    try:
        raw = subprocess.check_output(["dmidecode", "-s", "system-serial-number"]).decode(errors="replace")
    except (subprocess.CalledProcessError, OSError):
        # dmidecode missing or failing is treated as "no serial number".
        raw = ""

    # Drop whole placeholder lines: deleting only the matched phrase would
    # leave fragments such as "ByO.E.M." behind.
    placeholder = re.compile(
        r"^[ 0]+$|not specified|to be filled|invalid entry|default string",
        re.IGNORECASE,
    )
    kept = [line for line in raw.splitlines() if line.strip() and not placeholder.search(line)]

    serialno = "\n".join(kept).strip().replace(" ", "")[:25]
    if serialno:
        print(serialno)
    return 0
def ogIsEfiActive():
    """Return True when the directory ``/sys/firmware/efi`` exists."""
    efi_dir = "/sys/firmware/efi"
    return os.path.isdir(efi_dir)
import subprocess
import os
import sys
def _ogFormatLshw(lshw_data):
    """Render a decoded ``lshw -json`` document as ``key=value`` lines."""
    # NOTE(review): lshw may emit a JSON array instead of a single object
    # (seen with some releases/options) -- accept both shapes.
    if isinstance(lshw_data, list):
        lshw_data = lshw_data[0] if lshw_data else {}
    if not isinstance(lshw_data, dict):
        return ""

    lines = []
    for key in ("product", "vendor"):
        if key in lshw_data:
            lines.append(f"{key}={lshw_data[key]}")
    configuration = lshw_data.get("configuration")
    if isinstance(configuration, dict) and "memory" in configuration:
        lines.append(f"memory={configuration['memory']}")

    # NOTE(review): only the direct children of the root node are inspected;
    # confirm whether nested devices should be walked as well.
    for item in lshw_data.get("children", []):
        if not isinstance(item, dict):
            continue
        item_class = item.get("class")
        if item_class == "processor":
            lines.append(f"cpu={item.get('product', 'Unknown')}")
        elif item_class == "memory" and "size" in item:
            lines.append(f"total_memory={item['size']}")
        elif item_class == "disk":
            lines.append(f"disk={item.get('product', 'Unknown')}")
    return "\n".join(lines)


def parse_lshw_output():
    """Run ``lshw -json`` and summarise it as newline-separated ``key=value`` text.

    Returns:
        The summary string, or the fixed error text when lshw cannot be run
        or its output is not valid JSON (the cause is printed first).
    """
    try:
        # JSON output is used because it is easy to process.
        lshw_output = subprocess.check_output(["lshw", "-json"], text=True)
        lshw_data = json.loads(lshw_output)
    except (subprocess.CalledProcessError, OSError, ValueError) as e:
        # OSError: lshw is not installed; ValueError: undecodable JSON.
        print(f"Error al ejecutar lshw: {e}")
        return "Error al obtener información de hardware"
    return _ogFormatLshw(lshw_data)
def ogListHardwareInfo():
    """Return a hardware summary as newline-separated ``key=value`` text.

    The summary holds the chassis type (``cha=``), the firmware type
    (``boo=UEFI`` or ``boo=BIOS``) and the output of ``parse_lshw_output``.
    The firmware line is also printed, as the original code did.
    """
    # Ask dmidecode directly instead of going through a shell pipeline.
    try:
        chassis = subprocess.check_output(["dmidecode", "-s", "chassis-type"]).decode(errors="replace").strip()
    except (subprocess.CalledProcessError, OSError):
        chassis = ""
    # A chassis reported as "Other", or no answer at all, counts as unknown.
    output = f"cha={chassis}" if chassis and "Other" not in chassis else "cha=Unknown"

    # Detect BIOS or UEFI.
    firmware = "boo=UEFI" if ogIsEfiActive() else "boo=BIOS"
    # NOTE(review): this print duplicates the firmware line that is also part
    # of the return value; kept because callers may rely on it -- confirm.
    print(firmware)

    # Run and parse lshw.
    lshw_output = parse_lshw_output()

    return f"{output}\n{firmware}\n{lshw_output}"
def ogListSoftware(disk, partition):
    """Print the operating system and the installed software of a filesystem.

    Only the RPM query currently produces application entries; the other
    package formats and operating systems are placeholders.

    Args:
        disk: Disk number, passed on to ``ogMount``.
        partition: Partition number, passed on to ``ogMount``.

    Returns:
        ``None`` in every case.
    """
    # Both parameters are required.
    # NOTE(review): error codes are passed as strings here, unlike the bare
    # OG_ERR_* names used elsewhere in this file -- confirm against ogRaiseError.
    if disk is None or partition is None:
        ogRaiseError("OG_ERR_FORMAT")
        return

    # Mount the filesystem and find out the operating-system type.
    mnt_dir = ogMount(disk, partition)
    if not mnt_dir:
        return
    os_type = ogGetOsType(disk, partition)

    # The context managers delete both temporary files on exit, so the
    # listing has to be read back inside the ``with`` body.
    with tempfile.NamedTemporaryFile() as apps_file, tempfile.NamedTemporaryFile() as tmp_file:
        if os_type == "Linux":
            # dpkg packages.
            pkg_dir = os.path.join(mnt_dir, "var/lib/dpkg")
            if os.path.exists(os.path.join(pkg_dir, "status")):
                # TODO: extract package names and versions from the status file.
                pass
            # RPM packages.
            pkg_dir = os.path.join(mnt_dir, "var/lib/rpm")
            if os.path.exists(pkg_dir):
                if shutil.which("rpm"):
                    subprocess.run(["rpm", "--dbpath", pkg_dir, "-qa", "--qf", "%{NAME} %{VERSION}\n"], stdout=apps_file)
                else:
                    # TODO: compatibility mode for systems without the rpm command.
                    pass
            # TODO: other package formats (pacman, snap, flatpak, etc.).
        elif os_type == "Windows":
            hive_path = ogGetHivePath(mnt_dir, "software")
            if hive_path:
                # Raw string: "\U" in a normal literal is an invalid escape.
                subprocess.run(["hivexregedit", "--unsafe-printable-strings", "--export", hive_path, r'\Microsoft\Windows\CurrentVersion\Uninstall'], stdout=tmp_file)
                # TODO: turn the exported keys in tmp_file into application entries.
        elif os_type == "MacOS":
            # TODO: applications installed on MacOS.
            pass
        elif os_type == "BSD":
            # TODO: applications installed on FreeBSD.
            pass
        else:
            ogRaiseError("OG_ERR_NOTOS", disk, partition)
            return

        # Show the operating system followed by the sorted, de-duplicated list.
        ogGetOsVersion(disk, partition)
        apps_file.seek(0)
        entries = {raw.decode(errors="replace").strip() for raw in apps_file}
        for entry in sorted(entries):
            print(entry)
def _ogReadKeyValue(path, key_fragment):
    """Return the value of the first ``KEY=value`` line whose key contains *key_fragment*.

    Surrounding whitespace and double quotes are removed from the value.  An
    empty string is returned when the file cannot be read or no key matches.
    """
    try:
        with open(path, "r", errors="replace") as handle:
            for line in handle:
                key, sep, value = line.partition("=")
                if sep and key_fragment in key:
                    return value.strip().strip('"')
    except OSError:
        return ""
    return ""


def _ogReadFirstLine(path):
    """Return the stripped first line of *path*, or ``""`` if it cannot be read."""
    try:
        with open(path, "r", errors="replace") as handle:
            return handle.readline().strip()
    except OSError:
        return ""


def _ogFileDescription(path):
    """Return the output of ``file -b`` for *path*, or ``""`` if it cannot be run."""
    try:
        return subprocess.check_output(["file", "-b", path]).decode(errors="replace").strip()
    except (subprocess.CalledProcessError, OSError):
        return ""


def _ogDetectLinux(mntdir):
    """Detect GNU/Linux from the release files; return ``(version, is64bit)``."""
    etc = os.path.join(mntdir, "etc")
    version = _ogReadKeyValue(os.path.join(etc, "os-release"), "PRETTY_NAME")

    # If that fails, look into other system files.
    if not version:
        # The key is matched by substring so that DISTRIB_DESCRIPTION is found.
        version = _ogReadKeyValue(os.path.join(etc, "lsb-release"), "DESCRIPTION")
        for distrib in ("redhat", "SuSE", "mandrake", "gentoo"):
            release_file = os.path.join(etc, f"{distrib}-release")
            if os.path.exists(release_file):
                version = _ogReadFirstLine(release_file)
                break
        if os.path.exists(os.path.join(etc, "arch-release")):
            version = "Arch Linux"
        slackware_file = os.path.join(etc, "slackware-version")
        if os.path.exists(slackware_file):
            try:
                with open(slackware_file, "r", errors="replace") as handle:
                    version = handle.read().strip()
            except OSError:
                pass  # unreadable file: keep whatever was found before

    # As a last resort, run "lsb_release" inside the filesystem.
    if not version:
        try:
            output = subprocess.check_output(
                ["chroot", mntdir, "lsb_release", "-d"], stderr=subprocess.DEVNULL
            ).decode(errors="replace")
        except (subprocess.CalledProcessError, OSError):
            output = ""
        version = output.partition(":")[2].strip()

    is64bit = "64 bits" if version and os.path.exists(os.path.join(mntdir, "lib64")) else ""
    return version, is64bit


def _ogDetectAndroid(mntdir):
    """Detect Android from ``android*/system/build.prop``; return ``(version, is64bit)``."""
    version = ""
    candidates = glob.glob(os.path.join(mntdir, "android*", "system", "build.prop"))
    if candidates:
        brand = release = ""
        try:
            with open(candidates[0], "r", errors="replace") as handle:
                for line in handle:
                    key, sep, value = line.partition("=")
                    if not sep:
                        continue
                    key = key.strip()
                    # Keys are matched by suffix so that prefixed names such
                    # as "ro.product.brand" are recognised.
                    if key.endswith("product.brand"):
                        brand = value.strip()
                    elif key.endswith("build.version.release"):
                        release = value.strip()
        except OSError:
            pass  # unreadable properties file: report Android without details
        version = f"Android {brand} {release}"
    # As in the original code, lib64 is checked even when no version was found.
    is64bit = "64 bits" if os.path.exists(os.path.join(mntdir, "lib64")) else ""
    return version, is64bit


def _ogDetectHurd(mntdir):
    """Detect GNU/Hurd from its init file (based on os-prober)."""
    if os.path.exists(os.path.join(mntdir, "hurd", "init")):
        return "GNU/Hurd", ""
    return "", ""


def _ogDetectWindows(mntdir):
    """Detect Windows from the SOFTWARE registry hive; return ``(version, is64bit)``."""
    hive = ogGetHivePath(mntdir, "SOFTWARE")
    if not hive:
        return "", ""
    # hivexsh takes its commands from standard input, not from argv.
    script = (
        f"load {hive}\n"
        "cd \\Microsoft\\Windows NT\\CurrentVersion\n"
        "lsval ProductName\n"
        "lsval DisplayVersion\n"
    )
    try:
        # The exit status is deliberately not checked: a missing value must
        # not discard the values already printed.
        result = subprocess.run(
            ["hivexsh"], input=script, stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL, text=True,
        )
        version = " ".join(result.stdout.split())
    except OSError:
        version = ""
    is64bit = ""
    if ogGetRegistryValue(mntdir, "SOFTWARE", "\\Microsoft\\Windows\\CurrentVersion\\ProgramW6432Dir"):
        is64bit = "64 bits"
    return version, is64bit


def _ogDetectWinLoader(mntdir):
    """Detect a Windows boot loader from its BCD file (based on os-prober)."""
    # NOTE(review): ogGetPath is not defined or imported in the visible part
    # of this file -- confirm where it comes from.
    bcd = ogGetPath(mntdir, "boot", "bcd")
    if not bcd:
        bcd = ogGetPath(mntdir, "EFI", "Microsoft", "boot", "bcd")
    if not bcd:
        return "", ""
    try:
        with open(bcd, "rb") as handle:
            data = handle.read()
    except OSError:
        return "", ""
    for name in ("Windows Recovery", "Windows Boot"):
        # NOTE(review): BCD strings are presumably stored as UTF-16LE (os-prober
        # greps for the interleaved form); the plain ASCII form is still accepted.
        if name.encode("ascii") in data or name.encode("utf-16-le") in data:
            return f"{name} loader", ""
    return "", ""


def _ogDetectMacOS(mntdir):
    """Detect macOS from the kernel and the system version plist."""
    version = is64bit = ""
    kernel = os.path.join(mntdir, "mach_kernel")
    if os.path.exists(kernel):
        description = _ogFileDescription(kernel)
        if description.startswith("Mach-O"):
            version = "macOS"
            if description.startswith("Mach-O 64-bit"):
                is64bit = "64 bits"
    plist = os.path.join(mntdir, "System", "Library", "CoreServices", "SystemVersion.plist")
    if os.path.exists(plist):
        try:
            with open(plist, "r", errors="replace") as handle:
                plist_data = handle.read()
        except OSError:
            plist_data = ""
        product_name = re.search(r"<key>ProductName</key>\s*<string>(.*?)</string>", plist_data)
        product_version = re.search(r"<key>ProductVersion</key>\s*<string>(.*?)</string>", plist_data)
        if product_name and product_version:
            version = f"{product_name.group(1)} {product_version.group(1)}"
    if version and os.path.exists(os.path.join(mntdir, "com.apple.recovery.boot")):
        version = f"{version} recovery"
    return version, is64bit


def _ogDetectBSD(mntdir):
    """Detect FreeBSD from the strings embedded in its kernel."""
    kernel = os.path.join(mntdir, "boot", "kernel", "kernel")
    if not os.path.exists(kernel):
        return "", ""
    try:
        output = subprocess.check_output(["strings", kernel]).decode(errors="replace")
    except (subprocess.CalledProcessError, OSError):
        output = ""
    version = ""
    for line in output.splitlines():
        if re.search(r"@.*RELEASE", line):
            # Drop the "@(#)" marker and keep the first two words.
            version = " ".join(line.replace("@(#)", "", 1).split()[:2])
            break
    is64bit = "64 bits" if "x86-64" in _ogFileDescription(kernel) else ""
    return version, is64bit


def _ogDetectSolaris(mntdir):
    """Detect Solaris from the first line of ``etc/release``."""
    release_file = os.path.join(mntdir, "etc", "release")
    if os.path.exists(release_file):
        return _ogReadFirstLine(release_file), ""
    return "", ""


def _ogDetectGrubLoader(mntdir):
    """Detect a GRUB or GRUB2 boot loader from its configuration files."""
    version = ""
    for menu in (os.path.join(mntdir, "grub", "menu.lst"), os.path.join(mntdir, "boot", "grub", "menu.lst")):
        if os.path.exists(menu):
            version = "GRUB Loader"
            break
    # glob has no shell-style brace expansion, so every combination of
    # {,boot/}{grub{,2},EFI/*}/grub.cfg is spelled out here.
    patterns = [
        os.path.join(mntdir, prefix, subdir, "grub.cfg")
        for prefix in ("", "boot")
        for subdir in ("grub", "grub2", os.path.join("EFI", "*"))
    ]
    if any(glob.glob(pattern) for pattern in patterns):
        version = "GRUB2 Loader"
    return version, ""


def ogGetOsVersion(ndisk, nfilesys):
    """Print the type and version of the operating system on a filesystem.

    The filesystem is mounted through ``ogMount`` and probed, in order, for
    Linux, Android, Hurd, Windows, a Windows loader, macOS, BSD, Solaris and
    a GRUB loader.  On success one line ``TYPE:VERSION 64BIT`` is printed,
    where the last field is ``64 bits`` or empty.

    Args:
        ndisk: Disk number, passed to ``ogMount``.
        nfilesys: Filesystem number, passed to ``ogMount``.

    Returns:
        ``0`` once detection has run (whether or not an OS was found), or
        ``None`` after showing help, on missing parameters or if the mount fails.
    """
    # Show help when requested, as an argument or on the command line.
    if ndisk == "help" or (len(sys.argv) > 1 and sys.argv[1] == "help"):
        ogHelp(sys.argv[0], sys.argv[0] + " int_ndisk int_nfilesys", sys.argv[0] + " 1 2 => Linux:Ubuntu precise (12.04 LTS) 64 bits")
        return

    # Both parameters are required.
    if ndisk is None or nfilesys is None:
        # NOTE(review): OG_ERR_FORMAT is not defined or imported in the visible
        # part of this file -- confirm where it comes from.
        ogRaiseError(OG_ERR_FORMAT)
        return

    # Mount the partition if it was not mounted already.
    mntdir = ogMount(ndisk, nfilesys)
    if not mntdir:
        return

    detectors = (
        ("Linux", _ogDetectLinux),
        ("Android", _ogDetectAndroid),
        ("Hurd", _ogDetectHurd),
        ("Windows", _ogDetectWindows),
        ("WinLoader", _ogDetectWinLoader),
        ("MacOS", _ogDetectMacOS),
        ("BSD", _ogDetectBSD),
        ("Solaris", _ogDetectSolaris),
        ("GrubLoader", _ogDetectGrubLoader),
    )
    os_type = version = is64bit = ""
    for os_type, detect in detectors:
        version, bits = detect(mntdir)
        # A 64-bit mark set by an earlier probe is kept, as in the original flow.
        is64bit = bits or is64bit
        if version:
            break

    # Show the result and finish without errors.
    if version:
        print(f"{os_type}:{version} {is64bit}")
    return 0