source: ogAgent-Git/src/opengnsys/ipc.py @ 0099a05

exec-ogbrowserlog-sess-lenmainoggit 5.1.1
Last change on this file since 0099a05 was b84ab33, checked in by Natalia Serrano <natalia.serrano@…>, 12 months ago

refs #247 migrate agent from py2 & qt4/qt5 to py3 & qt6

  • Update installation document
  • No longer create rpm linux packages
  • Change deb maintainer from one person to one team
  • Remove stray debhelper files
  • Filter more stuff in .gitignore
  • Property mode set to 100644
File size: 15.7 KB
RevLine 
[11f7a07]1# -*- coding: utf-8 -*-
2#
3# Copyright (c) 2014 Virtual Cable S.L.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without modification,
7# are permitted provided that the following conditions are met:
8#
9#    * Redistributions of source code must retain the above copyright notice,
10#      this list of conditions and the following disclaimer.
11#    * Redistributions in binary form must reproduce the above copyright notice,
12#      this list of conditions and the following disclaimer in the documentation
13#      and/or other materials provided with the distribution.
14#    * Neither the name of Virtual Cable S.L. nor the names of its contributors
15#      may be used to endorse or promote products derived from this software
16#      without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
[e777421]28"""
[11f7a07]29@author: Adolfo Gómez, dkmaster at dkmon dot com
[e777421]30"""
[53e7d45]31
[11f7a07]32
[e777421]33import json
34import queue
[11f7a07]35import socket
36import threading
37import traceback
38
39from opengnsys.utils import toUnicode
40from opengnsys.log import logger
41
42# The IPC Server will wait for connections from clients
43# Clients will open socket, and wait for data from server
44# The messages sent (from server) will be the following (subject to future changes):
45#     Message_id     Data               Action
46#    ------------  --------         --------------------------
47#    MSG_LOGOFF     None            Logout user from session
48#    MSG_MESSAGE    message,level   Display a message with level (INFO, WARN, ERROR, FATAL)     # TODO: Include level, right now only has message
[1deb0d1]49#    MSG_POPUP      title,message   Display a popup box with a title
[11f7a07]50#    MSG_SCRIPT     python script   Execute an specific python script INSIDE CLIENT environment (this messages is not sent right now)
51# The messages received (sent from client) will be the following:
52#     Message_id       Data               Action
53#    ------------    --------         --------------------------
54#    REQ_LOGOUT                   Logout user from session
55#    REQ_INFORMATION  None            Request information from ipc server (maybe configuration parameters in a near future)
56#    REQ_LOGIN        python script   Execute an specific python script INSIDE CLIENT environment (this messages is not sent right now)
57#
58# All messages are in the form:
59# BYTE
60#  0           1-2                        3 4 ...
61# MSG_ID   DATA_LENGTH (little endian)    Data (can be 0 length)
[e777421]62# With a previous "MAGIC" header in front of each message
[11f7a07]63
64# Client messages
65MSG_LOGOFF = 0xA1  # Request log off from an user
66MSG_MESSAGE = 0xB2
[1deb0d1]67MSG_POPUP = 0xB3
[622bc35]68MSG_SCRIPT = 0xC3
[11f7a07]69
70# Request messages
71REQ_MESSAGE = 0xD4
[1deb0d1]72REQ_POPUP = 0xD5
[11f7a07]73REQ_LOGIN = 0xE5
74REQ_LOGOUT = 0xF6
75
76# Reverse msgs dict for debugging
77REV_DICT = {
78    MSG_LOGOFF: 'MSG_LOGOFF',
79    MSG_MESSAGE: 'MSG_MESSAGE',
[1deb0d1]80    MSG_POPUP: 'MSG_POPUP',
[11f7a07]81    MSG_SCRIPT: 'MSG_SCRIPT',
82    REQ_LOGIN: 'REQ_LOGIN',
83    REQ_LOGOUT: 'REQ_LOGOUT',
84    REQ_MESSAGE: 'REQ_MESSAGE'
85}
86
[e777421]87MAGIC = b'\x4F\x47\x41\x00'  # OGA in hex with a padded 0 to the right
[11f7a07]88
89
90# States for client processor
91ST_SECOND_BYTE = 0x01
92ST_RECEIVING = 0x02
93ST_PROCESS_MESSAGE = 0x02
94
95
96class ClientProcessor(threading.Thread):
97    def __init__(self, parent, clientSocket):
98        super(self.__class__, self).__init__()
99        self.parent = parent
100        self.clientSocket = clientSocket
101        self.running = False
[e777421]102        self.messages = queue.Queue(32)
[11f7a07]103
104    def stop(self):
[e777421]105        logger.debug('Stopping client processor')
[11f7a07]106        self.running = False
107
108    def processRequest(self, msg, data):
109        logger.debug('Got Client message {}={}'.format(msg, REV_DICT.get(msg)))
110        if self.parent.clientMessageProcessor is not None:
111            self.parent.clientMessageProcessor(msg, data)
112
113    def run(self):
114        self.running = True
115        self.clientSocket.setblocking(0)
116
117        state = None
118        recv_msg = None
119        recv_data = None
[e777421]120        msg_len = 0
[11f7a07]121        while self.running:
122            try:
123                counter = 1024
124                while counter > 0:  # So we process at least the incoming queue every XX bytes readed
125                    counter -= 1
126                    b = self.clientSocket.recv(1)
127                    if b == b'':
128                        # Client disconnected
129                        self.running = False
130                        break
[b21ea07]131                    buf = int.from_bytes(b, 'big')  # Empty buffer, this is set as non-blocking
[11f7a07]132                    if state is None:
133                        if buf in (REQ_MESSAGE, REQ_LOGIN, REQ_LOGOUT):
134                            logger.debug('State set to {}'.format(buf))
135                            state = buf
136                            recv_msg = buf
137                            continue  # Get next byte
138                        else:
139                            logger.debug('Got unexpected data {}'.format(buf))
140                    elif state in (REQ_MESSAGE, REQ_LOGIN, REQ_LOGOUT):
141                        logger.debug('First length byte is {}'.format(buf))
142                        msg_len = buf
143                        state = ST_SECOND_BYTE
144                        continue
145                    elif state == ST_SECOND_BYTE:
146                        msg_len += buf << 8
147                        logger.debug('Second length byte is {}, len is {}'.format(buf, msg_len))
148                        if msg_len == 0:
149                            self.processRequest(recv_msg, None)
150                            state = None
151                            break
152                        state = ST_RECEIVING
153                        recv_data = b''
154                        continue
155                    elif state == ST_RECEIVING:
[b21ea07]156                        recv_data += bytes([buf])
[11f7a07]157                        msg_len -= 1
158                        if msg_len == 0:
159                            self.processRequest(recv_msg, recv_data)
160                            recv_data = None
161                            state = None
162                            break
163                    else:
164                        logger.debug('Got invalid message from request: {}, state: {}'.format(buf, state))
165            except socket.error as e:
166                # If no data is present, no problem at all, pass to check messages
167                pass
168            except Exception as e:
169                tb = traceback.format_exc()
170                logger.error('Error: {}, trace: {}'.format(e, tb))
171
172            if self.running is False:
173                break
174
175            try:
176                msg = self.messages.get(block=True, timeout=1)
[e777421]177            except queue.Empty:  # No message got in time @UndefinedVariable
[11f7a07]178                continue
179
180            logger.debug('Got message {}={}'.format(msg, REV_DICT.get(msg[0])))
181
182            try:
183                m = msg[1] if msg[1] is not None else b''
184                l = len(m)
[64c933f]185                data = MAGIC + bytes([msg[0]]) + bytes([l & 0xFF]) + bytes([l >> 8]) + m
[11f7a07]186                try:
187                    self.clientSocket.sendall(data)
188                except socket.error as e:
189                    # Send data error
190                    logger.debug('Socket connection is no more available: {}'.format(e.args))
191                    self.running = False
192            except Exception as e:
193                logger.error('Invalid message in queue: {}'.format(e))
194
195        logger.debug('Client processor stopped')
196        try:
197            self.clientSocket.close()
198        except Exception:
199            pass  # If can't close, nothing happens, just end thread
200
201
202class ServerIPC(threading.Thread):
203
204    def __init__(self, listenPort, clientMessageProcessor=None):
205        super(self.__class__, self).__init__()
206        self.port = listenPort
207        self.running = False
208        self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
209        self.serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
210        self.threads = []
211        self.clientMessageProcessor = clientMessageProcessor
212
213    def stop(self):
214        logger.debug('Stopping Server IPC')
215        self.running = False
216        for t in self.threads:
217            t.stop()
218        socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect(('localhost', self.port))
219        self.serverSocket.close()
220
221        for t in self.threads:
222            t.join()
223
[e777421]224    def sendMessage(self, msg_id, msg_data):
225        """
[11f7a07]226        Notify message to all listening threads
[e777421]227        """
228        logger.debug('Sending message {}({}),{} to all clients'.format(msg_id, REV_DICT.get(msg_id), msg_data))
[11f7a07]229
230        # Convert to bytes so length is correctly calculated
[e777421]231        if isinstance(msg_data, str):
232            msg_data = str.encode(msg_data)
[11f7a07]233
234        for t in self.threads:
[b84ab33]235            if t.is_alive():
[11f7a07]236                logger.debug('Sending to {}'.format(t))
[e777421]237                t.messages.put((msg_id, msg_data))
[11f7a07]238
239    def sendLoggofMessage(self):
240        self.sendMessage(MSG_LOGOFF, '')
241
242    def sendMessageMessage(self, message):
243        self.sendMessage(MSG_MESSAGE, message)
244
[1deb0d1]245    def sendPopupMessage(self, title, message):
[e777421]246        self.sendMessage(MSG_POPUP, {'title': title, 'message': message})
[1deb0d1]247
[11f7a07]248    def sendScriptMessage(self, script):
249        self.sendMessage(MSG_SCRIPT, script)
250
251    def cleanupFinishedThreads(self):
[e777421]252        """
[11f7a07]253        Cleans up current threads list
[e777421]254        """
[11f7a07]255        aliveThreads = []
256        for t in self.threads:
[b84ab33]257            if t.is_alive():
[11f7a07]258                logger.debug('Thread {} is alive'.format(t))
259                aliveThreads.append(t)
260        self.threads[:] = aliveThreads
261
262    def run(self):
263        self.running = True
264
265        self.serverSocket.bind(('localhost', self.port))
[e777421]266        self.serverSocket.setblocking(True)
[11f7a07]267        self.serverSocket.listen(4)
268
269        while True:
270            try:
271                (clientSocket, address) = self.serverSocket.accept()
272                # Stop processing if thread is mean to stop
273                if self.running is False:
274                    break
275                logger.debug('Got connection from {}'.format(address))
276
277                self.cleanupFinishedThreads()  # House keeping
278
279                logger.debug('Starting new thread, current: {}'.format(self.threads))
280                t = ClientProcessor(self, clientSocket)
281                self.threads.append(t)
282                t.start()
283            except Exception as e:
284                logger.error('Got an exception on Server ipc thread: {}'.format(e))
285
286
287class ClientIPC(threading.Thread):
288    def __init__(self, listenPort):
289        super(ClientIPC, self).__init__()
290        self.port = listenPort
291        self.running = False
292        self.clientSocket = None
[e777421]293        self.messages = queue.Queue(32)  # @UndefinedVariable
[11f7a07]294
295        self.connect()
296
297    def stop(self):
298        self.running = False
299
300    def getMessage(self):
301        while self.running:
302            try:
303                return self.messages.get(timeout=1)
[e777421]304            except queue.Empty:
[11f7a07]305                continue
306
307        return None
308
309    def sendRequestMessage(self, msg, data=None):
310        logger.debug('Sending request for msg: {}({}), {}'.format(msg, REV_DICT.get(msg), data))
311        if data is None:
312            data = b''
313
[e777421]314        if isinstance(data, str):
315            data = str.encode(data)
[11f7a07]316
317        l = len(data)
[b21ea07]318        msg = bytes([msg]) + bytes([l & 0xFF]) + bytes([l >> 8]) + data
[11f7a07]319        self.clientSocket.sendall(msg)
320
[be263c6]321    def sendLogin(self, user_data):
322        self.sendRequestMessage(REQ_LOGIN, ','.join(user_data))
[11f7a07]323
324    def sendLogout(self, username):
325        self.sendRequestMessage(REQ_LOGOUT, username)
[622bc35]326
[11f7a07]327    def sendMessage(self, module, message, data=None):
[e777421]328        """
[11f7a07]329        Sends a message "message" with data (data will be encoded as json, so ensure that it is serializable)
330        @param module: Module that will receive this message
331        @param message: Message to send. This message is "customized", and understand by modules
332        @param data: Data to be send as message companion
[e777421]333        """
[11f7a07]334        msg = '\0'.join((module, message, json.dumps(data)))
335        self.sendRequestMessage(REQ_MESSAGE, msg)
336
337    def messageReceived(self):
[e777421]338        """
[11f7a07]339        Override this method to automatically get notified on new message
340        received. Message is at self.messages queue
[e777421]341        """
[11f7a07]342        pass
343
344    def receiveBytes(self, number):
345        msg = b''
346        while self.running and len(msg) < number:
347            try:
348                buf = self.clientSocket.recv(number - len(msg))
349                if buf == b'':
350                    logger.debug('Buf {}, msg {}({})'.format(buf, msg, REV_DICT.get(msg)))
351                    self.running = False
352                    break
353                msg += buf
354            except socket.timeout:
355                pass
356
357        if self.running is False:
358            logger.debug('Not running, returning None')
359            return None
360        return msg
361
362    def connect(self):
363        self.clientSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
364        self.clientSocket.connect(('localhost', self.port))
365        self.clientSocket.settimeout(2)  # Static, custom socket timeout of 2 seconds for local connection (no network)
366
367    def run(self):
368        self.running = True
369
370        while self.running:
371            try:
372                msg = b''
373                # We look for magic message header
374                while self.running:  # Wait for MAGIC
375                    try:
376                        buf = self.clientSocket.recv(len(MAGIC) - len(msg))
377                        if buf == b'':
378                            self.running = False
379                            break
380                        msg += buf
381                        if len(msg) != len(MAGIC):
382                            continue  # Do not have message
383                        if msg != MAGIC:  # Skip first byte an continue searchong
384                            msg = msg[1:]
385                            continue
386                        break
387                    except socket.timeout:  # Timeout is here so we can get stop thread
388                        continue
389
390                if self.running is False:
391                    break
392
393                # Now we get message basic data (msg + datalen)
394                msg = bytearray(self.receiveBytes(3))
395
396                # We have the magic header, here comes the message itself
397                if msg is None:
398                    continue
399
400                msgId = msg[0]
401                dataLen = msg[1] + (msg[2] << 8)
402                if msgId not in (MSG_LOGOFF, MSG_MESSAGE, MSG_SCRIPT):
403                    raise Exception('Invalid message id: {}'.format(msgId))
404
405                data = self.receiveBytes(dataLen)
406                if data is None:
407                    continue
408
409                self.messages.put((msgId, data))
410                self.messageReceived()
411
412            except socket.error as e:
413                logger.error('Communication with server got an error: {}'.format(toUnicode(e.strerror)))
414                self.running = False
415                return
416            except Exception as e:
417                tb = traceback.format_exc()
418                logger.error('Error: {}, trace: {}'.format(e, tb))
419
420        try:
421            self.clientSocket.close()
422        except Exception:
423            pass  # If can't close, nothing happens, just end thread
Note: See TracBrowser for help on using the repository browser.