# File: ogclient/src/kiosk/monitor.py
#
# 479 lines
# 16 KiB
# Python

# Copyright (C) 2020-2025 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
from PyQt6.QtWidgets import (
    QSizePolicy, QVBoxLayout, QHBoxLayout, QLabel, QWidget, QProgressBar,
    QFrame, QTextEdit
)
from PyQt6.QtCore import (
    QTimer, Qt, QFileSystemWatcher, QRegularExpression, pyqtSignal
)
from PyQt6.QtGui import (
    QTextCharFormat, QColor, QFont, QTextCursor, QSyntaxHighlighter
)
from collections import defaultdict, deque
# NOTE: the wildcard imports are kept ahead of the plain imports below so that
# any name they might re-export cannot shadow the modules imported after them.
from src.kiosk.config import *
from src.utils.net import *
import psutil
import os
import re
import socket
import sys
import time
# Refresh period handed to QTimer.start() by MonitorView, i.e. milliseconds.
POLLING_INTERVAL = 500
# Width limit, in pixels, applied to the progress bars / per-item widgets.
MAX_ITEM_WIDTH = 300
class CPUMonitor(QFrame):
    """Framed panel showing CPU usage as a text label plus a progress bar."""

    def __init__(self):
        """Create the title, the usage label and the bar, stacked vertically."""
        super().__init__()
        self.setFrameStyle(QFrame.Shape.StyledPanel | QFrame.Shadow.Plain)

        column = QVBoxLayout()
        self.setLayout(column)

        # Static heading of the panel.
        heading = QLabel(self.tr('CPU'))
        heading.setStyleSheet('font-weight: 600;')
        heading.setAlignment(Qt.AlignmentFlag.AlignCenter)

        # Widgets refreshed by update_metrics().
        self.cpu_label = QLabel()
        self.cpu_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self.cpu_bar = QProgressBar()
        self.cpu_bar.setFixedWidth(MAX_ITEM_WIDTH)

        column.addWidget(heading)
        column.addWidget(self.cpu_label)
        column.addStretch()
        column.addWidget(self.cpu_bar)
        column.setAlignment(self.cpu_bar, Qt.AlignmentFlag.AlignCenter)

    def update_metrics(self):
        """Sample the CPU usage via psutil and refresh the bar and the label."""
        usage = psutil.cpu_percent(interval=0)
        text = self.tr('CPU Usage: {cpu_percent}%').format(cpu_percent=usage)
        self.cpu_bar.setValue(int(usage))
        self.cpu_label.setText(text)
class MemoryMonitor(QFrame):
    """Framed panel showing memory usage as a text summary plus a progress bar."""

    def __init__(self):
        """Create the title, the summary label and the bar, stacked vertically."""
        super().__init__()
        self.setFrameStyle(QFrame.Shape.StyledPanel | QFrame.Shadow.Plain)
        frame_layout = QVBoxLayout()
        self.setLayout(frame_layout)

        self.memory_label = QLabel()
        self.memory_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self.memory_bar = QProgressBar()
        self.memory_bar.setFixedWidth(MAX_ITEM_WIDTH)

        title_label = QLabel(self.tr('MEMORY'))
        title_label.setStyleSheet('font-weight: 600;')
        title_label.setAlignment(Qt.AlignmentFlag.AlignCenter)

        frame_layout.addWidget(title_label)
        frame_layout.addWidget(self.memory_label)
        frame_layout.addStretch()
        frame_layout.addWidget(self.memory_bar)
        frame_layout.setAlignment(self.memory_bar, Qt.AlignmentFlag.AlignCenter)

    def update_metrics(self):
        """Read psutil.virtual_memory() and refresh the bar and the summary.

        "Used" is reported as total minus available; "Buff/Cache" is the sum
        of the buffers and cached figures.
        """
        mib = 1024 ** 2
        memory = psutil.virtual_memory()
        self.memory_bar.setValue(int(memory.percent))

        total_mib = memory.total // mib
        available_mib = memory.available // mib
        # `buffers` and `cached` are platform-specific fields of the psutil
        # result; fall back to 0 instead of raising AttributeError when the
        # running platform does not provide them.
        buffers_mib = getattr(memory, 'buffers', 0) // mib
        cached_mib = getattr(memory, 'cached', 0) // mib
        buffcache_mib = buffers_mib + cached_mib
        used_mib = total_mib - available_mib

        self.memory_label.setText(self.tr(
            'Total: {total_mib} MiB\n'
            'Used: {used_mib} MiB\n'
            'Buff/Cache: {buffcache_mib} MiB'
        ).format(
            total_mib=total_mib,
            used_mib=used_mib,
            buffcache_mib=buffcache_mib
        ))
