ogclient/src/utils/hw_inventory.py

318 lines
10 KiB
Python

#
# Copyright (C) 2020-2024 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
import json
import os.path
import shlex
import subprocess
from src.log import OgError
from collections import namedtuple
from enum import Enum, auto
# sysfs path whose existence is used by _fill_bootmode() to tell a UEFI boot
# from a BIOS boot.
SYSFS_EFI_PATH = '/sys/firmware/efi'
class HardwareType(Enum):
    """
    Categories a hardware element can belong to.

    The numeric values are fixed explicitly; member order is the
    iteration order of the enumeration.
    """
    MULTIMEDIA = 1
    BOOTMODE = 2
    FIRMWARE = 3
    GRAPHICS = 4
    STORAGE = 5
    NETWORK = 6
    CHASSIS = 7
    MEMORY = 8
    MODEL = 9
    DISK = 10
    CPU = 11
    USB = 12
    CD = 13
class HardwareElement():
    """
    Pairs a hardware type with the name describing one piece of hardware.
    """

    def __init__(self, hwtype, name):
        # Stored as given; validation happens when the element is added
        # to an inventory.
        self.type, self.name = hwtype, name

    def __str__(self):
        return 'Hardware element {}: {}'.format(self.type, self.name)
class HardwareInventory():
    """
    Collection of hardware elements
    """

    def __init__(self):
        # Elements are kept in insertion order.
        self.elements = []

    def add_element(self, elem):
        """
        Validate and append a hardware element to the inventory.

        Raises OgError if the element type is not a HardwareType member or
        if its name is empty.
        """
        # isinstance() instead of `in HardwareType`: before Python 3.12 a
        # containment check with a non-member raises TypeError, and from 3.12
        # on it also accepts raw member values (e.g. the integer 1).
        if not isinstance(elem.type, HardwareType):
            raise OgError(f'Unsupported hardware type, received {elem.type}')
        if not elem.name:
            raise OgError('Empty hardware element name')
        self.elements.append(elem)
def _bytes_to_human(size):
suffixes = ['B', 'MiB', 'GiB', 'TiB']
if type(size) is not int:
raise OgError(f'Invalid type in _bytes_to_human, got: {size} {type(size)}')
for exponent, suffix in enumerate(suffixes, start=1):
conv = size / (1024**exponent)
if conv < 1024:
return f'{conv:.2f} {suffix}'
# Utility methods for lshw json output processing
def _fill_computer_model(inventory, root):
    """
    Add a MODEL element built from the 'vendor', 'product' and 'version'
    keys of the lshw root node. Missing keys are replaced by an
    'Unknown ...' placeholder.
    """
    model = ' '.join([root.get('vendor', 'Unknown vendor'),
                      root.get('product', 'Unknown product'),
                      root.get('version', 'Unknown version')])
    elem = HardwareElement(HardwareType.MODEL, model)
    inventory.add_element(elem)
def _fill_chassis_type(inventory, root):
    """
    Add a CHASSIS element taken from root['configuration']['chassis'].

    When either key is absent the placeholder 'Unknown chassis' is used,
    in line with the 'Unknown ...' defaults of the other helpers, instead
    of aborting the whole inventory with a KeyError.
    """
    chassis = root.get('configuration', {}).get('chassis', 'Unknown chassis')
    elem = HardwareElement(HardwareType.CHASSIS, chassis)
    inventory.add_element(elem)
def _fill_bootmode(inventory):
    """
    Add a BOOTMODE element: 'UEFI' when SYSFS_EFI_PATH exists, 'BIOS'
    otherwise.
    """
    if os.path.exists(SYSFS_EFI_PATH):
        mode = 'UEFI'
    else:
        mode = 'BIOS'
    inventory.add_element(HardwareElement(HardwareType.BOOTMODE, mode))
def _process_core_firmware(inventory, obj):
    """
    Add a FIRMWARE element whose name joins the node's 'description',
    'vendor' and 'version' (each defaulting to an empty string).
    """
    fields = (obj.get(key, '') for key in ('description', 'vendor', 'version'))
    inventory.add_element(HardwareElement(HardwareType.FIRMWARE, ' '.join(fields)))
def _process_core_cpu(inventory, obj):
    """
    Add a CPU element named after the node's 'product' key.
    """
    product = obj.get('product', 'Unknown product')
    inventory.add_element(HardwareElement(HardwareType.CPU, product))
def _process_core_mem_bank(inventory, obj):
    """
    Add a MEMORY element for one memory bank.

    The name is "<vendor> <product> <size> (<slot>)"; a bank without a
    'size' key is reported as 'Empty slot' in place of the size.
    """
    slot = obj.get('slot', 'Unknown slot')
    # A bank with no 'size' is treated as an unpopulated slot.
    capacity = _bytes_to_human(obj['size']) if 'size' in obj else 'Empty slot'
    vendor = obj.get('vendor', 'Unknown vendor')
    product = obj.get('product', 'Unknown product')
    description = ' '.join((vendor, product, capacity, f'({slot})'))
    inventory.add_element(HardwareElement(HardwareType.MEMORY, description))
