source: ogAgent-Git/src/opengnsys/ipc.py @ 324ffda

configure-ptt-chedecorare-oglive-methodsejecutarscript-b64fix-cfg2objfixes-winlgromero-filebeatmainno-ptt-paramogcore1oglogoglog2override-moduleping1ping2ping3ping4report-progresstls
Last change on this file since 324ffda was b84ab33, checked in by Natalia Serrano <natalia.serrano@…>, 11 months ago

refs #247 migrate agent from py2 & qt4/qt5 to py3 & qt6

  • Update installation document
  • No longer create rpm linux packages
  • Change deb maintainer from one person to one team
  • Remove stray debhelper files
  • Filter more stuff in .gitignore
  • Property mode set to 100644
File size: 15.7 KB
Line 
1# -*- coding: utf-8 -*-
2#
3# Copyright (c) 2014 Virtual Cable S.L.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without modification,
7# are permitted provided that the following conditions are met:
8#
9#    * Redistributions of source code must retain the above copyright notice,
10#      this list of conditions and the following disclaimer.
11#    * Redistributions in binary form must reproduce the above copyright notice,
12#      this list of conditions and the following disclaimer in the documentation
13#      and/or other materials provided with the distribution.
14#    * Neither the name of Virtual Cable S.L. nor the names of its contributors
15#      may be used to endorse or promote products derived from this software
16#      without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28"""
29@author: Adolfo Gómez, dkmaster at dkmon dot com
30"""
31
32
33import json
34import queue
35import socket
36import threading
37import traceback
38
39from opengnsys.utils import toUnicode
40from opengnsys.log import logger
41
42# The IPC Server will wait for connections from clients
43# Clients will open socket, and wait for data from server
44# The messages sent (from server) will be the following (subject to future changes):
45#     Message_id     Data               Action
46#    ------------  --------         --------------------------
47#    MSG_LOGOFF     None            Logout user from session
48#    MSG_MESSAGE    message,level   Display a message with level (INFO, WARN, ERROR, FATAL)     # TODO: Include level, right now only has message
49#    MSG_POPUP      title,message   Display a popup box with a title
50#    MSG_SCRIPT     python script   Execute a specific Python script INSIDE the CLIENT environment (this message is not sent right now)
51# The messages received (sent from client) will be the following:
52#     Message_id       Data               Action
53#    ------------    --------         --------------------------
54#    REQ_LOGOUT       username        Logout user from session
55#    REQ_INFORMATION  None            Request information from the IPC server (maybe configuration parameters in the near future; no such constant is defined yet)
56#    REQ_LOGIN        user data       User login; the data is the user fields joined by commas (see ClientIPC.sendLogin)
57#
58# All messages are in the form:
59# BYTE
60#  0           1-2                        3 4 ...
61# MSG_ID   DATA_LENGTH (little endian)    Data (can be 0 length)
62# Messages sent by the server are preceded by a "MAGIC" header; requests sent by clients are not
63
# Client messages (sent from the server to the connected clients)
MSG_LOGOFF = 0xA1  # Request log off from an user
MSG_MESSAGE = 0xB2
MSG_POPUP = 0xB3
MSG_SCRIPT = 0xC3

# Request messages (sent from a client to the server)
REQ_MESSAGE = 0xD4
REQ_POPUP = 0xD5
REQ_LOGIN = 0xE5
REQ_LOGOUT = 0xF6

# Reverse msgs dict for debugging (message id -> constant name)
REV_DICT = {
    MSG_LOGOFF: 'MSG_LOGOFF',
    MSG_MESSAGE: 'MSG_MESSAGE',
    MSG_POPUP: 'MSG_POPUP',
    MSG_SCRIPT: 'MSG_SCRIPT',
    REQ_LOGIN: 'REQ_LOGIN',
    REQ_LOGOUT: 'REQ_LOGOUT',
    REQ_MESSAGE: 'REQ_MESSAGE',
    REQ_POPUP: 'REQ_POPUP',  # Was missing, so debug logs showed None for this id
}

MAGIC = b'\x4F\x47\x41\x00'  # OGA in hex with a padded 0 to the right


# States for client processor
ST_SECOND_BYTE = 0x01
ST_RECEIVING = 0x02
# NOTE(review): same value as ST_RECEIVING; this constant does not appear to be
# used in this module. Value left untouched in case other code depends on it.
ST_PROCESS_MESSAGE = 0x02
94
95
class ClientProcessor(threading.Thread):
    """
    Thread that serves one client connection accepted by the IPC server.

    Every loop iteration first reads request bytes sent by the client
    (one id byte, two little endian length bytes and the payload) and then
    waits up to one second for a message queued in ``self.messages``, which
    is sent to the client preceded by the MAGIC header.
    """

    def __init__(self, parent, clientSocket):
        """
        @param parent: Owner of this processor. Its "clientMessageProcessor" attribute (can be None) is invoked for every request received
        @param clientSocket: Connected socket used to talk with the client
        """
        # Zero-argument super(): "super(self.__class__, self)" recurses
        # forever as soon as this class is subclassed
        super().__init__()
        self.parent = parent
        self.clientSocket = clientSocket
        self.running = False
        self.messages = queue.Queue(32)  # Outgoing (msg_id, data) tuples

    def stop(self):
        """
        Asks the thread to finish; run() checks the flag on every iteration
        """
        logger.debug('Stopping client processor')
        self.running = False

    def processRequest(self, msg, data):
        """
        Hands a complete request over to the parent's message processor, if any
        @param msg: Request id received from the client
        @param data: Payload as bytes, or None if the request carried no data
        """
        logger.debug('Got Client message {}={}'.format(msg, REV_DICT.get(msg)))
        if self.parent.clientMessageProcessor is not None:
            self.parent.clientMessageProcessor(msg, data)

    def run(self):
        """
        Thread body: alternates between reading requests and sending queued messages
        """
        self.running = True
        self.clientSocket.setblocking(False)

        state = None  # None: waiting for a request id
        recv_msg = None
        recv_data = None
        msg_len = 0
        while self.running:
            try:
                counter = 1024
                while counter > 0:  # So we process at least the incoming queue every XX bytes read
                    counter -= 1
                    b = self.clientSocket.recv(1)
                    if b == b'':
                        # Client disconnected
                        self.running = False
                        break
                    buf = b[0]  # Value of the single byte just read
                    if state is None:
                        if buf in (REQ_MESSAGE, REQ_LOGIN, REQ_LOGOUT):
                            logger.debug('State set to {}'.format(buf))
                            state = buf
                            recv_msg = buf
                            continue  # Get next byte
                        else:
                            logger.debug('Got unexpected data {}'.format(buf))
                    elif state in (REQ_MESSAGE, REQ_LOGIN, REQ_LOGOUT):
                        # Low byte of the payload length
                        logger.debug('First length byte is {}'.format(buf))
                        msg_len = buf
                        state = ST_SECOND_BYTE
                        continue
                    elif state == ST_SECOND_BYTE:
                        # High byte of the payload length
                        msg_len += buf << 8
                        logger.debug('Second length byte is {}, len is {}'.format(buf, msg_len))
                        if msg_len == 0:
                            self.processRequest(recv_msg, None)
                            state = None
                            break  # Go and check the outgoing queue
                        state = ST_RECEIVING
                        recv_data = b''
                        continue
                    elif state == ST_RECEIVING:
                        recv_data += b
                        msg_len -= 1
                        if msg_len == 0:
                            self.processRequest(recv_msg, recv_data)
                            recv_data = None
                            state = None
                            break  # Go and check the outgoing queue
                    else:
                        logger.debug('Got invalid message from request: {}, state: {}'.format(buf, state))
            except socket.error:
                # If no data is present (socket is non-blocking), no problem at all, pass to check messages
                pass
            except Exception as e:
                tb = traceback.format_exc()
                logger.error('Error: {}, trace: {}'.format(e, tb))

            if self.running is False:
                break

            try:
                msg = self.messages.get(block=True, timeout=1)
            except queue.Empty:  # No message got in time @UndefinedVariable
                continue

            logger.debug('Got message {}={}'.format(msg, REV_DICT.get(msg[0])))

            try:
                payload = msg[1] if msg[1] is not None else b''
                size = len(payload)
                # bytes() raises ValueError if the id or the length do not fit in their fields
                data = MAGIC + bytes([msg[0], size & 0xFF, size >> 8]) + payload
                try:
                    self.clientSocket.sendall(data)
                except socket.error as e:
                    # Send data error
                    logger.debug('Socket connection is no more available: {}'.format(e.args))
                    self.running = False
            except Exception as e:
                logger.error('Invalid message in queue: {}'.format(e))

        logger.debug('Client processor stopped')
        try:
            self.clientSocket.close()
        except Exception:
            pass  # If can't close, nothing happens, just end thread
200
201
class ServerIPC(threading.Thread):
    """
    IPC server thread: listens on a localhost TCP port and starts one
    ClientProcessor per accepted connection. Messages passed to
    sendMessage() are queued on every client processor that is alive.
    """

    def __init__(self, listenPort, clientMessageProcessor=None):
        """
        @param listenPort: TCP port on localhost to listen on
        @param clientMessageProcessor: Optional callable(msg, data) invoked by the client processors for every request received
        """
        # Zero-argument super(): "super(self.__class__, self)" recurses
        # forever as soon as this class is subclassed
        super().__init__()
        self.port = listenPort
        self.running = False
        self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.threads = []
        self.clientMessageProcessor = clientMessageProcessor

    def stop(self):
        """
        Stops the client processors and the accept loop, then waits for the client processors to end
        """
        logger.debug('Stopping Server IPC')
        self.running = False
        for t in self.threads:
            t.stop()
        # Throw-away connection, so the accept() blocked in run() returns and
        # notices that we are stopping. It is closed right away, and a failure
        # to connect must not prevent closing the socket and joining threads.
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as waker:
                waker.connect(('localhost', self.port))
        except socket.error as e:
            logger.debug('Could not wake up server socket: {}'.format(e))
        self.serverSocket.close()

        for t in self.threads:
            t.join()

    def sendMessage(self, msg_id, msg_data):
        """
        Notify message to all listening threads
        @param msg_id: Message identifier (one of the MSG_* constants)
        @param msg_data: Payload; str is converted to bytes here, anything else is queued untouched
        """
        logger.debug('Sending message {}({}),{} to all clients'.format(msg_id, REV_DICT.get(msg_id), msg_data))

        # Convert to bytes so length is correctly calculated
        if isinstance(msg_data, str):
            msg_data = msg_data.encode()

        for t in self.threads:
            if t.is_alive():
                logger.debug('Sending to {}'.format(t))
                t.messages.put((msg_id, msg_data))

    def sendLoggofMessage(self):
        self.sendMessage(MSG_LOGOFF, '')

    def sendMessageMessage(self, message):
        self.sendMessage(MSG_MESSAGE, message)

    def sendPopupMessage(self, title, message):
        # NOTE(review): the payload is a dict, which ClientProcessor.run cannot
        # concatenate to the frame (it ends logged as an invalid message), and
        # ClientIPC.run does not accept MSG_POPUP. Popups look unsupported
        # end to end; confirm the intended wire format before relying on this.
        self.sendMessage(MSG_POPUP, {'title': title, 'message': message})

    def sendScriptMessage(self, script):
        self.sendMessage(MSG_SCRIPT, script)

    def cleanupFinishedThreads(self):
        """
        Cleans up current threads list, keeping only the threads that are alive
        """
        aliveThreads = [t for t in self.threads if t.is_alive()]
        for t in aliveThreads:
            logger.debug('Thread {} is alive'.format(t))
        self.threads[:] = aliveThreads  # In place, the list object is kept

    def run(self):
        """
        Thread body: accepts connections until stop() is invoked
        """
        self.running = True

        self.serverSocket.bind(('localhost', self.port))
        self.serverSocket.setblocking(True)
        self.serverSocket.listen(4)

        while True:
            try:
                (clientSocket, address) = self.serverSocket.accept()
                # Stop processing if thread is meant to stop
                if self.running is False:
                    clientSocket.close()  # Most likely the wake up connection from stop()
                    break
                logger.debug('Got connection from {}'.format(address))

                self.cleanupFinishedThreads()  # House keeping

                logger.debug('Starting new thread, current: {}'.format(self.threads))
                t = ClientProcessor(self, clientSocket)
                self.threads.append(t)
                t.start()
            except Exception as e:
                if self.running is False:
                    # Server socket already closed by stop(): retrying accept()
                    # would fail forever, so leave the loop
                    break
                logger.error('Got an exception on Server ipc thread: {}'.format(e))
285
286
class ClientIPC(threading.Thread):
    """
    Client side of the IPC channel. Connects to the IPC server on localhost,
    sends requests (REQ_*) to it and, while the thread is running, stores the
    messages (MSG_*) received from it in the "messages" queue.
    """

    def __init__(self, listenPort):
        """
        Creates the client and connects to the server right away
        @param listenPort: TCP port on localhost where the IPC server listens
        """
        super().__init__()
        self.port = listenPort
        self.running = False
        self.clientSocket = None
        self.messages = queue.Queue(32)  # @UndefinedVariable

        self.connect()

    def stop(self):
        """
        Asks the thread to finish; the loops check the flag at least on every socket/queue timeout
        """
        self.running = False

    def getMessage(self):
        """
        Waits for a received message
        @return: Next (msgId, data) tuple from the queue, or None once the client is not running
        """
        while self.running:
            try:
                return self.messages.get(timeout=1)
            except queue.Empty:
                continue

        return None

    def sendRequestMessage(self, msg, data=None):
        """
        Sends a request to the server: id byte, two length bytes (little endian) and the payload
        @param msg: Request id (one of the REQ_* constants)
        @param data: Payload as bytes or str (str is encoded before sending). None means no payload
        @raise ValueError: If the payload does not fit in the 16 bits length field
        """
        payload = b'' if data is None else data
        if isinstance(payload, str):
            payload = payload.encode()

        size = len(payload)
        if size > 0xFFFF:
            # Was an obscure "bytes must be in range(0, 256)" error before
            raise ValueError('Request data too long: {} bytes (max is 65535)'.format(size))

        logger.debug('Sending request for msg: {}({}), {}'.format(msg, REV_DICT.get(msg), data))
        self.clientSocket.sendall(bytes([msg, size & 0xFF, size >> 8]) + payload)

    def sendLogin(self, user_data):
        """
        @param user_data: Iterable of strings, sent joined by commas
        """
        self.sendRequestMessage(REQ_LOGIN, ','.join(user_data))

    def sendLogout(self, username):
        self.sendRequestMessage(REQ_LOGOUT, username)

    def sendMessage(self, module, message, data=None):
        """
        Sends a message "message" with data (data will be encoded as json, so ensure that it is serializable)
        @param module: Module that will receive this message
        @param message: Message to send. This message is "customized", and understood by modules
        @param data: Data to be sent as message companion
        """
        msg = '\0'.join((module, message, json.dumps(data)))
        self.sendRequestMessage(REQ_MESSAGE, msg)

    def messageReceived(self):
        """
        Override this method to automatically get notified on new message
        received. Message is at self.messages queue
        """
        pass

    def receiveBytes(self, number):
        """
        Reads exactly "number" bytes from the server
        @param number: Number of bytes to read
        @return: The bytes read, or None if the client stopped or the server closed the connection meanwhile
        """
        msg = b''
        while self.running and len(msg) < number:
            try:
                buf = self.clientSocket.recv(number - len(msg))
                if buf == b'':
                    # Connection closed by the server
                    logger.debug('Buf {}, msg {}({})'.format(buf, msg, REV_DICT.get(msg)))
                    self.running = False
                    break
                msg += buf
            except socket.timeout:
                pass  # Timeout is only here so the running flag gets checked

        if self.running is False:
            logger.debug('Not running, returning None')
            return None
        return msg

    def connect(self):
        """
        Opens the connection with the IPC server
        """
        self.clientSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.clientSocket.connect(('localhost', self.port))
        self.clientSocket.settimeout(2)  # Static, custom socket timeout of 2 seconds for local connection (no network)

    def run(self):
        """
        Thread body: reads messages from the server, queues them and invokes messageReceived() for each one
        """
        self.running = True

        while self.running:
            try:
                msg = b''
                # We look for magic message header
                while self.running:  # Wait for MAGIC
                    try:
                        buf = self.clientSocket.recv(len(MAGIC) - len(msg))
                        if buf == b'':
                            self.running = False
                            break
                        msg += buf
                        if len(msg) != len(MAGIC):
                            continue  # Do not have message
                        if msg != MAGIC:  # Skip first byte and continue searching
                            msg = msg[1:]
                            continue
                        break
                    except socket.timeout:  # Timeout is here so we can get stop thread
                        continue

                if self.running is False:
                    break

                # We have the magic header, now we get message basic data (msg + datalen).
                # None must be checked before using the header: wrapping it
                # first (as with bytearray(None)) raises TypeError
                header = self.receiveBytes(3)
                if header is None:
                    continue

                msgId = header[0]
                dataLen = header[1] + (header[2] << 8)
                if msgId not in (MSG_LOGOFF, MSG_MESSAGE, MSG_SCRIPT):
                    raise Exception('Invalid message id: {}'.format(msgId))

                data = self.receiveBytes(dataLen)
                if data is None:
                    continue

                self.messages.put((msgId, data))
                self.messageReceived()

            except socket.error as e:
                logger.error('Communication with server got an error: {}'.format(toUnicode(e.strerror)))
                # Leave the loop (instead of returning) so the socket gets closed below
                self.running = False
            except Exception as e:
                tb = traceback.format_exc()
                logger.error('Error: {}, trace: {}'.format(e, tb))

        try:
            self.clientSocket.close()
        except Exception:
            pass  # If can't close, nothing happens, just end thread
Note: See TracBrowser for help on using the repository browser.