class BaseMonitorMultiWidget(QFrame):
    """Framed panel holding one child widget per named item (disk, NIC, ...).

    Subclasses implement ``create_widget`` and ``update_widget``; this base
    class keeps the set of child widgets in sync with the keys of the data
    dictionary passed to ``update_widgets``.
    """

    def __init__(self):
        """Create the title label and the horizontal row that holds the items."""
        super().__init__()
        self.setFrameStyle(QFrame.Shape.StyledPanel | QFrame.Shadow.Plain)
        # NOTE(review): this attribute hides QWidget.layout() on the instance;
        # the name is kept because code outside this block may rely on it.
        self.layout = QVBoxLayout()
        self.setLayout(self.layout)

        self.title_label = QLabel(self.tr('MONITOR'))
        self.title_label.setStyleSheet('font-weight: 600;')
        self.title_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self.layout.addWidget(self.title_label)

        self.data_layout = QHBoxLayout()
        self.layout.addLayout(self.data_layout)

        # Dictionary to track widgets by name
        self.widget_map = {}

    def update_widgets(self, data):
        """Synchronise the child widgets with ``data``.

        :param data: mapping of item name -> info object understood by the
            subclass's ``create_widget`` / ``update_widget``.
        """
        # Remove widgets that are no longer in the data. Iterate over a copy
        # of the keys because remove_widget() mutates widget_map.
        for name in set(self.widget_map) - set(data):
            self.remove_widget(name)

        # Update or create widgets for current data
        for name, info in data.items():
            widget = self.widget_map.get(name)
            if widget is not None:
                self.update_widget(widget, name, info)
            else:
                self.add_widget(name, info)

    def add_widget(self, name, info):
        """Create the widget for ``name``, fill it from ``info`` and show it."""
        widget = self.create_widget(name, info)
        # Populate right away so a new item is not displayed empty until the
        # next refresh cycle.
        self.update_widget(widget, name, info)
        self.widget_map[name] = widget
        self.data_layout.addWidget(widget)

    def remove_widget(self, name):
        """Detach and schedule deletion of the widget registered as ``name``.

        Unknown names are ignored.
        """
        widget = self.widget_map.pop(name, None)
        if widget is not None:
            self.data_layout.removeWidget(widget)
            widget.setParent(None)
            widget.deleteLater()

    def create_widget(self, name, info):
        """Return a new, empty widget for one item. Must be overridden."""
        raise NotImplementedError("Subclasses must implement create_widget.")

    def update_widget(self, widget, name, info):
        """Refresh ``widget`` from ``info``. Must be overridden."""
        raise NotImplementedError("Subclasses must implement update_widget.")
