live: rewrite software inventory

Replace legacy bash script in favor of Python code. Improves error
traceability and further development.

The software inventory operation mounts the target partition and it
fetches the list of installed software (package set). Once the
operation is complete, it unmounts the target partition.

For Windows, introduce hivex library python bindings for accessing
Windows registry hive files (https://libguestfs.org/hivex.3.html).

This operation is still processed by legacy code in the server side
(ogAdmServer.c in ogServer). Legacy backend process expects the software
inventory like the following example:

"software": "Windows 10 Enterprise Evaluation 2004 \nIntel(R) Network Connections 24.0.0.11 24.0.0.11 ..."

The os name is inserted first in this list followed by a '\n' separated
string of the software packages.

The legacy server code can be found in function actualizaSoftware at
ogServer/src/ogAdmServer.c

It is expected for software inventory payload to change in the future to
a simpler solution using just a json array of strings.
more_events v1.2.7
Jose M. Guisado 2023-03-20 14:24:12 +01:00
parent 2172f00cde
commit 04bb35bd86
2 changed files with 134 additions and 19 deletions

View File

@ -30,6 +30,7 @@ from src.utils.probe import os_probe, cache_probe
from src.utils.disk import *
from src.utils.cache import generate_cache_txt, umount_cache, init_cache
from src.utils.tiptorrent import *
from src.utils.inventory import get_package_set
OG_SHELL = '/bin/bash'
@ -252,30 +253,26 @@ class OgLiveOperations:
def software(self, request, path, ogRest):
disk = request.getDisk()
partition = request.getPartition()
partdev = get_partition_device(int(disk), int(partition))
mountpoint = partdev.replace('dev', 'mnt')
if not mount_mkdir(partdev, mountpoint):
raise RuntimeError(f'Error mounting {partdev} at {mountpoint}')
if not os.path.ismount(mountpoint):
raise RuntimeError('Invalid mountpoint for software inventory')
self._restartBrowser(self._url_log)
try:
cmd = f'{ogClient.OG_PATH}interfaceAdm/InventarioSoftware {disk} ' \
f'{partition} {path}'
ogRest.proc = subprocess.Popen([cmd],
stdout=subprocess.PIPE,
shell=True,
executable=OG_SHELL)
(output, error) = ogRest.proc.communicate()
except:
logging.error('Exception when running software inventory subprocess')
raise ValueError('Error: Incorrect command value')
pkgset = get_package_set(mountpoint)
self._restartBrowser(self._url)
software = ''
with open(path, 'r') as f:
software = f.read()
umount(mountpoint)
logging.info('Software inventory command OK')
return software
# Software inventory result is still processed by legacy server code
# (ogAdmServer.c). Legacy response format is string where each
# software package is separated by a newline '\n'.
# Each package/software line follows this format:
# "{package_name} {package_version}"
return '\n'.join(map(str,pkgset))
def hardware(self, path, ogRest):
self._restartBrowser(self._url_log)

View File

@ -0,0 +1,118 @@
import platform
import re
import os
from collections import namedtuple
import hivex
from src.utils.probe import getwindowsversion, getlinuxversion
Package = namedtuple('Package', ['name', 'version'])
Package.__str__ = lambda pkg: f'{pkg.name} {pkg.version}'
WINDOWS_HIVES_PATH = '/Windows/System32/config'
WINDOWS_HIVES_SOFTWARE = f'{WINDOWS_HIVES_PATH}/SOFTWARE'
DPKG_STATUS_PATH = '/var/lib/dpkg/status'
OSRELEASE_PATH = '/etc/os-release'
def _fill_package_set(h, key, pkg_set):
"""
Fill the package set looking for entries at the current registry
node childs.
Any valid node child must have "DisplayVersion" or "DisplayName" keys.
"""
childs = h.node_children(key)
valid_childs = [h.node_get_child(key, h.node_name(child))
for child in childs
for value in h.node_values(child) if h.value_key(value) == 'DisplayVersion']
for ch in valid_childs:
name = h.value_string(h.node_get_value(ch, 'DisplayName'))
value = h.node_get_value(ch, 'DisplayVersion')
version = h.value_string(value)
pkg = Package(name, version)
pkg_set.add(pkg)
def _fill_package_set_1(h, pkg_set):
"""
Looks for entries in registry path
/Microsoft/Windows/CurrentVersion/Uninstall
Fills the given set with Package instances for each program found.
"""
key = h.root()
key = h.node_get_child(key, 'Microsoft')
key = h.node_get_child(key, 'Windows')
key = h.node_get_child(key, 'CurrentVersion')
key = h.node_get_child(key, 'Uninstall')
_fill_package_set(h, key, pkg_set)
def _fill_package_set_2(h, pkg_set):
"""
Looks for entries in registry path
/Wow6432Node/Microsoft/Windows/CurrentVersion/Uninstall
64 bit Windows only.
Fills the given set with Package instances for each program found.
"""
key = h.root()
key = h.node_get_child(key, 'Wow6432Node')
key = h.node_get_child(key, 'Microsoft')
key = h.node_get_child(key, 'Windows')
key = h.node_get_child(key, 'CurrentVersion')
key = h.node_get_child(key, 'Uninstall')
_fill_package_set(h, key, pkg_set)
def _get_package_set_windows(hivepath):
packages = set()
h = hivex.Hivex(hivepath)
_fill_package_set_1(h, packages)
_fill_package_set_2(h, packages)
return packages
def _get_package_set_dpkg(dpkg_status_path):
regex_pkg = '(?:^Package: )(?P<name>.*)\n(Essential:.*\n)?(?:Status: install ok installed)'
regex_ver = '(?:^Version: )(?P<version>.*)'
packages = set()
with open(dpkg_status_path, 'r') as f:
# Split by empty line
for par in re.split('^\n+', f.read(), flags=re.MULTILINE):
# Search for package with "Status: install ok installed"
result = re.search(regex_pkg, par)
if result is None:
continue
else:
pkg_name = result.group('name')
# If we hit a properly installed package, search for its version
result = re.search(regex_ver, par, flags=re.MULTILINE)
if result is None:
continue
else:
pkg_version = result.group('version')
pkg = Package(pkg_name, pkg_version)
packages.add(pkg)
return packages
def get_package_set(mountpoint):
dpkg_status_path = f'{mountpoint}{DPKG_STATUS_PATH}'
winreghives = f'{mountpoint}{WINDOWS_HIVES_PATH}'
osrelease = f'{mountpoint}{OSRELEASE_PATH}'
softwarehive = f'{mountpoint}{WINDOWS_HIVES_SOFTWARE}'
if os.path.exists(softwarehive):
pkgset = _get_package_set_windows(softwarehive)
osname = getwindowsversion(winreghives)
elif os.path.exists(dpkg_status_path):
pkgset = _get_package_set_dpkg(dpkg_status_path)
osname = getlinuxversion(osrelease)
else:
raise ValueError(f'Cannot fetch software inventory at {mountpoint}')
# Legacy software inventory first element is the OS name
return [osname] + list(pkgset)