Add utils modules

* disk.py

	Disk discovery

* fs.py

	Uses psutil to fetch fs usage information

* menu.py

	ogBrowser menu generation

* net.py: gets nic status information

	IP address, MAC address and ethernet speed.

* probe.py: probes mountpoints for operating systems

	Uses hivexget command to try fetching Windows installation
	information.
	Looks for /etc/os-release for probing linux systems.
more_events
Jose M. Guisado 2022-04-18 10:59:35 +02:00
parent 79d3062f81
commit 902e019505
5 changed files with 273 additions and 0 deletions

19
src/utils/disk.py 100644
View File

@ -0,0 +1,19 @@
#
# Copyright (C) 2022 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
import os
def get_disks():
"""
Walks /sys/block/ and returns files starting with 'sd',
'nvme' or 'vd'
"""
return sorted([ dev for dev in os.listdir('/sys/block/')
if dev.startswith('sd')
or dev.startswith('nvme')
or dev.startswith('vd')])

60
src/utils/fs.py 100644
View File

@ -0,0 +1,60 @@
#
# Copyright (C) 2022 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
import os
import subprocess
from subprocess import DEVNULL
import psutil
def mount_mkdir(source, target):
"""
Mounts and creates the mountpoint directory if it's not present.
"""
if not os.path.exists(target):
os.mkdir(target)
if not os.path.ismount(target):
return mount(source, target)
return False
def mount(source, target):
"""
Mounts source into target directoru using mount(8).
Return true if exit code is 0. False otherwise.
"""
cmd = f'mount {source} {target}'
proc = subprocess.run(cmd.split(), stderr=DEVNULL)
return not proc.returncode
def umount(target):
"""
Umounts target using umount(8).
Return true if exit code is 0. False otherwise.
"""
cmd = f'umount {target}'
proc = subprocess.run(cmd.split(), stderr=DEVNULL)
return not proc.returncode
def get_usedperc(mountpoint):
"""
Returns percetage of used filesystem as decimal number.
"""
try:
total, used, free, perc = psutil.disk_usage(mountpoint)
except FileNotFoundError:
return '0'
return str(perc)

58
src/utils/menu.py 100644
View File

@ -0,0 +1,58 @@
#
# Copyright (C) 2022 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
"""
Utility module for ogBrowser menu generation
"""
import os
import socket
from src.utils.net import getifaddr, getifhwaddr, ethtool
MENU_TEMPLATE = """
<div align="center" style="font-family: Arial, Helvetica, sans-serif;">
<p style="color:#999999; font-size: 16px; margin: 2em;">
<table border="1" width="100%">
<tr>
<td rowspan="2"><p align="left"><img border="0" src="../images/iconos/logoopengnsys.png"><p> </td>
<td> Hostname </td> <td> IP </td> <td> MAC </td> <td> SPEED </td> </tr>
<tr> <td>{hostname} </td> <td> {ip} </td> <td> {mac} </td> <td> {speed} </td> </tr>
</table>
</p>
{boot_list}
<p><a href="command:poweroff">Power Off</a></p>
</div>
"""
MENU_OS_TEMPLATE = "<p><a href=\"command:bootOs {diskno} {partno}\">Boot {os} ({diskno}, {partno})</a></p>"
def generate_menu(part_setup):
"""
Writes html menu to /opt/opengnsys/log/{ip}.info.html based on a partition
setup
"""
device = os.getenv('DEVICE')
if not device:
return False
ip = getifaddr(device)
mac = getifhwaddr(device)
speed = ethtool(device)
hostname = socket.gethostname()
menufile = f'/opt/opengnsys/log/{ip}.info.html'
l = [MENU_OS_TEMPLATE.format(diskno=part['disk'], partno=part['partition'], os=part['os'])
for part in part_setup if part['os']]
boot_list = '\n'.join(l)
with open(menufile, 'w') as f:
f.write(MENU_TEMPLATE.format(hostname=hostname, ip=ip, mac=mac, speed=speed, boot_list=boot_list))
return True

53
src/utils/net.py 100644
View File

@ -0,0 +1,53 @@
#
# Copyright (C) 2022 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
import array
import fcntl
import socket
import struct
def ethtool(interface):
try:
ETHTOOL_GSET = 0x00000001
SIOCETHTOOL = 0x8946
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sockfd = sock.fileno()
ecmd = array.array(
"B", struct.pack("I39s", ETHTOOL_GSET, b"\x00" * 39)
)
interface = interface.encode("utf-8")
ifreq = struct.pack("16sP", interface, ecmd.buffer_info()[0])
fcntl.ioctl(sockfd, SIOCETHTOOL, ifreq)
res = ecmd.tobytes()
speed = struct.unpack("12xH29x", res)[0]
except IOError:
speed = 0
finally:
sock.close()
return speed
def getifaddr(device):
"""
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return socket.inet_ntoa(fcntl.ioctl(
s.fileno(),
0x8915, # SIOCGIFADDR
struct.pack('256s', bytes(device[:15], 'utf-8'))
)[20:24])
def getifhwaddr(device):
"""
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
hwaddr = fcntl.ioctl(
s.fileno(),
0x8927, # SIOCGIFHWADDR
struct.pack('256s', bytes(device[:15], 'utf-8'))
)[18:24]
return "%02x:%02x:%02x:%02x:%02x:%02x" % struct.unpack("BBBBBB", hwaddr)

83
src/utils/probe.py 100644
View File

@ -0,0 +1,83 @@
#
# Copyright (C) 2022 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
import os
import subprocess
from subprocess import PIPE
def getlinuxversion(osrelease):
"""
Parses a os-release file to fetch 'PRETTY_NAME' key.
If file or key are not found, then returns generic
'Linux' string.
"""
with open(osrelease, 'r') as f:
for line in f:
if line.find('=') == -1:
continue
key, value = line.split('=')
if key == 'PRETTY_NAME':
value = value.replace('\n', '')
value = value.strip('"')
return value
return 'Linux'
def getwindowsversion(winreghives):
"""
Tries to obtain windows version information by
querying the SOFTWARE registry hive. Registry
hives path is a required parameter.
Runs hivexget(1) to fetch ProductName and
ReleaseId. If something fails (hivexget is
not installed, or registry is not found) it
returns a generic "Microsoft Windows" string.
"""
# XXX: 3.6 friendly
try:
proc_prodname = subprocess.run(['hivexget',
f'{winreghives}/SOFTWARE',
'microsoft\windows nt\currentversion',
'ProductName'], stdout=PIPE)
proc_releaseid = subprocess.run(['hivexget',
f'{winreghives}/SOFTWARE',
'microsoft\windows nt\currentversion',
'ReleaseId'], stdout=PIPE)
proc_prodname = proc_releaseid.stdout.replace('\n', '')
releaseid = proc_releaseid.stdout.replace('\n', '')
if proc_prodname.returncode == 0 and proc_releaseid.returncode == 0:
return f'{prodname} {releaseid}'
except FileNotFoundError: # hivexget command not found
pass
return 'Microsoft Windows'
def os_probe(mountpoint):
"""
Probes mountpoint for typical OS dependant files.
Windows: Tries finding and querying the software
registry hive.
Linux: Looks for /etc/os-release file.
Returns a string depending on the OS it detects.
"""
winreghives = f'{mountpoint}Windows/System32/config'
osrelease = f'{mountpoint}/etc/os-release'
if os.path.exists(osrelease):
return getlinuxversion(osrelease)
elif os.path.exists(winreghives):
return getwindowsversion(winreghives)
else:
return ''