mirror of https://git.48k.eu/ogclient
utils: add hw_inventory.py
hw_inventory.py defines classes and helpers functions enabling fetching of hardware inventory from a running client. Uses a subprocess call to the command 'lshw -json' to obtain hardware information. Relevant public functions: > get_hardware_inventory() Main function encapsulating subprocess and output processing logic. Returns a HardwareInventory object. > legacy_list_hardware_inventory(inventory) Legacy string representation of parameter HardwareInventory objectmore_events
parent
4c0904d8da
commit
49a86bddd9
|
@ -0,0 +1,309 @@
|
|||
#
|
||||
# Copyright (C) 2023 Soleta Networks <info@soleta.eu>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it under
|
||||
# the terms of the GNU Affero General Public License as published by the
|
||||
# Free Software Foundation; either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
import json
|
||||
import os.path
|
||||
import shlex
|
||||
import subprocess
|
||||
|
||||
from collections import namedtuple
|
||||
from enum import Enum, auto
|
||||
|
||||
|
||||
SYSFS_EFI_PATH = '/sys/firmware/efi'
|
||||
|
||||
|
||||
class HardwareType(Enum):
|
||||
MULTIMEDIA = 1
|
||||
BOOTMODE = 2
|
||||
FIRMWARE = 3
|
||||
GRAPHICS = 4
|
||||
STORAGE = 5
|
||||
NETWORK = 6
|
||||
CHASSIS = 7
|
||||
MEMORY = 8
|
||||
MODEL = 9
|
||||
DISK = 10
|
||||
CPU = 11
|
||||
USB = 12
|
||||
CD = 13
|
||||
|
||||
|
||||
class HardwareElement():
|
||||
"""
|
||||
Simple container of a hardware type and its name
|
||||
"""
|
||||
def __init__(self, hwtype, name):
|
||||
self.type = hwtype
|
||||
self.name = name
|
||||
|
||||
def __str__(self):
|
||||
return f'Hardware element {self.type}: {self.name}'
|
||||
|
||||
|
||||
class HardwareInventory():
|
||||
"""
|
||||
Collection of hardware elements
|
||||
"""
|
||||
def __init__(self):
|
||||
self.elements = list()
|
||||
|
||||
def add_element(self, elem):
|
||||
if elem.type not in HardwareType:
|
||||
raise ValueError('Unsupported hardware type')
|
||||
if not elem.name:
|
||||
raise ValueError('Empty hardware element name')
|
||||
self.elements.append(elem)
|
||||
|
||||
|
||||
def _bytes_to_human(size):
|
||||
suffixes = ['B', 'MiB', 'GiB', 'TiB']
|
||||
if type(size) is not int:
|
||||
raise TypeError('Invalid type')
|
||||
for exponent, suffix in enumerate(suffixes, start=1):
|
||||
conv = size / (1024**exponent)
|
||||
if conv < 1024:
|
||||
return f'{conv:.2f} {suffix}'
|
||||
|
||||
# Utility methods for lshw json output processing
|
||||
|
||||
def _fill_computer_model(inventory, root):
|
||||
model = ' '.join([root['vendor'], root['product'], root['version']])
|
||||
elem = HardwareElement(HardwareType.MODEL, model)
|
||||
inventory.add_element(elem)
|
||||
|
||||
|
||||
def _fill_chassis_type(inventory, root):
|
||||
chassis = root['configuration']['chassis']
|
||||
elem = HardwareElement(HardwareType.CHASSIS, chassis)
|
||||
inventory.add_element(elem)
|
||||
|
||||
|
||||
def _fill_bootmode(inventory):
|
||||
bootmode = 'UEFI' if os.path.exists(SYSFS_EFI_PATH) else 'BIOS'
|
||||
elem = HardwareElement(HardwareType.BOOTMODE, bootmode)
|
||||
inventory.add_element(elem)
|
||||
|
||||
|
||||
def _process_core_firmware(inventory, obj):
|
||||
desc = ' '.join([obj['description'], obj['vendor'], obj['version']])
|
||||
firmware_elem = HardwareElement(HardwareType.FIRMWARE, desc)
|
||||
inventory.add_element(firmware_elem)
|
||||
|
||||
|
||||
def _process_core_cpu(inventory, obj):
|
||||
cpu = obj['product']
|
||||
cpu_elem = HardwareElement(HardwareType.CPU, cpu)
|
||||
inventory.add_element(cpu_elem)
|
||||
|
||||
|
||||
def _process_core_mem_bank(inventory, obj):
|
||||
slot = obj.get('slot', 'Unknown slot')
|
||||
size = _bytes_to_human(obj['size']) if 'size' in obj else None
|
||||
if size:
|
||||
mem = ' '.join([obj['vendor'], obj['product'], size, f'({slot})'])
|
||||
mem_elem = HardwareElement(HardwareType.MEMORY, mem)
|
||||
inventory.add_element(mem_elem)
|
||||
|
||||
|
||||
def _process_core_mem(inventory, obj):
|
||||
banks = obj['children']
|
||||
for bank in banks:
|
||||
_process_core_mem_bank(inventory, bank)
|
||||
|
||||
|
||||
def _process_core_pci_usb(inventory, obj):
|
||||
name = ' '.join([obj['vendor'], obj['product']])
|
||||
usb_elem = HardwareElement(HardwareType.USB, name)
|
||||
inventory.add_element(usb_elem)
|
||||
|
||||
|
||||
def _process_core_pci_display(inventory, obj):
|
||||
name = ' '.join([obj['vendor'], obj['product']])
|
||||
display_elem = HardwareElement(HardwareType.GRAPHICS, name)
|
||||
inventory.add_element(display_elem)
|
||||
|
||||
|
||||
def _process_core_pci_network(inventory, obj):
|
||||
link = obj.get('size', 'Unknown link speed')
|
||||
if type(link) == int:
|
||||
if link >= 1000000000:
|
||||
linkh = f'{link/1e9} Gbit/s'
|
||||
else:
|
||||
linkh = f'{link/1e6} Mbit/s'
|
||||
name = ' '.join([obj['vendor'], obj['product'], f'({linkh})'])
|
||||
elem = HardwareElement(HardwareType.NETWORK, name)
|
||||
inventory.add_element(elem)
|
||||
|
||||
|
||||
def _process_core_pci_storage_child(inventory, obj):
|
||||
obj_id = obj['id']
|
||||
if obj_id.startswith('disk') or obj_id.startswith('nvme'):
|
||||
size = bytes_to_human(obj['size']) if 'size' in obj else 'Unknown size'
|
||||
name = ' '.join([obj['description'], obj['product'], size])
|
||||
elem = HardwareElement(HardwareType.DISK, name)
|
||||
inventory.add_element(elem)
|
||||
|
||||
|
||||
def _process_core_pci_storage(inventory, obj):
|
||||
name = ' '.join([obj['vendor'], obj['product']])
|
||||
elem = HardwareElement(HardwareType.STORAGE, name)
|
||||
inventory.add_element(elem)
|
||||
# Disks follow a storage section
|
||||
for storage_child in obj.get('children', []):
|
||||
_process_core_pci_storage_child(inventory, storage_child)
|
||||
|
||||
|
||||
def _process_core_pci_disk(inventory, obj):
|
||||
name = ' '.join([obj['vendor'], obj['product']])
|
||||
elem = HardwareElement(HardwareType.DISK, name)
|
||||
inventory.add_element(elem)
|
||||
|
||||
|
||||
def _process_core_pci_multimedia(inventory, obj):
|
||||
name = ' '.join([obj['vendor'], obj['product']])
|
||||
elem = HardwareElement(HardwareType.MULTIMEDIA, name)
|
||||
inventory.add_element(elem)
|
||||
|
||||
|
||||
def _process_core_pci_child(inventory, obj):
|
||||
obj_id = obj['id']
|
||||
if obj_id.startswith('usb'):
|
||||
_process_core_pci_usb(inventory, obj)
|
||||
elif obj_id.startswith('display'):
|
||||
_process_core_pci_display(inventory, obj)
|
||||
elif obj_id.startswith('network'):
|
||||
_process_core_pci_network(inventory, obj)
|
||||
elif obj_id.startswith('sata') or obj_id.startswith('storage'):
|
||||
_process_core_pci_storage(inventory, obj)
|
||||
elif obj_id.startswith('multimedia'):
|
||||
_process_core_pci_multimedia(inventory, obj)
|
||||
elif obj_id.startswith('pci'): # PCI bridge
|
||||
bridge_children = obj.get('children', [])
|
||||
for bridge_child in bridge_children:
|
||||
_process_core_pci_child(inventory, bridge_child)
|
||||
|
||||
|
||||
def _process_core_pci(inventory, obj):
|
||||
children = obj.get('children', [])
|
||||
for child in children:
|
||||
_process_core_pci_child(inventory, child)
|
||||
|
||||
|
||||
def _process_core_scsi_disk(inventory, obj):
|
||||
vendor = obj.get('vendor', 'Unknown vendor')
|
||||
name = ' '.join([vendor, obj['product']])
|
||||
elem = HardwareElement(HardwareType.DISK, name)
|
||||
inventory.add_element(elem)
|
||||
|
||||
|
||||
def _process_core_scsi_cdrom(inventory, obj):
|
||||
name = ' '.join([obj['vendor'], obj['product']])
|
||||
elem = HardwareElement(HardwareType.CD, name)
|
||||
inventory.add_element(elem)
|
||||
|
||||
|
||||
def _process_core_scsi_child(inventory, obj):
|
||||
obj_id = obj['id']
|
||||
if obj_id.startswith('disk'):
|
||||
_process_core_scsi_disk(inventory, obj)
|
||||
elif obj_id.startswith('cdrom'):
|
||||
_process_core_scsi_cdrom(inventory, obj)
|
||||
|
||||
|
||||
def _process_core_scsi(inventory, obj):
|
||||
children = obj['children']
|
||||
for child in children:
|
||||
_process_core_scsi_child(inventory, child)
|
||||
|
||||
def _process_core(inventory, core):
|
||||
for element in core['children']:
|
||||
element_id = element['id']
|
||||
if element_id.startswith('firmware'):
|
||||
_process_core_firmware(inventory, element)
|
||||
elif element_id.startswith('cpu'):
|
||||
_process_core_cpu(inventory, element)
|
||||
elif element_id.startswith('memory'):
|
||||
_process_core_mem(inventory, element)
|
||||
elif element_id.startswith('pci'):
|
||||
_process_core_pci(inventory, element)
|
||||
elif element_id.startswith('scsi'):
|
||||
_process_core_scsi(inventory, element)
|
||||
|
||||
def legacy_hardware_element(element):
|
||||
"""
|
||||
Legacy string representation of a hardware element.
|
||||
|
||||
For example, a graphics card named "Foo" would be
|
||||
represented as "vga=Foo"
|
||||
"""
|
||||
if type(element) is not HardwareElement:
|
||||
raise TypeError('Invalid type')
|
||||
elif element.type is HardwareType.MULTIMEDIA:
|
||||
nemonic = 'mul'
|
||||
elif element.type is HardwareType.BOOTMODE:
|
||||
nemonic = 'boo'
|
||||
elif element.type is HardwareType.FIRMWARE:
|
||||
nemonic = 'bio'
|
||||
elif element.type is HardwareType.GRAPHICS:
|
||||
nemonic = 'vga'
|
||||
elif element.type is HardwareType.CHASSIS:
|
||||
nemonic = 'cha'
|
||||
elif element.type is HardwareType.STORAGE:
|
||||
nemonic = 'sto'
|
||||
elif element.type is HardwareType.NETWORK:
|
||||
nemonic = 'net'
|
||||
elif element.type is HardwareType.MEMORY:
|
||||
nemonic = 'mem'
|
||||
elif element.type is HardwareType.MODEL:
|
||||
nemonic = 'mod'
|
||||
elif element.type is HardwareType.DISK:
|
||||
nemonic = 'dis'
|
||||
elif element.type is HardwareType.CPU:
|
||||
nemonic = 'cpu'
|
||||
elif element.type is HardwareType.USB:
|
||||
nemonic = 'usb'
|
||||
elif element.type is HardwareType.CD:
|
||||
nemonic = 'cdr'
|
||||
return f'{nemonic}={element.name}'
|
||||
|
||||
def legacy_list_hardware_inventory(inventory):
|
||||
"""
|
||||
Return a hardware inventory as a legacy string. Map each hardware
|
||||
component to its legacy string representation and concatenate using a new
|
||||
line.
|
||||
|
||||
This is the same output as legacy script ogListHardware.
|
||||
"""
|
||||
return '\n'.join([legacy_hardware_element(elem) for elem in inventory.elements])
|
||||
|
||||
|
||||
def get_hardware_inventory():
|
||||
proc = subprocess.run(['lshw', '-json'], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
|
||||
j = json.loads(proc.stdout)
|
||||
|
||||
if type(j) is list:
|
||||
root = j[0]
|
||||
if type(root) is not dict:
|
||||
raise ValueError('Unvalid lshw json output')
|
||||
|
||||
inventory = HardwareInventory()
|
||||
_fill_computer_model(inventory, root)
|
||||
_fill_chassis_type(inventory, root)
|
||||
_fill_bootmode(inventory)
|
||||
# Process 'children' node
|
||||
# Usually there are two type of children from lshw json root:
|
||||
# 'core' and 'power'.
|
||||
# We are only interested in 'core'.
|
||||
children = root['children']
|
||||
for child in children:
|
||||
child_id = child['id']
|
||||
if child_id == 'core':
|
||||
_process_core(inventory, child)
|
||||
|
||||
return inventory
|
Loading…
Reference in New Issue