def _process_core_mem(inventory, obj):
    """
    Process every child of a memory node as a memory bank.
    """
    for memory_bank in obj.get('children', []):
        _process_core_mem_bank(inventory, memory_bank)
def _process_core_pci_usb(inventory, obj):
    """
    Add a USB element named "<vendor> <product>".
    """
    vendor = obj.get('vendor', 'Unknown vendor')
    product = obj.get('product', 'Unknown product')
    inventory.add_element(HardwareElement(HardwareType.USB, ' '.join((vendor, product))))
def _process_core_pci_display(inventory, obj):
    """
    Add a GRAPHICS element named "<vendor> <product>".
    """
    vendor = obj.get('vendor', 'Unknown vendor')
    product = obj.get('product', 'Unknown product')
    inventory.add_element(HardwareElement(HardwareType.GRAPHICS, ' '.join((vendor, product))))
def _process_core_pci_network(inventory, obj):
    """
    Add a NETWORK element named "<vendor> <product>", followed by the link
    speed in parentheses when the node's 'size' is an int.
    """
    parts = [obj.get('vendor', 'Unknown vendor'),
             obj.get('product', 'Unknown product')]
    speed = obj.get('size', 'Unknown link speed')
    # Only an exact int is formatted as a speed; anything else is left out.
    if type(speed) is int:
        if speed >= 1e9:
            speed_text = f'{speed/1e9} Gbit/s'
        else:
            speed_text = f'{speed/1e6} Mbit/s'
        parts.append(f'({speed_text})')
    inventory.add_element(HardwareElement(HardwareType.NETWORK, ' '.join(parts)))
def _process_core_pci_storage_child(inventory, obj):
    """
    Add a DISK element for a storage controller child whose id starts with
    'disk' or 'nvme'; other children are ignored.
    """
    if not obj.get('id', '').startswith(('disk', 'nvme')):
        return
    if 'size' in obj:
        capacity = _bytes_to_human(obj['size'])
    else:
        capacity = 'Unknown size'
    description = obj.get('description', '')
    product = obj.get('product', 'Unknown product')
    disk_name = ' '.join((description, product, capacity))
    inventory.add_element(HardwareElement(HardwareType.DISK, disk_name))
def _process_core_pci_storage(inventory, obj):
    """
    Add a STORAGE element for the controller itself, then process its
    children, which is where the disks are listed.
    """
    vendor = obj.get('vendor', 'Unknown vendor')
    product = obj.get('product', 'Unknown product')
    controller = HardwareElement(HardwareType.STORAGE, ' '.join((vendor, product)))
    inventory.add_element(controller)
    # Disks follow a storage section
    for child in obj.get('children', []):
        _process_core_pci_storage_child(inventory, child)
def _process_core_pci_disk(inventory, obj):
    """
    Add a DISK element named "<vendor> <product>".
    """
    vendor = obj.get('vendor', 'Unknown vendor')
    product = obj.get('product', 'Unknown product')
    inventory.add_element(HardwareElement(HardwareType.DISK, ' '.join((vendor, product))))
def _process_core_pci_multimedia(inventory, obj):
    """
    Add a MULTIMEDIA element named "<vendor> <product>".
    """
    vendor = obj.get('vendor', 'Unknown vendor')
    product = obj.get('product', 'Unknown product')
    inventory.add_element(HardwareElement(HardwareType.MULTIMEDIA, ' '.join((vendor, product))))
def _process_core_pci_child(inventory, obj):
    """
    Dispatch one PCI child node to its handler according to its id prefix.

    Nodes whose id starts with 'pci' are treated as bridges and their
    children are processed recursively. Unrecognised ids are ignored.
    """
    node_id = obj.get('id', '')
    # Checked in order; the first matching prefix wins.
    handlers = (
        ('usb', _process_core_pci_usb),
        ('display', _process_core_pci_display),
        ('network', _process_core_pci_network),
        (('sata', 'storage'), _process_core_pci_storage),
        ('multimedia', _process_core_pci_multimedia),
    )
    for prefix, handler in handlers:
        if node_id.startswith(prefix):
            handler(inventory, obj)
            return
    if node_id.startswith('pci'):  # PCI bridge
        for nested in obj.get('children', []):
            _process_core_pci_child(inventory, nested)
def _process_core_pci(inventory, obj):
    """
    Process every child of a PCI node.
    """
    for pci_child in obj.get('children', []):
        _process_core_pci_child(inventory, pci_child)