class DiskMonitor(BaseMonitorMultiWidget):
    """Panel showing, per physical disk, space usage and read/write rates."""

    # Map a block device name to its whole-disk name. Stripping trailing
    # digits is not enough for NVMe: "nvme0n1p1" must become "nvme0n1" (the
    # key used by psutil.disk_io_counters), not "nvme0n1p".
    _NVME_DEVICE_RE = re.compile(r'(nvme\d+n\d+)(?:p\d+)?')
    _SD_VD_DEVICE_RE = re.compile(r'((?:sd|vd)[a-z]+)\d*')

    def __init__(self):
        """Set the panel title and take the first I/O counter snapshot."""
        super().__init__()
        self.title_label.setText(self.tr('DISK USAGE'))
        self.prev_disk_io = psutil.disk_io_counters(perdisk=True) or {}
        # Timestamp of the snapshot above, used to turn deltas into rates.
        self._prev_sample_time = time.monotonic()

    @classmethod
    def _parent_disk(cls, device_path):
        """Return the whole-disk name for ``device_path`` or None if unsupported.

        Only sd*, vd* and nvme* devices are recognised.
        """
        name = os.path.basename(device_path)
        for pattern in (cls._NVME_DEVICE_RE, cls._SD_VD_DEVICE_RE):
            match = pattern.fullmatch(name)
            if match:
                return match.group(1)
        return None

    def collect_data(self):
        """Gather usage and I/O rates for every supported disk.

        :return: dict mapping disk name to a dict with keys ``total_bytes``,
            ``used_bytes``, ``read_bytes_per_sec`` and ``write_bytes_per_sec``.
            The two rate entries are expressed in KiB per second.
        """
        data = {}
        now = time.monotonic()
        current_disk_io = psutil.disk_io_counters(perdisk=True) or {}
        elapsed = now - self._prev_sample_time

        # Collect disk usage information, summed over mounted partitions
        disk_usage = defaultdict(lambda: {'total': 0, 'used': 0})
        for partition in psutil.disk_partitions(all=False):
            disk = self._parent_disk(partition.device)
            if disk is None:
                continue
            try:
                usage = psutil.disk_usage(partition.mountpoint)
            except OSError:
                # Covers PermissionError as well as mountpoints that vanished
                # between listing and querying them.
                continue
            disk_usage[disk]['total'] += usage.total
            disk_usage[disk]['used'] += usage.used

        # Combine usage and I/O statistics
        for disk, usage in disk_usage.items():
            if disk not in self.prev_disk_io or disk not in current_disk_io:
                continue
            previous = self.prev_disk_io[disk]
            current = current_disk_io[disk]
            if elapsed > 0:
                # Divide by the real time between samples so the value is a
                # true per-second rate regardless of the polling period.
                read_rate = (current.read_bytes - previous.read_bytes) / 1024 / elapsed
                write_rate = (current.write_bytes - previous.write_bytes) / 1024 / elapsed
            else:
                read_rate = write_rate = 0.0
            data[disk] = {
                'total_bytes': usage['total'],
                'used_bytes': usage['used'],
                'read_bytes_per_sec': read_rate,
                'write_bytes_per_sec': write_rate,
            }

        # Update previous disk I/O stats for the next cycle
        self.prev_disk_io = current_disk_io
        self._prev_sample_time = now
        return data

    def create_widget(self, name, info) -> QWidget:
        """Build the per-disk widget: a text label above a usage bar."""
        widget = QWidget()
        widget.setMaximumWidth(MAX_ITEM_WIDTH + 12)
        layout = QVBoxLayout()
        widget.setLayout(layout)

        label = QLabel()
        label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        layout.addWidget(label)

        progress_bar = QProgressBar()
        progress_bar.setMaximumWidth(MAX_ITEM_WIDTH)
        progress_bar.setMaximum(100)
        layout.addWidget(progress_bar)

        # Attach data for updates
        widget.label = label
        widget.progress_bar = progress_bar
        return widget

    def update_widget(self, widget, name, info):
        """Refresh the label text and usage bar of one disk widget."""
        total_bytes = info['total_bytes']
        used_bytes = info['used_bytes']
        widget.label.setText(self.tr(
            'Device: /dev/{name}\n'
            'Total: {total_gib:.1f} GiB\n'
            'Used: {used_gib:.1f} GiB\n'
            'Read: {read_kib:6.1f} KiB/s\n'
            'Write: {write_kib:6.1f} KiB/s'
        ).format(
            name=name,
            total_gib=total_bytes / (1024 ** 3),
            used_gib=used_bytes / (1024 ** 3),
            read_kib=info['read_bytes_per_sec'],
            write_kib=info['write_bytes_per_sec']
        ))
        # A reported size of zero would otherwise raise ZeroDivisionError.
        usage_percent = (used_bytes / total_bytes) * 100 if total_bytes else 0
        widget.progress_bar.setValue(int(usage_percent))

    def update_metrics(self):
        """Collect fresh data and synchronise the per-disk widgets with it."""
        self.update_widgets(self.collect_data())
