ogclient/misc/kiosk_tester.py

78 lines
2.4 KiB
Python

#!/usr/bin/python3
#
# Copyright (C) 2020-2025 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
import os
import sys
import json
import time
import socket
import select
import logging
from src.kiosk.kiosk import launch_kiosk
# Root logger setup for this tester: INFO and above, written to stdout,
# rendered as "(timestamp): [LEVEL] - message".
logging.basicConfig(
    stream=sys.stdout,  # send log output to stdout
    format='(%(asctime)s): [%(levelname)s] - %(message)s',  # record layout
    level=logging.INFO,
)
def send_event(socket, data):
    """Serialize ``data`` to JSON and send it over ``socket``, best effort.

    Every failure is logged and swallowed: the function never raises an
    ``Exception`` subclass to the caller and always returns ``None``.

    Args:
        socket: Object exposing ``send(bytes)``. The parameter name shadows
            the ``socket`` module inside this function; it is kept unchanged
            so that keyword callers keep working.
        data: Payload to serialize with ``json.dumps``.
    """
    try:
        # Encode in its own step so that a TypeError raised later by the
        # transport is not misreported as a JSON encoding failure.
        try:
            json_data = json.dumps(data)
        except (TypeError, ValueError) as e:
            # TypeError: unserializable object; ValueError: circular reference.
            logging.error(f'Failed to encode data to JSON: {e}')
            return

        logging.info(f'sending to kiosk: {json_data}')
        socket.send(json_data.encode('utf-8'))
    except Exception as e:
        # Deliberate catch-all: sending is best effort and must not crash
        # the caller (it is invoked from a ``finally`` clause).
        logging.error(f'Unexpected error in send_event: {e}')
def test_requests_on_interrupt():
    """Placeholder for requests to issue after a keyboard interrupt.

    Intentionally empty for now: takes no arguments, does nothing and
    returns ``None``.
    """
    return None
if __name__ == '__main__':
    # Connected AF_UNIX datagram pair: one end stays in this (parent)
    # process, the other is handed to the forked kiosk process.
    kiosk_sock_pair = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
    client_socket, kiosk_socket = kiosk_sock_pair

    try:
        kiosk_pid = os.fork()
    except Exception as e:
        logging.error(f'Could not launch Kiosk: {e}')
        raise

    if kiosk_pid == 0:
        # Child process: it only talks through kiosk_socket, so release the
        # inherited copy of the parent's end.
        client_socket.close()
        logging.info('Launching Kiosk...')
        try:
            ret = launch_kiosk(socket=kiosk_socket, debug_mode=True)
        except Exception as e:
            logging.exception(f"Error during Kiosk execution: {e}")
            sys.exit(1)
        # The child always exits here and never reaches the monitor loop.
        sys.exit(ret)

    # Parent process: the kiosk's end is owned by the child from now on.
    kiosk_socket.close()

    try:
        logging.info('Parent process: Monitoring Kiosk communication...')
        while True:
            # Block until the kiosk has written something.
            readable, _, _ = select.select([client_socket], [], [])
            if client_socket not in readable:
                continue

            data = client_socket.recv(4096)
            if not data:
                # NOTE(review): on a SOCK_DGRAM socket an empty read is a
                # zero-length datagram rather than an end-of-stream marker;
                # confirm this is how the kiosk signals that it is gone.
                print('Kiosk disconnected')
                break

            # Decode before formatting: reusing the enclosing quote character
            # inside an f-string replacement field is a SyntaxError on
            # Python versions older than 3.12.
            message = data.decode('utf-8')
            print(f'Received from Kiosk: {message}')
    except KeyboardInterrupt:
        logging.info('Interrupted by user. Executing test requests')
        test_requests_on_interrupt()
    except Exception as e:
        logging.error(f'Error during communication: {e}')
    finally:
        # Always ask the kiosk to shut down, then release our end.
        # send_event logs and swallows its own failures.
        send_event(client_socket, {'command': 'close'})
        client_socket.close()