def _process_core_scsi_disk(inventory, obj):
    """
    Add a DISK element named "<vendor> <product>" for a SCSI disk node.
    """
    fields = (obj.get('vendor', 'Unknown vendor'),
              obj.get('product', 'Unknown product'))
    inventory.add_element(HardwareElement(HardwareType.DISK, ' '.join(fields)))
def _process_core_scsi_cdrom(inventory, obj):
    """
    Add a CD element named "<vendor> <product>" for a SCSI cdrom node.
    """
    vendor = obj.get('vendor', 'Unknown vendor')
    product = obj.get('product', 'Unknown product')
    inventory.add_element(HardwareElement(HardwareType.CD, ' '.join((vendor, product))))
def _process_core_scsi_child(inventory, obj):
    """
    Dispatch one SCSI child node: ids starting with 'disk' are disks, ids
    starting with 'cdrom' are optical drives, anything else is ignored.
    """
    node_id = obj.get('id', '')
    if node_id.startswith('disk'):
        _process_core_scsi_disk(inventory, obj)
        return
    if node_id.startswith('cdrom'):
        _process_core_scsi_cdrom(inventory, obj)
def _process_core_scsi(inventory, obj):
    """
    Process every child of a SCSI node.
    """
    for scsi_child in obj.get('children', []):
        _process_core_scsi_child(inventory, scsi_child)
def _process_core(inventory, core):
    """
    Walk the children of the lshw 'core' node and hand each one to the
    handler matching its id prefix. Children with other ids are skipped.

    Both core['children'] and each child's 'id' are required keys.
    """
    # Checked in order; the first matching prefix wins.
    handlers = (
        ('firmware', _process_core_firmware),
        ('cpu', _process_core_cpu),
        ('memory', _process_core_mem),
        ('pci', _process_core_pci),
        ('scsi', _process_core_scsi),
    )
    for node in core['children']:
        node_id = node['id']
        for prefix, handler in handlers:
            if node_id.startswith(prefix):
                handler(inventory, node)
                break
# Three-letter prefix used by the legacy output for each hardware type.
_LEGACY_NEMONICS = {
    HardwareType.MULTIMEDIA: 'mul',
    HardwareType.BOOTMODE: 'boo',
    HardwareType.FIRMWARE: 'bio',
    HardwareType.GRAPHICS: 'vga',
    HardwareType.CHASSIS: 'cha',
    HardwareType.STORAGE: 'sto',
    HardwareType.NETWORK: 'net',
    HardwareType.MEMORY: 'mem',
    HardwareType.MODEL: 'mod',
    HardwareType.DISK: 'dis',
    HardwareType.CPU: 'cpu',
    HardwareType.USB: 'usb',
    HardwareType.CD: 'cdr',
}


def legacy_hardware_element(element):
    """
    Legacy string representation of a hardware element.
    For example, a graphics card named "Foo" would be
    represented as "vga=Foo"

    Raises OgError if element is not a HardwareElement or if its type has
    no legacy prefix.
    """
    if type(element) is not HardwareElement:
        raise OgError('Invalid hardware element type')
    try:
        nemonic = _LEGACY_NEMONICS[element.type]
    except (KeyError, TypeError):
        # KeyError: type without a legacy prefix; TypeError: unhashable type.
        # Without this the function would fail with an unbound local name.
        raise OgError(f'Unsupported hardware type, received {element.type}') from None
    return f'{nemonic}={element.name}'
def legacy_list_hardware_inventory(inventory):
    """
    Render a hardware inventory in the legacy text format.

    Every element is converted with legacy_hardware_element() and the
    results are joined with a newline, one element per line.
    This is the same output as legacy script ogListHardware.
    """
    legacy_lines = map(legacy_hardware_element, inventory.elements)
    return '\n'.join(legacy_lines)
def get_hardware_inventory():
    """
    Build a HardwareInventory from the output of `lshw -json`.

    Returns the populated inventory.

    Raises OgError if lshw cannot be executed or if its output is not the
    expected JSON document.
    """
    try:
        proc = subprocess.run(['lshw', '-json'], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    except OSError as e:
        # e.g. lshw is not installed or is not executable
        raise OgError(f'Unable to run lshw: {e}') from e

    try:
        root = json.loads(proc.stdout)
    except ValueError as e:
        # Covers json.JSONDecodeError (empty or malformed output) and
        # undecodable bytes.
        raise OgError('Invalid lshw json output') from e

    # The top-level JSON value may be a list wrapping the root node.
    if type(root) is list:
        root = root[0] if root else None
    if type(root) is not dict:
        raise OgError('Invalid lshw json output')

    inventory = HardwareInventory()
    _fill_computer_model(inventory, root)
    _fill_chassis_type(inventory, root)
    _fill_bootmode(inventory)
    # Process 'children' node
    # Usually there are two types of children from lshw json root:
    # 'core' and 'power'.
    # We are only interested in 'core'.
    for child in root.get('children', []):
        if child.get('id') == 'core':
            _process_core(inventory, child)
    return inventory