class NetworkMonitor(BaseMonitorMultiWidget):
    """Panel showing, per active network interface, address and traffic rates."""

    def __init__(self):
        """Set the panel title and take the first traffic counter snapshot."""
        super().__init__()
        self.title_label.setText(self.tr('NETWORK'))
        self.prev_net_io = psutil.net_io_counters(pernic=True)
        # Timestamp of the snapshot above, used to turn deltas into rates.
        self._prev_sample_time = time.monotonic()

    def collect_data(self):
        """Gather addresses and traffic rates for every interface that is up.

        The 'lo' interface is skipped.

        :return: dict mapping interface name to a dict with keys ``ipv4``,
            ``ipv6`` ('N/A' when absent), ``recv_rate`` and ``send_rate``
            (both in KiB per second).
        """
        now = time.monotonic()
        current_net_io = psutil.net_io_counters(pernic=True)
        elapsed = now - self._prev_sample_time
        interfaces = psutil.net_if_addrs()
        stats = psutil.net_if_stats()

        data = {}
        for iface, addrs in interfaces.items():
            # The addresses and the stats come from two separate calls, so an
            # interface may be missing from `stats`; treat that as "not up"
            # instead of raising KeyError.
            iface_stats = stats.get(iface)
            if iface == 'lo' or iface_stats is None or not iface_stats.isup:
                continue

            bytes_recv_rate = bytes_sent_rate = 0
            if (elapsed > 0 and iface in self.prev_net_io
                    and iface in current_net_io):
                prev_io = self.prev_net_io[iface]
                curr_io = current_net_io[iface]
                # Divide by the real time between samples so the value is a
                # true per-second rate regardless of the polling period.
                bytes_recv_rate = (curr_io.bytes_recv - prev_io.bytes_recv) / 1024 / elapsed
                bytes_sent_rate = (curr_io.bytes_sent - prev_io.bytes_sent) / 1024 / elapsed

            ipv4 = ipv6 = 'N/A'
            for addr in addrs:
                if addr.family == socket.AF_INET:
                    ipv4 = addr.address
                elif addr.family == socket.AF_INET6 and not is_link_local_ipv6(addr.address):
                    # Drop the "%<zone>" suffix, if any.
                    ipv6 = addr.address.split('%')[0]

            data[iface] = {
                'ipv4': ipv4,
                'ipv6': ipv6,
                'recv_rate': bytes_recv_rate,
                'send_rate': bytes_sent_rate,
            }

        self.prev_net_io = current_net_io
        self._prev_sample_time = now
        return data

    def create_widget(self, name, info) -> QWidget:
        """Build the per-interface widget: a single centred text label."""
        widget = QWidget()
        layout = QVBoxLayout()
        widget.setLayout(layout)

        label = QLabel()
        label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        layout.addWidget(label)

        # Attach data for updates
        widget.label = label
        widget.progress_bar = None
        return widget

    def update_widget(self, widget, name, info):
        """Refresh the label of one interface widget.

        Only the IPv4 address and the two rates are displayed; the ``ipv6``
        entry of ``info`` is not shown.
        """
        widget.label.setText(self.tr(
            'Interface: {name}\n'
            'IPv4: {ipv4}\n'
            '{recv_rate:6.1f} KiB/s\n'
            '{send_rate:6.1f} KiB/s'
        ).format(
            name=name,
            ipv4=info['ipv4'],
            recv_rate=info['recv_rate'],
            send_rate=info['send_rate']
        ))

    def update_metrics(self):
        """Collect fresh data and synchronise the per-interface widgets with it."""
        self.update_widgets(self.collect_data())
