ogclient/src/ogClient.py

216 lines
5.9 KiB
Python

#
# Copyright (C) 2020-2024 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
import threading
import errno
import select
import socket
import time
import email
import logging
from io import StringIO
from src.restRequest import *
from src.ogRest import *
from src.log import OgError
from enum import Enum
class State(Enum):
    """Connection states that drive the ogClient main loop."""

    # A non-blocking connect was started by ogClient.connect() and has not
    # been confirmed yet.
    CONNECTING = 0
    # ogClient.connect2() confirmed the connection; requests are read now.
    RECEIVING = 1
    # ogClient.disconnect() was called; ogClient.run() returns on this state.
    FORCE_DISCONNECTED = 2
class ogClient:
    """Non-blocking client that receives REST requests over a TCP socket.

    The client connects to the address given by ``config['opengnsys']``,
    accumulates incoming data until a complete request (headers plus
    ``Content-Length`` bytes of body) is buffered, and hands it over to
    ``ogRest.process_request``.  In 'linux' and 'windows' modes a loopback
    UDP socket additionally carries session start/stop notifications from a
    background polling thread to the main loop.
    """

    OG_PATH = '/opt/opengnsys/'
    # Seconds the background thread sleeps between two session polls.
    SESSION_POLL_INTERVAL = 5
    # Loopback UDP port the event socket is bound to (and sends to).
    EVENT_SOCKET_PORT = 55885

    def __init__(self, config):
        """Validate *config*, create the event socket and start polling.

        Args:
            config: Mapping with at least ``['opengnsys']['mode' | 'ip' |
                'port']`` and ``['samba']['activate']``.

        Raises:
            OgError: If the mode is unknown, or samba is activated without
                both 'user' and 'pass' entries.
        """
        self.CONFIG = config
        self.mode = self.CONFIG['opengnsys']['mode']
        if self.mode not in {'virtual', 'live', 'linux', 'windows'}:
            raise OgError(f'Invalid ogClient mode: {self.mode}.')

        # Validate with real exceptions: ``assert`` is stripped under -O.
        # Done before any socket is created so a failure leaks nothing.
        if self.CONFIG['samba']['activate']:
            for key in ('user', 'pass'):
                if key not in self.CONFIG['samba']:
                    raise OgError(f"Missing '{key}' in samba configuration.")

        if self.mode in {'linux', 'windows'}:
            self.event_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.event_sock.setblocking(0)
            self.event_sock.bind(('127.0.0.1', ogClient.EVENT_SOCKET_PORT))
        else:
            self.event_sock = None

        self.ip = self.CONFIG['opengnsys']['ip']
        self.port = self.CONFIG['opengnsys']['port']
        self.ogrest = ogRest(self.CONFIG)
        self.seq = None

        # Defined up front so the getters never hit a missing attribute;
        # connect() assigns the real values.
        self.sock = None
        self.state = None
        self.cleanup()

        self.session_check_thread = threading.Thread(
            target=self._session_check_loop, daemon=True)
        self.session_check_thread.start()

    def _session_check_loop(self):
        """Poll the interactive-session status forever (thread target).

        ``True``/``False`` from ``check_interactive_session_change`` is
        turned into a "session start user"/"session stop user" datagram
        sent to the event socket; any other value sends nothing.
        """
        while True:
            session_status = self.ogrest.check_interactive_session_change()
            if session_status is True:
                message = "session start user"
            elif session_status is False:
                message = "session stop user"
            else:
                message = None

            # There is no event socket outside 'linux'/'windows' modes.
            if message and self.event_sock is not None:
                try:
                    self.event_sock.sendto(
                        message.encode('utf-8'),
                        ('127.0.0.1', ogClient.EVENT_SOCKET_PORT))
                except OSError as err:
                    # Keep polling: an uncaught error would end the thread.
                    logging.warning('Cannot send session event: %s', err)

            time.sleep(ogClient.SESSION_POLL_INTERVAL)

    def get_socket(self):
        """Return the TCP socket (``None`` until connect() is called)."""
        return self.sock

    def get_event_socket(self):
        """Return the loopback UDP event socket, or ``None`` if unused."""
        return self.event_sock

    def get_state(self):
        """Return the current ``State`` (``None`` until connect())."""
        return self.state

    def send_event_hint(self, message):
        """Forward a session datagram to the peer as an early-hints response.

        Args:
            message: Text of the form ``"session <start|stop> <user>"``.

        Malformed or invalid datagrams are logged and dropped; nothing is
        sent for them.
        """
        try:
            event, action, user = message.split(" ")
        except (ValueError, AttributeError):
            # Wrong number of fields, or not a string at all.
            logging.warning('Error parsing session datagram')
            return
        logging.debug('Sending event: %s, %s, %s', event, action, user)

        if (event != "session" or
                action not in ['start', 'stop'] or
                not user):
            logging.warning('Invalid value in session datagram: %s', message)
            # Do not forward a datagram that failed validation.
            return

        payload = jsonBody({'event': event, 'action': action, 'user': user})
        response = restResponse(ogResponses.EARLY_HINTS, payload)
        self.send(response.get())
        logging.debug('Sending event OK')

    def cleanup(self):
        """Reset the request accumulation buffer and parser bookkeeping."""
        self.data = ""
        self.content_len = 0
        self.header_len = 0
        # True once the header section of the current request was parsed.
        self.trailer = False

    def connect(self):
        """Create a fresh non-blocking TCP socket and start connecting.

        Sets the state to ``State.CONNECTING`` and resets the parser.  The
        outcome of the connection attempt is checked later by connect2().
        """
        logging.debug('Connecting...')
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setblocking(0)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)
        self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30)
        self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4)

        self.state = State.CONNECTING
        self.cleanup()

        try:
            self.sock.connect((self.ip, self.port))
        except socket.error as err:
            # A non-blocking connect normally reports "in progress"; every
            # failure is left for connect2() to detect and retry.
            expected = (errno.EINPROGRESS, errno.EWOULDBLOCK,
                        errno.ECONNREFUSED)
            if err.errno not in expected:
                logging.debug('Connect attempt reported: %s', err)

    def send(self, msg):
        """Send *msg* UTF-8 encoded over the TCP socket.

        Returns:
            ``len(msg)``, i.e. the number of characters handed in.
        """
        payload = bytes(msg, 'utf-8')
        sent = self.sock.send(payload)
        if sent != len(payload):
            # socket.send() may transmit only part of the buffer.
            logging.warning('Short write: sent %d of %d bytes',
                            sent, len(payload))
        return len(msg)

    def connect2(self):
        """Check whether the connect started by connect() has completed.

        On success the state becomes ``State.RECEIVING``.  On failure the
        socket is closed and, after a one second pause, connect() is
        called again.
        """
        try:
            self.sock.connect((self.ip, self.port))
        except socket.error as err:
            if err.errno == errno.EISCONN:
                logging.debug('Connected')
                self.state = State.RECEIVING
            else:
                time.sleep(1)
                logging.warning('Connection refused, retrying...')
                self.state = State.CONNECTING
                self.sock.close()
                self.connect()
        else:
            # connect() returning normally also means the link is up.
            logging.debug('Connected')
            self.state = State.RECEIVING

    def receive(self):
        """Read available data and dispatch the request once it is complete.

        An empty read or a socket error is treated as a lost connection:
        the socket is closed, ``ogrest.kill_process()`` is called and a new
        connection attempt is started.
        """
        # NOTE(review): each chunk is decoded on its own and Content-Length
        # (a byte count) is compared with a character count below; non-ASCII
        # payloads may need a bytes buffer instead - confirm with the peer.
        try:
            data = self.sock.recv(1024).decode('utf-8')
        except socket.error as err:
            data = ''
            logging.warning('Receive failed: %s', str(err))

        if not data:
            self.sock.close()
            self.ogrest.kill_process()
            self.connect()
            return

        self.data = self.data + data

        if not self.trailer:
            header_len = self.data.find("\r\n\r\n")
            if header_len > 0:
                # Drop the request line; the rest parses as RFC 822 headers.
                _, headers_alone = self.data.split('\n', 1)
                headers = email.message_from_string(headers_alone)

                # Membership on the message itself is case-insensitive,
                # as HTTP header names are.
                if 'Content-Length' in headers:
                    self.content_len = int(headers['Content-Length'])
                if 'X-Sequence' in headers:
                    self.seq = int(headers['X-Sequence'])

                self.trailer = True
                # Add 4 because self.data.find("\r\n\r\n") does not count
                # "\r\n\r\n" for the length
                self.header_len = header_len + 4

        if self.trailer and (len(self.data) >= self.content_len + self.header_len):
            request = restRequest()
            request.parser(self.data)
            self.ogrest.process_request(request, self)
            self.cleanup()

    def disconnect(self):
        """Mark the client as force-disconnected and close the TCP socket."""
        self.state = State.FORCE_DISCONNECTED
        if self.sock is None:
            return
        try:
            self.sock.shutdown(socket.SHUT_RDWR)
        except OSError as err:
            # E.g. the socket never got connected; still release it below.
            logging.debug('Socket shutdown failed: %s', err)
        finally:
            self.sock.close()

    def run(self):
        """Main loop: wait on the sockets and dispatch by state.

        Returns:
            0 once the state is ``State.FORCE_DISCONNECTED``.

        Raises:
            OgError: If select() returns and no handler matches the
                current state.
        """
        while True:
            sock = self.get_socket()
            event_sock = self.get_event_socket()
            state = self.get_state()

            if state == State.FORCE_DISCONNECTED:
                return 0

            if state == State.CONNECTING:
                # Completion (or failure) of the pending connect shows up
                # as readiness on the socket.
                readset = [sock]
                writeset = [sock]
                exceptset = [sock]
            else:
                readset = [sock, event_sock] if event_sock is not None else [sock]
                writeset = []
                exceptset = []

            readable, writable, exception = select.select(
                readset, writeset, exceptset)

            if state == State.CONNECTING and (sock in writable or
                                              sock in exception):
                self.connect2()
            elif state == State.RECEIVING and sock in readable:
                self.receive()
            elif state == State.RECEIVING and event_sock in readable:
                message = event_sock.recv(4096).decode('utf-8').rstrip()
                self.send_event_hint(message)
            else:
                raise OgError(f'Invalid ogClient run state: {str(state)}.')