477 lines
18 KiB
Python
477 lines
18 KiB
Python
#/**
|
|
#@file Inventory.lib
|
|
#@brief Librería o clase Inventory
|
|
#@class Inventory
|
|
#@brief Funciones para recogida de datos de inventario de hardware y software de los clientes.
|
|
#@version 1.1.0
|
|
#@warning License: GNU GPLv3+
|
|
#*/
|
|
|
|
import platform
|
|
import sys
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
import re
|
|
import json
|
|
import shutil
|
|
import glob
|
|
import plistlib
|
|
|
|
import ogGlobals
|
|
import SystemLib
|
|
import FileSystemLib
|
|
import RegistryLib
|
|
import FileLib
|
|
|
|
|
|
#/**
|
|
# ogGetArch
|
|
#@brief Devuelve el tipo de arquitectura del cliente.
|
|
#@return str_arch - Arquitectura (i386 para 32 bits, x86_64 para 64 bits).
|
|
#*/
|
|
def ogGetArch():
|
|
if len(sys.argv) > 1 and sys.argv[1] == "help":
|
|
SystemLib.ogHelp(sys.argv[0], sys.argv[0], sys.argv[0] + " => x86_64")
|
|
return
|
|
|
|
if platform.machine().endswith("64"):
|
|
print("x86_64")
|
|
else:
|
|
print("i386")
|
|
|
|
|
|
#/**
|
|
# ogGetOsType int_ndisk int_npartition
|
|
#@brief Devuelve el tipo del sistema operativo instalado.
|
|
#@param int_ndisk nº de orden del disco
|
|
#@param int_npartition nº de orden de la partición
|
|
#@return OSType - Tipo de sistema operativo.
|
|
#@see ogGetOsVersion
|
|
#*/ ##
|
|
def ogGetOsType(disk, partition):
|
|
try:
|
|
os_version = ogGetOsVersion(disk, partition)
|
|
if os_version:
|
|
return os_version.split(":", 1)[0]
|
|
else:
|
|
return "Unknown"
|
|
except Exception as e:
|
|
print(f"Error en ogGetOsType: {e}")
|
|
return "Unknown"
|
|
|
|
|
|
#/**
|
|
# ogGetOsUuid int_ndisk int_nfilesys
|
|
#@brief Devuelve el UUID del sistema operativo instalado en un sistema de archivos.
|
|
#@param int_ndisk nº de orden del disco
|
|
#@param int_nfilesys nº de orden de la partición
|
|
#@return str_uuid - UUID del sistema operativo.
|
|
#@exception OG_ERR_FORMAT Formato incorrecto.
|
|
#@exception OG_ERR_NOTFOUND Disco o partición no corresponden con un dispositiv
|
|
#*/ ##
|
|
def ogGetOsUuid():
|
|
# Si se solicita, mostrar ayuda.
|
|
if len(sys.argv) > 1 and sys.argv[1] == "help":
|
|
SystemLib.ogHelp(sys.argv[0], sys.argv[0] + " int_ndisk int_nfilesys", sys.argv[0] + " 1 2 => 540e47c6-8e78-4178-aa46-042e4803fb16")
|
|
return
|
|
|
|
# Error si no se reciben 2 parametros.
|
|
if len(sys.argv) != 3:
|
|
SystemLib.ogRaiseError(
|
|
"session",
|
|
ogGlobals.OG_ERR_FORMAT,
|
|
f"Error: {sys.argv[0]} need 2 arguments.",
|
|
)
|
|
return
|
|
|
|
# Montar la particion, si no lo estaba previamente.
|
|
MNTDIR = FileSystemLib.ogMount(sys.argv[1], sys.argv[2])
|
|
if not MNTDIR:
|
|
return
|
|
|
|
# Obtener UUID según el tipo de sistema operativo.
|
|
os_type = ogGetOsType(sys.argv[1], sys.argv[2])
|
|
if os_type == "Linux":
|
|
# Leer el UUID del sistema de ficheros raíz o el fichero de identificador.
|
|
uuid = subprocess.check_output(["findmnt", "-no", "UUID", MNTDIR], stderr=subprocess.DEVNULL).decode().strip() or open(os.path.join(MNTDIR, "etc", "machine-id")).read().strip()
|
|
print(uuid)
|
|
elif os_type == "Windows":
|
|
# Leer identificador en clave de registro.
|
|
uuid = RegistryLib.ogGetRegistryValue(MNTDIR, "SOFTWARE", "\\Microsoft\\Cryptography\\MachineGuid")
|
|
print(uuid)
|
|
|
|
|
|
#/**
|
|
# ogGetSerialNumber
|
|
#@brief Obtiene el nº de serie del cliente.
|
|
#*/ ##
|
|
def ogGetSerialNumber():
|
|
# Si se solicita, mostrar ayuda.
|
|
if len(sys.argv) > 1 and sys.argv[1] == "help":
|
|
SystemLib.ogHelp(sys.argv[0], sys.argv[0], sys.argv[0] + " => 123456")
|
|
return
|
|
|
|
# Obtener nº de serie (ignorar los no especificados).
|
|
SERIALNO = subprocess.check_output(["dmidecode", "-s", "system-serial-number"]).decode().strip()
|
|
SERIALNO = re.sub(r"(not specified|to be filled|invalid entry|default string)", "", SERIALNO, flags=re.IGNORECASE)
|
|
SERIALNO = SERIALNO.replace(" ", "")
|
|
SERIALNO = SERIALNO[:25] if len(SERIALNO) > 25 else SERIALNO
|
|
if SERIALNO:
|
|
print(SERIALNO)
|
|
|
|
return 0
|
|
|
|
|
|
#/**
|
|
# ogIsEfiActive
|
|
#@brief Comprueba si el sistema tiene activo el arranque EFI.
|
|
#*/ ##
|
|
def ogIsEfiActive():
|
|
return os.path.isdir("/sys/firmware/efi")
|
|
|
|
def parse_lshw_output():
|
|
try:
|
|
# Ejecutar lshw en formato JSON para un fácil procesamiento
|
|
lshw_output = subprocess.check_output(["lshw", "-json"], text=True)
|
|
lshw_data = json.loads(lshw_output) # Convertir la salida JSON a un diccionario
|
|
|
|
# Extraer información relevante en el formato clave=valor
|
|
parsed_output = []
|
|
|
|
# Ejemplo de datos clave que podríamos extraer
|
|
if "product" in lshw_data:
|
|
parsed_output.append(f"product={lshw_data['product']}")
|
|
|
|
if "vendor" in lshw_data:
|
|
parsed_output.append(f"vendor={lshw_data['vendor']}")
|
|
|
|
if "configuration" in lshw_data and "memory" in lshw_data["configuration"]:
|
|
parsed_output.append(f"memory={lshw_data['configuration']['memory']}")
|
|
|
|
# Recorrer los dispositivos para obtener información de CPU, almacenamiento, etc.
|
|
for item in lshw_data.get("children", []):
|
|
if item["class"] == "processor":
|
|
parsed_output.append(f"cpu={item.get('product', 'Unknown')}")
|
|
elif item["class"] == "memory" and "size" in item:
|
|
parsed_output.append(f"total_memory={item['size']}")
|
|
elif item["class"] == "disk":
|
|
parsed_output.append(f"disk={item.get('product', 'Unknown')}")
|
|
|
|
# Devolver los datos combinados
|
|
return "\n".join(parsed_output)
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Error al ejecutar lshw: {e}")
|
|
return "Error al obtener información de hardware"
|
|
|
|
|
|
#/**
|
|
# ogListHardwareInfo
|
|
#@brief Lista el inventario de hardware de la máquina cliente.
|
|
#@return TipoDispositivo:Modelo (por determinar)
|
|
#@warning Se ignoran los parámetros de entrada.
|
|
#@note TipoDispositivo = { bio, boa, bus, cha, cdr, cpu, dis, fir, mem, mod, mul, net, sto, usb, vga }
|
|
#@note Requisitos: dmidecode, lshw, awk
|
|
#*/ ##
|
|
def ogListHardwareInfo():
|
|
# Ejecutar dmidecode y obtener tipo de chasis
|
|
try:
|
|
output = subprocess.check_output('echo "cha=$(dmidecode -s chassis-type)" | grep -v "Other"', shell=True).decode().strip()
|
|
except subprocess.CalledProcessError:
|
|
output = "cha=Unknown"
|
|
|
|
# Detectar BIOS o UEFI
|
|
firmware = "boo=UEFI" if os.path.isdir("/sys/firmware/efi") else "boo=BIOS"
|
|
print(firmware)
|
|
|
|
# Ejecutar y analizar lshw
|
|
lshw_output = parse_lshw_output()
|
|
|
|
# Combina y devuelve los resultados
|
|
return f"{output}\n{firmware}\n{lshw_output}"
|
|
|
|
|
|
#/**
|
|
# ogListSoftware int_ndisk int_npartition
|
|
#@brief Lista el inventario de software instalado en un sistema operativo.
|
|
#@param int_ndisk nº de orden del disco
|
|
#@param int_npartition nº de orden de la partición
|
|
#@return programa versión ...
|
|
#@warning Se ignoran los parámetros de entrada.
|
|
#@note Requisitos: ...
|
|
#@todo Detectar software en Linux
|
|
#*/ ##
|
|
def ogListSoftware(disk, partition):
|
|
if disk is None or partition is None:
|
|
SystemLib.ogRaiseError(ogGlobals.OG_ERR_FORMAT)
|
|
return []
|
|
|
|
mnt_dir = FileSystemLib.ogMount(disk, partition)
|
|
os_type = ogGetOsType(disk, partition)
|
|
|
|
apps_file = tempfile.NamedTemporaryFile(delete=False, mode="w+")
|
|
tmp_file = tempfile.NamedTemporaryFile(delete=False, mode="w+")
|
|
apps = []
|
|
|
|
try:
|
|
if os_type == "Linux":
|
|
pkg_dir = os.path.join(mnt_dir, "var/lib/dpkg")
|
|
status_file = os.path.join(pkg_dir, "status")
|
|
if os.path.exists(status_file):
|
|
with open(status_file, "r") as f:
|
|
pkg, ver = None, None
|
|
for line in f:
|
|
if line.startswith("Package:"):
|
|
pkg = line.split(":", 1)[1].strip()
|
|
elif line.startswith("Version:"):
|
|
ver = line.split(":", 1)[1].strip()
|
|
elif line.startswith("Status:") and "install" not in line:
|
|
pkg, ver = None, None
|
|
if pkg and ver:
|
|
apps.append(f"{pkg} {ver}")
|
|
pkg, ver = None, None
|
|
|
|
pkg_dir = os.path.join(mnt_dir, "var/lib/rpm")
|
|
if os.path.exists(pkg_dir):
|
|
if shutil.which("rpm"):
|
|
result = subprocess.run(
|
|
["rpm", "--dbpath", pkg_dir, "-qa", "--qf", "%{NAME} %{VERSION}\n"],
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
if result.returncode == 0:
|
|
apps.extend(result.stdout.strip().splitlines())
|
|
else:
|
|
SystemLib.ogEcho("session", "error", "The rpm command is not available.")
|
|
|
|
pass
|
|
|
|
else:
|
|
SystemLib.ogRaiseError(ogGlobals.OG_ERR_NOTOS, disk, partition)
|
|
return []
|
|
|
|
finally:
|
|
apps_file.close()
|
|
tmp_file.close()
|
|
os.remove(apps_file.name)
|
|
os.remove(tmp_file.name)
|
|
|
|
os_version = ogGetOsVersion(disk, partition)
|
|
print(f"Operative System: {os_version}")
|
|
|
|
return sorted(set(apps))
|
|
|
|
## https://stackoverflow.com/questions/2522651/find-a-key-inside-a-deeply-nested-dictionary/2522706#2522706
|
|
def _find_key_recursive(plist_dict, key_substr):
|
|
for k in plist_dict.keys():
|
|
if key_substr in k: return plist_dict[k]
|
|
for k, v in plist_dict.items():
|
|
if type(v) is dict: # Only recurse if we hit a dict value
|
|
value = _find_key_recursive(v, key_substr)
|
|
if value:
|
|
return value
|
|
return ''
|
|
|
|
#/**
|
|
# ogGetOsVersion int_ndisk int_nfilesys
|
|
#@brief Devuelve la versión del sistema operativo instalado en un sistema de archivos.
|
|
#@param int_ndisk nº de orden del disco
|
|
#@param int_nfilesys nº de orden de la partición
|
|
#@return OSType:OSVersion - tipo y versión del sistema operativo.
|
|
#@note OSType = { Android, BSD, GrubLoader, Hurd, Linux, MacOS, Solaris, Windows, WinLoader }
|
|
#@note Requisitos: awk, head, chroot
|
|
#@exception OG_ERR_FORMAT Formato incorrecto.
|
|
#@exception OG_ERR_NOTFOUND Disco o partición no corresponden con un dispositiv
|
|
#@exception OG_ERR_PARTITION Fallo al montar el sistema de archivos.
|
|
#*/ ##
|
|
#ogGetOsVersion ("1", "2") => "Linux:Ubuntu precise (12.04 LTS) 64 bits"
|
|
def ogGetOsVersion(disk, part):
|
|
mntdir = FileSystemLib.ogMount (disk, part)
|
|
if not mntdir:
|
|
return None
|
|
|
|
type = version = None
|
|
is64bit = ''
|
|
|
|
# Buscar tipo de sistema operativo.
|
|
# Para GNU/Linux: leer descripción.
|
|
os_release = os.path.join(mntdir, "etc/os-release")
|
|
if os.path.isfile(os_release):
|
|
type = 'Linux'
|
|
with open(os_release, "r") as f:
|
|
for line in f:
|
|
if line.startswith("PRETTY_NAME"):
|
|
version = line.split("=", 1)[1].strip().strip('"')
|
|
break
|
|
|
|
if not version:
|
|
lsb_release = os.path.join(mntdir, "etc/lsb-release")
|
|
if os.path.isfile(lsb_release):
|
|
type = 'Linux'
|
|
with open(lsb_release, "r") as f:
|
|
for line in f:
|
|
if line.startswith("DISTRIB_DESCRIPTION"):
|
|
version = line.split("=", 1)[1].strip().strip('"')
|
|
break
|
|
|
|
if not version:
|
|
for distrib in ["redhat", "SuSE", "mandrake", "gentoo"]:
|
|
distrib_file = os.path.join(mntdir, f"etc/{distrib}-release")
|
|
if os.path.isfile(distrib_file):
|
|
type = 'Linux'
|
|
with open(distrib_file, "r") as f:
|
|
version = f.readline().strip()
|
|
break
|
|
|
|
if not version:
|
|
arch_release_file = os.path.join(mntdir, "etc/arch-release")
|
|
if os.path.isfile(arch_release_file):
|
|
type = 'Linux'
|
|
version = "Arch Linux"
|
|
|
|
if not version:
|
|
slack_release_file = os.path.join(mntdir, "slackware-version")
|
|
if os.path.isfile(slack_release_file):
|
|
type = 'Linux'
|
|
with open (slack_release_file, 'r') as fd:
|
|
c = fd.read()
|
|
version = "Slackware {c}"
|
|
|
|
# Si no se encuentra, intentar ejecutar "lsb_release".
|
|
if not version:
|
|
out = subprocess.run (['chroot', mntdir, 'lsb_release', '-d'], capture_output=True, text=True).stdout
|
|
m = re.search (':\t(.*)', out)
|
|
if m:
|
|
type = 'Linux'
|
|
version = m.group(1)
|
|
# Comprobar Linux de 64 bits.
|
|
if version and os.path.exists(os.path.join(mntdir, "lib64")):
|
|
is64bit = ogGlobals.lang.MSG_64BIT
|
|
# Para Android, leer fichero de propiedades.
|
|
if not version:
|
|
type = 'Android'
|
|
files = glob.glob (os.path.join (mntdir, 'android*/system/build.prop'))
|
|
if files and os.path.isfile (files[0]):
|
|
v = []
|
|
with open (files[0], 'r') as f:
|
|
for line in f:
|
|
if 'product.brand' in line or 'build.version.release' in line:
|
|
v.append (line.split('=')[1].strip())
|
|
version = ' '.join (v)
|
|
if os.path.exists(os.path.join(mntdir, "lib64")):
|
|
is64bit = ogGlobals.lang.MSG_64BIT
|
|
# Para GNU/Hurd, comprobar fichero de inicio (basado en os-prober).
|
|
if not version:
|
|
type = 'Hurd'
|
|
if os.path.exists(os.path.join(mntdir, "hurd/init")):
|
|
version = 'GNU/Hurd'
|
|
# Para Windows: leer la version del registro.
|
|
if not version:
|
|
type = 'Windows'
|
|
build = 0
|
|
file = RegistryLib.ogGetHivePath (mntdir, 'SOFTWARE')
|
|
if file:
|
|
# Nuevo método más rápido para acceder al registro de Windows..
|
|
i = '\n'.join ([
|
|
f'load {file}',
|
|
r'cd \Microsoft\Windows NT\CurrentVersion',
|
|
'lsval ProductName',
|
|
'lsval DisplayVersion',
|
|
])
|
|
version = subprocess.run (['hivexsh'], input=i, capture_output=True, text=True).stdout
|
|
# Recoge el valor del número de compilación para ver si es Windows 10/11
|
|
i = '\n'.join ([
|
|
f'load {file}',
|
|
r'cd \Microsoft\Windows NT\CurrentVersion',
|
|
'lsval CurrentBuildNumber',
|
|
])
|
|
build = subprocess.run (['hivexsh'], input=i, capture_output=True, text=True).stdout
|
|
|
|
if subprocess.run (['reglookup', '-H', '-p', 'Microsoft/Windows/CurrentVersion/ProgramW6432Dir', file], capture_output=True, text=True).stdout:
|
|
is64bit = ogGlobals.lang.MSG_64BIT
|
|
|
|
if not version:
|
|
# Compatibilidad con métrodo antiguo y más lento de acceder al registro.
|
|
version = RegistryLib.ogGetRegistryValue (mntdir, 'software', r'\Microsoft\Windows NT\CurrentVersion\ProductName')
|
|
if RegistryLib.ogGetRegistryValue (mntdir, 'software', r'\Microsoft\Windows\CurrentVersion\ProgramW6432Dir'):
|
|
is64bit = ogGlobals.lang.MSG_64BIT
|
|
# Si la compilación es mayor o igual a 22000 es Windows 11
|
|
if int (build) >= 22000:
|
|
version = version.replace ('10', '11')
|
|
# Para cargador Windows: buscar versión en fichero BCD (basado en os-prober).
|
|
if not version:
|
|
type = 'WinLoader'
|
|
file = FileLib.ogGetPath (file=f'{mntdir}/boot/bcd')
|
|
if file:
|
|
for distrib in 'Windows Recovery', 'Windows Boot':
|
|
with open (file, 'rb') as fd:
|
|
contents = fd.read()
|
|
distrib_utf16_regex = re.sub (r'(.)', '\\1.', distrib)
|
|
distrib_utf16_regex = bytes (distrib_utf16_regex, 'ascii')
|
|
if re.search (distrib_utf16, contents):
|
|
version = f'{distrib} loader'
|
|
# Para macOS: detectar kernel y completar con fichero plist de información del sistema.
|
|
if not version:
|
|
type = 'MacOS'
|
|
# Kernel de Mac OS (no debe ser fichero de texto).
|
|
file = f'{mntdir}/mach_kernel'
|
|
out = subprocess.run (['file', '--brief', file], capture_output=True, text=True).stdout
|
|
if not 'text' in out:
|
|
# Obtener tipo de kernel.
|
|
if 'Mach-O' in out: version = 'macOS'
|
|
if 'Mach-O 64-bit' in out: is64bit = ogGlobals.lang.MSG_64BIT
|
|
# Datos de configuración de versión de Mac OS.
|
|
file = f'{mntdir}/System/Library/CoreServices/SystemVersion.plist'
|
|
if os.path.exists (file):
|
|
with open (file, 'rb') as fd:
|
|
contents = fd.read()
|
|
plist_dict = plistlib.loads (contents)
|
|
n = _find_key_recursive (plist_dict, 'ProductName')
|
|
v = _find_key_recursive (plist_dict, 'ProductVersion')
|
|
version = f'{n} {v}'.strip()
|
|
# Datos de recuperación de macOS.
|
|
if version and os.path.exists (f'{mntdir}/com.apple.recovery.boot'):
|
|
version += ' recovery'
|
|
|
|
# Para FreeBSD: obtener datos del Kernel.
|
|
### TODO Revisar solución.
|
|
if not version:
|
|
type = 'BSD'
|
|
file = f'{mntdir}/boot/kernel/kernel'
|
|
if os.path.exists (file):
|
|
lines = subprocess.run (['strings', file], capture_output=True, text=True).stdout.splitlines()
|
|
release_search = list (filter (lambda x: re.search ('@.*RELEASE', x), lines))
|
|
if release_search:
|
|
first, second, *rest = release_search[0].split()
|
|
first = first.replace ('@(#)', '')
|
|
version = f'{first} {second}'
|
|
out = subprocess.run (['file', '--brief', file], capture_output=True, text=True).stdout
|
|
if 'x86-64' in out: is64bit = ogGlobals.lang.MSG_64BIT
|
|
# Para Solaris: leer el fichero de versión.
|
|
### TODO Revisar solución.
|
|
if not version:
|
|
type = 'Solaris'
|
|
file = f'{mntdir}/etc/release'
|
|
if os.path.exists (file):
|
|
with open (file, 'r') as fd:
|
|
version = fd.readline().strip
|
|
# Para cargador GRUB, comprobar fichero de configuración.
|
|
if not version:
|
|
type = 'GrubLoader'
|
|
for file in f'{mntdir}/grub/menu.lst', f'{mntdir}/boot/grub/menu.lst':
|
|
if os.path.exists (file):
|
|
VERSION = 'GRUB Loader'
|
|
for entry in f'{mntdir}/grub/grub.cfg', f'{mntdir}/grub2/grub.cfg', f'{mntdir}/EFI/*/grub.cfg', f'{mntdir}/boot/grub/grub.cfg', f'{mntdir}/boot/grub2/grub.cfg', f'{mntdir}/boot/EFI/*/grub.cfg':
|
|
for file in glob.glob (entry):
|
|
if os.path.exists (file):
|
|
version = 'GRUB2 Loader'
|
|
|
|
|
|
|
|
|
|
|
|
# Mostrar resultado y salir sin errores.
|
|
if version: return f"{type}: {version} {is64bit}"
|
|
return None
|