class LogHighlighter(QSyntaxHighlighter):
    """Syntax highlighter that colours selected parts of each log line."""

    def __init__(self, parent):
        """Compile the highlighting rules.

        :param parent: object handed to the QSyntaxHighlighter constructor.
        """
        super().__init__(parent)

        def make_format(color, weight):
            # One character format per rule: foreground colour + font weight.
            char_format = QTextCharFormat()
            char_format.setForeground(color)
            char_format.setFontWeight(weight)
            return char_format

        # (regular expression, colour, font weight); rules are applied in
        # this order, so a later rule overrides an earlier one where matches
        # overlap.
        rule_specs = (
            (r'\(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\)', QColor('darkblue'), QFont.Weight.Normal),  # Timestamp
            (r'\S+[\\/]\S+', QColor('black'), 420),
            (r'ogClient', QColor('blue'), QFont.Weight.Normal),
            (r'\[INFO\]', QColor('darkgreen'), QFont.Weight.DemiBold),
            (r'\[DEBUG\]', QColor('navy'), QFont.Weight.DemiBold),
            (r'\[WARNING\]', QColor('orangered'), QFont.Weight.DemiBold),
            (r'\[ERROR\]', QColor('red'), QFont.Weight.DemiBold),
        )
        self.highlight_rules = [
            (QRegularExpression(regex), make_format(color, weight))
            for regex, color, weight in rule_specs
        ]

    def highlightBlock(self, text):
        """Apply every rule to ``text``, formatting each match it finds."""
        for expression, char_format in self.highlight_rules:
            matches = expression.globalMatch(text)
            while matches.hasNext():
                hit = matches.next()
                self.setFormat(hit.capturedStart(), hit.capturedLength(), char_format)
class LogViewWidget(QFrame):
    """Framed, read-only view of the tail of a log file, refreshed on change."""

    # Number of lines, counted from the end of the file, that are displayed.
    MAX_LOG_LINES = 300
    # Distance from the scroll bar maximum (in scroll bar value units) within
    # which the view still counts as "at the bottom" and keeps auto-scrolling.
    AUTOSCROLL_THRESHOLD = 50

    def __init__(self, log_file_path):
        """Build the view, start watching the file and load its content.

        :param log_file_path: path of the log file to display.
        """
        super().__init__()
        self.setFrameStyle(QFrame.Shape.StyledPanel | QFrame.Shadow.Plain)
        self.log_file_path = log_file_path
        self.setMinimumWidth(300)

        # NOTE(review): this attribute hides QWidget.layout() on the instance;
        # the name is kept because code outside this block may rely on it.
        self.layout = QVBoxLayout()
        self.setLayout(self.layout)

        title_label = QLabel(self.tr('LOGS'))
        title_label.setStyleSheet('font-weight: 600;')
        title_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self.layout.addWidget(title_label)

        self.log_display = QTextEdit(self)
        self.log_display.setReadOnly(True)
        self.highlighter = LogHighlighter(self.log_display.document())
        self.layout.addWidget(self.log_display)

        self.file_watcher = QFileSystemWatcher(self)
        self.file_watcher.fileChanged.connect(self.on_file_changed)
        self.file_watcher.addPath(self.log_file_path)

        # True while the user has scrolled away from the bottom of the view.
        self.user_scrolling = False
        self.log_display.verticalScrollBar().valueChanged.connect(self.on_scroll)

        self.read_log_file()
        self.log_display.moveCursor(QTextCursor.MoveOperation.End)

    @staticmethod
    def read_last_lines(file_path, num_lines):
        """Return the last ``num_lines`` lines of ``file_path`` as one string.

        Bytes that cannot be decoded are replaced rather than aborting the
        read, so a single corrupt line does not hide the whole log.

        :raises OSError: if the file cannot be opened or read.
        """
        with open(file_path, 'r', errors='replace') as log_file:
            # A bounded deque keeps only the newest lines while streaming.
            return ''.join(deque(log_file, maxlen=num_lines))

    def read_log_file(self):
        """Reload the tail of the log file into the text view.

        On a read error a message is appended to the view instead. The scroll
        position is preserved when the user has scrolled up; otherwise the
        view follows the end of the log.
        """
        try:
            new_content = self.read_last_lines(
                self.log_file_path, self.MAX_LOG_LINES)
        except (OSError, ValueError) as e:
            self.log_display.append(self.tr('Error reading file: {error}').format(error=e))
            return

        scroll_bar = self.log_display.verticalScrollBar()
        previous_value = scroll_bar.value()
        # Replacing the text moves the scroll bar; stop listening meanwhile so
        # that this is not mistaken for the user scrolling.
        scroll_bar.valueChanged.disconnect(self.on_scroll)
        try:
            # Update the log display
            if new_content != self.log_display.toPlainText():
                self.log_display.setPlainText(new_content)

            # Restore or update scrolling based on user activity
            if self.user_scrolling:
                scroll_bar.setValue(previous_value)
            else:
                self.log_display.moveCursor(QTextCursor.MoveOperation.End)
        finally:
            # Always reconnect, even if updating the view raised.
            scroll_bar.valueChanged.connect(self.on_scroll)

    def on_scroll(self, value):
        """Record whether the user has scrolled away from the bottom."""
        scroll_bar = self.log_display.verticalScrollBar()
        self.user_scrolling = value < (scroll_bar.maximum() - self.AUTOSCROLL_THRESHOLD)

    def on_file_changed(self, path):
        """Reload the view when the watched log file changes."""
        if path != self.log_file_path:
            return
        # The watcher stops monitoring a file once it has been removed or
        # renamed (e.g. by log rotation); start watching the path again.
        if path not in self.file_watcher.files():
            self.file_watcher.addPath(path)
        self.read_log_file()
