source: ogAgent-Git/src/opengnsys/ipc.py @ 9525724

decorare-oglive-methodsfixes-winlgromero-filebeatmainmodulesnew-browserno-ptt-paramogadmcliogadmclient-statusogagent-jobsogagent-macosogcore1oglogoglog2override-moduleping1ping2ping3ping4py3-winpython3report-progresstlsunification2unification3versionswindows-fixes
Last change on this file since 9525724 was 53e7d45, checked in by Ramón M. Gómez <ramongomez@…>, 5 years ago

#940: Run 2to3 on OGAgent source code

Result after running the command: 2to3 -w ogagent/src

  • Property mode set to 100644
File size: 15.8 KB
Line 
1# -*- coding: utf-8 -*-
2#
3# Copyright (c) 2014 Virtual Cable S.L.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without modification,
7# are permitted provided that the following conditions are met:
8#
9#    * Redistributions of source code must retain the above copyright notice,
10#      this list of conditions and the following disclaimer.
11#    * Redistributions in binary form must reproduce the above copyright notice,
12#      this list of conditions and the following disclaimer in the documentation
13#      and/or other materials provided with the distribution.
14#    * Neither the name of Virtual Cable S.L. nor the names of its contributors
15#      may be used to endorse or promote products derived from this software
16#      without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28'''
29@author: Adolfo Gómez, dkmaster at dkmon dot com
30'''
31
32
33import socket
34import threading
35import six
36import traceback
37import json
38
39from opengnsys.utils import toUnicode
40from opengnsys.log import logger
41
42# The IPC Server will wait for connections from clients
43# Clients will open socket, and wait for data from server
44# The messages sent (from server) will be the following (subject to future changes):
45#     Message_id     Data               Action
46#    ------------  --------         --------------------------
47#    MSG_LOGOFF     None            Logout user from session
48#    MSG_MESSAGE    message,level   Display a message with level (INFO, WARN, ERROR, FATAL)     # TODO: Include level, right now only has message
49#    MSG_POPUP      title,message   Display a popup box with a title
50#    MSG_SCRIPT     python script   Execute an specific python script INSIDE CLIENT environment (this messages is not sent right now)
51# The messages received (sent from client) will be the following:
52#     Message_id       Data               Action
53#    ------------    --------         --------------------------
54#    REQ_LOGOUT                   Logout user from session
55#    REQ_INFORMATION  None            Request information from ipc server (maybe configuration parameters in a near future)
56#    REQ_LOGIN        python script   Execute an specific python script INSIDE CLIENT environment (this messages is not sent right now)
57#
58# All messages are in the form:
59# BYTE
60#  0           1-2                        3 4 ...
61# MSG_ID   DATA_LENGTH (little endian)    Data (can be 0 length)
62# With a previos "MAGIC" header in fron of each message
63
64# Client messages
65MSG_LOGOFF = 0xA1  # Request log off from an user
66MSG_MESSAGE = 0xB2
67MSG_POPUP = 0xB3
68MSG_SCRIPT = 0xC3
69
70# Request messages
71REQ_MESSAGE = 0xD4
72REQ_POPUP = 0xD5
73REQ_LOGIN = 0xE5
74REQ_LOGOUT = 0xF6
75
76# Reverse msgs dict for debugging
77REV_DICT = {
78    MSG_LOGOFF: 'MSG_LOGOFF',
79    MSG_MESSAGE: 'MSG_MESSAGE',
80    MSG_POPUP: 'MSG_POPUP',
81    MSG_SCRIPT: 'MSG_SCRIPT',
82    REQ_LOGIN: 'REQ_LOGIN',
83    REQ_LOGOUT: 'REQ_LOGOUT',
84    REQ_MESSAGE: 'REQ_MESSAGE'
85}
86
87MAGIC = b'\x4F\x47\x41\x00'  # OGA in hexa with a padded 0 to the right
88
89
90# States for client processor
91ST_SECOND_BYTE = 0x01
92ST_RECEIVING = 0x02
93ST_PROCESS_MESSAGE = 0x02
94
95
96class ClientProcessor(threading.Thread):
97    def __init__(self, parent, clientSocket):
98        super(self.__class__, self).__init__()
99        self.parent = parent
100        self.clientSocket = clientSocket
101        self.running = False
102        self.messages = six.moves.queue.Queue(32)  # @UndefinedVariable
103
104    def stop(self):
105        logger.debug('Stoping client processor')
106        self.running = False
107
108    def processRequest(self, msg, data):
109        logger.debug('Got Client message {}={}'.format(msg, REV_DICT.get(msg)))
110        if self.parent.clientMessageProcessor is not None:
111            self.parent.clientMessageProcessor(msg, data)
112
113    def run(self):
114        self.running = True
115        self.clientSocket.setblocking(0)
116
117        state = None
118        recv_msg = None
119        recv_data = None
120        while self.running:
121            try:
122                counter = 1024
123                while counter > 0:  # So we process at least the incoming queue every XX bytes readed
124                    counter -= 1
125                    b = self.clientSocket.recv(1)
126                    if b == b'':
127                        # Client disconnected
128                        self.running = False
129                        break
130                    buf = six.byte2int(b)  # Empty buffer, this is set as non-blocking
131                    if state is None:
132                        if buf in (REQ_MESSAGE, REQ_LOGIN, REQ_LOGOUT):
133                            logger.debug('State set to {}'.format(buf))
134                            state = buf
135                            recv_msg = buf
136                            continue  # Get next byte
137                        else:
138                            logger.debug('Got unexpected data {}'.format(buf))
139                    elif state in (REQ_MESSAGE, REQ_LOGIN, REQ_LOGOUT):
140                        logger.debug('First length byte is {}'.format(buf))
141                        msg_len = buf
142                        state = ST_SECOND_BYTE
143                        continue
144                    elif state == ST_SECOND_BYTE:
145                        msg_len += buf << 8
146                        logger.debug('Second length byte is {}, len is {}'.format(buf, msg_len))
147                        if msg_len == 0:
148                            self.processRequest(recv_msg, None)
149                            state = None
150                            break
151                        state = ST_RECEIVING
152                        recv_data = b''
153                        continue
154                    elif state == ST_RECEIVING:
155                        recv_data += six.int2byte(buf)
156                        msg_len -= 1
157                        if msg_len == 0:
158                            self.processRequest(recv_msg, recv_data)
159                            recv_data = None
160                            state = None
161                            break
162                    else:
163                        logger.debug('Got invalid message from request: {}, state: {}'.format(buf, state))
164            except socket.error as e:
165                # If no data is present, no problem at all, pass to check messages
166                pass
167            except Exception as e:
168                tb = traceback.format_exc()
169                logger.error('Error: {}, trace: {}'.format(e, tb))
170
171            if self.running is False:
172                break
173
174            try:
175                msg = self.messages.get(block=True, timeout=1)
176            except six.moves.queue.Empty:  # No message got in time @UndefinedVariable
177                continue
178
179            logger.debug('Got message {}={}'.format(msg, REV_DICT.get(msg[0])))
180
181            try:
182                m = msg[1] if msg[1] is not None else b''
183                l = len(m)
184                data = MAGIC + six.int2byte(msg[0]) + six.int2byte(l & 0xFF) + six.int2byte(l >> 8) + m
185                try:
186                    self.clientSocket.sendall(data)
187                except socket.error as e:
188                    # Send data error
189                    logger.debug('Socket connection is no more available: {}'.format(e.args))
190                    self.running = False
191            except Exception as e:
192                logger.error('Invalid message in queue: {}'.format(e))
193
194        logger.debug('Client processor stopped')
195        try:
196            self.clientSocket.close()
197        except Exception:
198            pass  # If can't close, nothing happens, just end thread
199
200
201class ServerIPC(threading.Thread):
202
203    def __init__(self, listenPort, clientMessageProcessor=None):
204        super(self.__class__, self).__init__()
205        self.port = listenPort
206        self.running = False
207        self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
208        self.serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
209        self.threads = []
210        self.clientMessageProcessor = clientMessageProcessor
211
212    def stop(self):
213        logger.debug('Stopping Server IPC')
214        self.running = False
215        for t in self.threads:
216            t.stop()
217        socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect(('localhost', self.port))
218        self.serverSocket.close()
219
220        for t in self.threads:
221            t.join()
222
223    def sendMessage(self, msgId, msgData):
224        '''
225        Notify message to all listening threads
226        '''
227        logger.debug('Sending message {}({}),{} to all clients'.format(msgId, REV_DICT.get(msgId), msgData))
228
229        # Convert to bytes so length is correctly calculated
230        if isinstance(msgData, six.text_type):
231            msgData = msgData.encode('utf8')
232
233        for t in self.threads:
234            if t.isAlive():
235                logger.debug('Sending to {}'.format(t))
236                t.messages.put((msgId, msgData))
237
238    def sendLoggofMessage(self):
239        self.sendMessage(MSG_LOGOFF, '')
240
241    def sendMessageMessage(self, message):
242        self.sendMessage(MSG_MESSAGE, message)
243
244    def sendPopupMessage(self, title, message):
245        self.sendMessage(MSG_POPUP, {'title':title, 'message':message})
246
247    def sendScriptMessage(self, script):
248        self.sendMessage(MSG_SCRIPT, script)
249
250    def cleanupFinishedThreads(self):
251        '''
252        Cleans up current threads list
253        '''
254        aliveThreads = []
255        for t in self.threads:
256            if t.isAlive():
257                logger.debug('Thread {} is alive'.format(t))
258                aliveThreads.append(t)
259        self.threads[:] = aliveThreads
260
261    def run(self):
262        self.running = True
263
264        self.serverSocket.bind(('localhost', self.port))
265        self.serverSocket.setblocking(1)
266        self.serverSocket.listen(4)
267
268        while True:
269            try:
270                (clientSocket, address) = self.serverSocket.accept()
271                # Stop processing if thread is mean to stop
272                if self.running is False:
273                    break
274                logger.debug('Got connection from {}'.format(address))
275
276                self.cleanupFinishedThreads()  # House keeping
277
278                logger.debug('Starting new thread, current: {}'.format(self.threads))
279                t = ClientProcessor(self, clientSocket)
280                self.threads.append(t)
281                t.start()
282            except Exception as e:
283                logger.error('Got an exception on Server ipc thread: {}'.format(e))
284
285
286class ClientIPC(threading.Thread):
287    def __init__(self, listenPort):
288        super(ClientIPC, self).__init__()
289        self.port = listenPort
290        self.running = False
291        self.clientSocket = None
292        self.messages = six.moves.queue.Queue(32)  # @UndefinedVariable
293
294        self.connect()
295
296    def stop(self):
297        self.running = False
298
299    def getMessage(self):
300        while self.running:
301            try:
302                return self.messages.get(timeout=1)
303            except six.moves.queue.Empty:  # @UndefinedVariable
304                continue
305
306        return None
307
308    def sendRequestMessage(self, msg, data=None):
309        logger.debug('Sending request for msg: {}({}), {}'.format(msg, REV_DICT.get(msg), data))
310        if data is None:
311            data = b''
312
313        if isinstance(data, six.text_type):  # Convert to bytes if necessary
314            data = data.encode('utf-8')
315
316        l = len(data)
317        msg = six.int2byte(msg) + six.int2byte(l & 0xFF) + six.int2byte(l >> 8) + data
318        self.clientSocket.sendall(msg)
319
320    def sendLogin(self, username, language):
321        self.sendRequestMessage(REQ_LOGIN, username+','+language)
322
323    def sendLogout(self, username):
324        self.sendRequestMessage(REQ_LOGOUT, username)
325
326    def sendMessage(self, module, message, data=None):
327        '''
328        Sends a message "message" with data (data will be encoded as json, so ensure that it is serializable)
329        @param module: Module that will receive this message
330        @param message: Message to send. This message is "customized", and understand by modules
331        @param data: Data to be send as message companion
332        '''
333        msg = '\0'.join((module, message, json.dumps(data)))
334        self.sendRequestMessage(REQ_MESSAGE, msg)
335
336    def messageReceived(self):
337        '''
338        Override this method to automatically get notified on new message
339        received. Message is at self.messages queue
340        '''
341        pass
342
343    def receiveBytes(self, number):
344        msg = b''
345        while self.running and len(msg) < number:
346            try:
347                buf = self.clientSocket.recv(number - len(msg))
348                if buf == b'':
349                    logger.debug('Buf {}, msg {}({})'.format(buf, msg, REV_DICT.get(msg)))
350                    self.running = False
351                    break
352                msg += buf
353            except socket.timeout:
354                pass
355
356        if self.running is False:
357            logger.debug('Not running, returning None')
358            return None
359        return msg
360
361    def connect(self):
362        self.clientSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
363        self.clientSocket.connect(('localhost', self.port))
364        self.clientSocket.settimeout(2)  # Static, custom socket timeout of 2 seconds for local connection (no network)
365
366    def run(self):
367        self.running = True
368
369        while self.running:
370            try:
371                msg = b''
372                # We look for magic message header
373                while self.running:  # Wait for MAGIC
374                    try:
375                        buf = self.clientSocket.recv(len(MAGIC) - len(msg))
376                        if buf == b'':
377                            self.running = False
378                            break
379                        msg += buf
380                        if len(msg) != len(MAGIC):
381                            continue  # Do not have message
382                        if msg != MAGIC:  # Skip first byte an continue searchong
383                            msg = msg[1:]
384                            continue
385                        break
386                    except socket.timeout:  # Timeout is here so we can get stop thread
387                        continue
388
389                if self.running is False:
390                    break
391
392                # Now we get message basic data (msg + datalen)
393                msg = bytearray(self.receiveBytes(3))
394
395                # We have the magic header, here comes the message itself
396                if msg is None:
397                    continue
398
399                msgId = msg[0]
400                dataLen = msg[1] + (msg[2] << 8)
401                if msgId not in (MSG_LOGOFF, MSG_MESSAGE, MSG_SCRIPT):
402                    raise Exception('Invalid message id: {}'.format(msgId))
403
404                data = self.receiveBytes(dataLen)
405                if data is None:
406                    continue
407
408                self.messages.put((msgId, data))
409                self.messageReceived()
410
411            except socket.error as e:
412                logger.error('Communication with server got an error: {}'.format(toUnicode(e.strerror)))
413                self.running = False
414                return
415            except Exception as e:
416                tb = traceback.format_exc()
417                logger.error('Error: {}, trace: {}'.format(e, tb))
418
419        try:
420            self.clientSocket.close()
421        except Exception:
422            pass  # If can't close, nothing happens, just end thread
423
Note: See TracBrowser for help on using the repository browser.