class MonitorView(QWidget):
    """Top-level monitoring view: system metric panels next to the log view."""

    # Emitted by request_command() with the payload dictionary.
    command_requested = pyqtSignal(dict)

    def __init__(self):
        """Build all panels, start the refresh timer and add the log view."""
        super().__init__()
        # NOTE(review): this attribute hides QWidget.layout() on the instance;
        # the name is kept because code outside this block may rely on it.
        self.layout = QHBoxLayout()
        self.monitor_layout = QVBoxLayout()

        # Top row: memory and CPU side by side.
        self.layout0 = QHBoxLayout()
        self.monitor_layout.addLayout(self.layout0)

        # Memory Usage
        self.mem_monitor = MemoryMonitor()
        self.layout0.addWidget(self.mem_monitor)

        # CPU Usage
        self.cpu_monitor = CPUMonitor()
        self.layout0.addWidget(self.cpu_monitor)

        # Disk Usage
        self.disk_monitor = DiskMonitor()
        self.monitor_layout.addWidget(self.disk_monitor)

        # Network Usage
        self.net_monitor = NetworkMonitor()
        self.monitor_layout.addWidget(self.net_monitor)

        self.monitor_layout.addStretch(1)
        self.layout.addLayout(self.monitor_layout)

        # Timer to refresh the data
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.update_metrics)
        self.timer.start(POLLING_INTERVAL)
        # Fill the panels once now instead of waiting for the first timeout.
        self.update_metrics()

        log_widget = LogViewWidget(get_log_path())
        self.layout.addWidget(log_widget)
        self.setLayout(self.layout)

    def request_command(self, payload):
        """Emit ``command_requested`` with ``payload``.

        :param payload: dictionary passed to the signal.
        """
        # The signal declared on this class is `command_requested`; the
        # previous code emitted a non-existent `data_emitted` attribute and
        # raised AttributeError.
        self.command_requested.emit(payload)

    def update_metrics(self):
        """Refresh every metric panel."""
        self.cpu_monitor.update_metrics()
        # Memory usage
        self.mem_monitor.update_metrics()
        # Disk usage
        self.disk_monitor.update_metrics()
        # Network usage
        self.net_monitor.update_metrics()