source: admin/Sources/Clients/ogagent/src/opengnsys/ipc.py @ 062ea34

918-git-images-111dconfigfileconfigure-oglivegit-imageslgromero-new-oglivemainmaint-cronmount-efivarfsmultivmmultivm-ogboot-installerogClonningEngineogboot-installer-jenkinsoglive-ipv6test-python-scriptsticket-301ticket-50ticket-50-oldticket-577ticket-585ticket-611ticket-612ticket-693ticket-700ubu24tplunification2use-local-agent-oglivevarios-instalacion
Last change on this file since 062ea34 was 30492e9, checked in by ramon <ramongomez@…>, 7 years ago

#708: OGAgent notifica el idioma al iniciar sesión de usuario.

git-svn-id: https://opengnsys.es/svn/branches/version1.1@5528 a21b9725-9963-47de-94b9-378ad31fedc9

  • Property mode set to 100644
File size: 15.8 KB
Line 
1# -*- coding: utf-8 -*-
2#
3# Copyright (c) 2014 Virtual Cable S.L.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without modification,
7# are permitted provided that the following conditions are met:
8#
9#    * Redistributions of source code must retain the above copyright notice,
10#      this list of conditions and the following disclaimer.
11#    * Redistributions in binary form must reproduce the above copyright notice,
12#      this list of conditions and the following disclaimer in the documentation
13#      and/or other materials provided with the distribution.
14#    * Neither the name of Virtual Cable S.L. nor the names of its contributors
15#      may be used to endorse or promote products derived from this software
16#      without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28'''
29@author: Adolfo Gómez, dkmaster at dkmon dot com
30'''
31from __future__ import unicode_literals
32
33import socket
34import threading
35import six
36import traceback
37import json
38
39from opengnsys.utils import toUnicode
40from opengnsys.log import logger
41
42# The IPC Server will wait for connections from clients
43# Clients will open socket, and wait for data from server
44# The messages sent (from server) will be the following (subject to future changes):
45#     Message_id     Data               Action
46#    ------------  --------         --------------------------
47#    MSG_LOGOFF     None            Logout user from session
48#    MSG_MESSAGE    message,level   Display a message with level (INFO, WARN, ERROR, FATAL)     # TODO: Include level, right now only has message
49#    MSG_POPUP      title,message   Display a popup box with a title
50#    MSG_SCRIPT     python script   Execute a specific python script INSIDE CLIENT environment (this message is not sent right now)
51# The messages received (sent from client) will be the following:
52#     Message_id       Data               Action
53#    ------------    --------         --------------------------
54#    REQ_LOGOUT                   Logout user from session
55#    REQ_INFORMATION  None            Request information from ipc server (maybe configuration parameters in a near future)
56#    REQ_LOGIN        username,language   Notify user login (user name and language, comma separated)
57#
58# All messages are in the form:
59# BYTE
60#  0           1-2                        3 4 ...
61# MSG_ID   DATA_LENGTH (little endian)    Data (can be 0 length)
62# With a previous "MAGIC" header in front of each message
63
# Messages sent by the server to the connected clients
MSG_LOGOFF = 0xA1  # Request log off from an user
MSG_MESSAGE = 0xB2
MSG_POPUP = 0xB3
MSG_SCRIPT = 0xC3

# Request messages (sent by the clients to the server)
REQ_MESSAGE = 0xD4
REQ_POPUP = 0xD5
REQ_LOGIN = 0xE5
REQ_LOGOUT = 0xF6

# Reverse msgs dict for debugging (message id -> symbolic name)
REV_DICT = {
    MSG_LOGOFF: 'MSG_LOGOFF',
    MSG_MESSAGE: 'MSG_MESSAGE',
    MSG_POPUP: 'MSG_POPUP',
    MSG_SCRIPT: 'MSG_SCRIPT',
    REQ_LOGIN: 'REQ_LOGIN',
    REQ_LOGOUT: 'REQ_LOGOUT',
    REQ_MESSAGE: 'REQ_MESSAGE',
    REQ_POPUP: 'REQ_POPUP',
}

MAGIC = b'\x4F\x47\x41\x00'  # OGA in hexa with a padded 0 to the right


# States for client processor.
# Every state needs its own value, otherwise two states cannot be told apart.
ST_SECOND_BYTE = 0x01
ST_RECEIVING = 0x02
ST_PROCESS_MESSAGE = 0x03
94
95
class ClientProcessor(threading.Thread):
    '''
    Thread that serves a single client connected to the IPC server.

    It reads requests from the client socket byte by byte (non-blocking),
    forwards complete requests to the parent's ``clientMessageProcessor``
    and sends to the client the messages queued in ``self.messages``.
    '''

    def __init__(self, parent, clientSocket):
        '''
        @param parent: Object holding the "clientMessageProcessor" callable (or None)
        @param clientSocket: Already connected socket used to talk to the client
        '''
        # The class is named explicitly: super(self.__class__, self) would
        # recurse forever as soon as this class gets subclassed.
        super(ClientProcessor, self).__init__()
        self.parent = parent
        self.clientSocket = clientSocket
        self.running = False
        self.messages = six.moves.queue.Queue(32)  # @UndefinedVariable

    def stop(self):
        '''
        Asks the thread to finish; the main loop checks the flag on every iteration
        '''
        logger.debug('Stoping client processor')
        self.running = False

    def processRequest(self, msg, data):
        '''
        Hands a complete request over to the parent's message processor, if there is one
        @param msg: Request id (one of REQ_*)
        @param data: Request payload as bytes, or None when the request has no data
        '''
        logger.debug('Got Client message {}={}'.format(msg, REV_DICT.get(msg)))
        if self.parent.clientMessageProcessor is not None:
            self.parent.clientMessageProcessor(msg, data)

    def run(self):
        '''
        Main loop: alternates between reading incoming request bytes and
        sending one queued message (waiting up to 1 second for it).
        '''
        self.running = True
        self.clientSocket.setblocking(0)

        state = None
        recv_msg = None
        recv_data = None
        msg_len = 0  # Bytes still to be read; bound here so it always exists
        while self.running:
            try:
                counter = 1024
                while counter > 0:  # So we process at least the incoming queue every XX bytes readed
                    counter -= 1
                    b = self.clientSocket.recv(1)
                    if b == b'':
                        # Client disconnected
                        self.running = False
                        break
                    buf = six.byte2int(b)  # Empty buffer, this is set as non-blocking
                    if state is None:
                        # Waiting for a request id
                        if buf in (REQ_MESSAGE, REQ_LOGIN, REQ_LOGOUT):
                            logger.debug('State set to {}'.format(buf))
                            state = buf
                            recv_msg = buf
                            continue  # Get next byte
                        else:
                            logger.debug('Got unexpected data {}'.format(buf))
                    elif state in (REQ_MESSAGE, REQ_LOGIN, REQ_LOGOUT):
                        # Low byte of the (little endian) data length
                        logger.debug('First length byte is {}'.format(buf))
                        msg_len = buf
                        state = ST_SECOND_BYTE
                        continue
                    elif state == ST_SECOND_BYTE:
                        # High byte of the data length
                        msg_len += buf << 8
                        logger.debug('Second length byte is {}, len is {}'.format(buf, msg_len))
                        if msg_len == 0:
                            self.processRequest(recv_msg, None)
                            state = None
                            break
                        state = ST_RECEIVING
                        recv_data = b''
                        continue
                    elif state == ST_RECEIVING:
                        recv_data += six.int2byte(buf)
                        msg_len -= 1
                        if msg_len == 0:
                            self.processRequest(recv_msg, recv_data)
                            recv_data = None
                            state = None
                            break
                    else:
                        logger.debug('Got invalid message from request: {}, state: {}'.format(buf, state))
            except socket.error:
                # If no data is present, no problem at all, pass to check messages.
                # Any other socket error is ignored here as well; a failing
                # sendall() below is what stops the processor.
                pass
            except Exception as e:
                tb = traceback.format_exc()
                logger.error('Error: {}, trace: {}'.format(e, tb))

            if self.running is False:
                break

            try:
                msg = self.messages.get(block=True, timeout=1)
            except six.moves.queue.Empty:  # No message got in time @UndefinedVariable
                continue

            logger.debug('Got message {}={}'.format(msg, REV_DICT.get(msg[0])))

            try:
                m = msg[1] if msg[1] is not None else b''
                length = len(m)
                # MAGIC + message id + 2 bytes length (little endian) + data
                data = MAGIC + six.int2byte(msg[0]) + six.int2byte(length & 0xFF) + six.int2byte(length >> 8) + m
                try:
                    self.clientSocket.sendall(data)
                except socket.error as e:
                    # Send data error
                    logger.debug('Socket connection is no more available: {}'.format(e.args))
                    self.running = False
            except Exception as e:
                logger.error('Invalid message in queue: {}'.format(e))

        logger.debug('Client processor stopped')
        try:
            self.clientSocket.close()
        except Exception:
            pass  # If can't close, nothing happens, just end thread
199
200
class ServerIPC(threading.Thread):
    '''
    IPC server thread: listens on a local TCP port, starts one
    ClientProcessor per accepted connection and broadcasts messages to all
    the client processors that are still alive.
    '''

    def __init__(self, listenPort, clientMessageProcessor=None):
        '''
        @param listenPort: TCP port (on localhost) where the server listens
        @param clientMessageProcessor: Optional callable(msg, data) invoked for every request got from a client
        '''
        # The class is named explicitly: super(self.__class__, self) would
        # recurse forever as soon as this class gets subclassed.
        super(ServerIPC, self).__init__()
        self.port = listenPort
        self.running = False
        self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.threads = []
        self.clientMessageProcessor = clientMessageProcessor

    def stop(self):
        '''
        Stops the server and all client processors, and waits for them to finish
        '''
        logger.debug('Stopping Server IPC')
        self.running = False
        for t in self.threads:
            t.stop()

        # Throw-away connection, only meant to wake up the blocking accept() in run()
        wakeup = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            wakeup.connect(('localhost', self.port))
        except socket.error as e:
            # Server was not listening; go on so sockets get closed and threads joined
            logger.debug('Could not wake up server socket: {}'.format(e))
        finally:
            wakeup.close()
        self.serverSocket.close()

        for t in self.threads:
            t.join()

    def sendMessage(self, msgId, msgData):
        '''
        Notify message to all listening threads
        @param msgId: Message id (one of MSG_*)
        @param msgData: Message payload; text is encoded as utf8 before queuing it
        '''
        logger.debug('Sending message {}({}),{} to all clients'.format(msgId, REV_DICT.get(msgId), msgData))

        # Convert to bytes so length is correctly calculated
        if isinstance(msgData, six.text_type):
            msgData = msgData.encode('utf8')

        for t in self.threads:
            # is_alive() is available on both Python 2.6+ and 3; isAlive() was removed in Python 3.9
            if t.is_alive():
                logger.debug('Sending to {}'.format(t))
                t.messages.put((msgId, msgData))

    def sendLoggofMessage(self):
        '''
        Sends a log off message (without data) to all clients
        '''
        self.sendMessage(MSG_LOGOFF, '')

    def sendMessageMessage(self, message):
        '''
        Sends a MSG_MESSAGE with "message" as data to all clients
        '''
        self.sendMessage(MSG_MESSAGE, message)

    def sendPopupMessage(self, title, message):
        '''
        Sends a MSG_POPUP to all clients.
        Title and message are serialized as a json object with the keys
        "title" and "message", because the data put on the wire must be
        bytes/text (a plain dict cannot be concatenated to the message header).
        '''
        self.sendMessage(MSG_POPUP, json.dumps({'title': title, 'message': message}))

    def sendScriptMessage(self, script):
        '''
        Sends a MSG_SCRIPT with "script" as data to all clients
        '''
        self.sendMessage(MSG_SCRIPT, script)

    def cleanupFinishedThreads(self):
        '''
        Cleans up current threads list
        '''
        aliveThreads = []
        for t in self.threads:
            if t.is_alive():
                logger.debug('Thread {} is alive'.format(t))
                aliveThreads.append(t)
        # In-place update, so other references to the list see the cleanup too
        self.threads[:] = aliveThreads

    def run(self):
        '''
        Accepts connections and starts a ClientProcessor for each of them,
        until stop() is invoked.
        '''
        self.running = True

        self.serverSocket.bind(('localhost', self.port))
        self.serverSocket.setblocking(1)
        self.serverSocket.listen(4)

        while True:
            try:
                (clientSocket, address) = self.serverSocket.accept()
                # Stop processing if thread is mean to stop
                if self.running is False:
                    clientSocket.close()  # This is the wake up connection made by stop()
                    break
                logger.debug('Got connection from {}'.format(address))

                self.cleanupFinishedThreads()  # House keeping

                logger.debug('Starting new thread, current: {}'.format(self.threads))
                t = ClientProcessor(self, clientSocket)
                self.threads.append(t)
                t.start()
            except Exception as e:
                if self.running is False:
                    # Listening socket was closed by stop(): do not loop forever on accept() errors
                    break
                logger.error('Got an exception on Server ipc thread: {}'.format(e))
284
285
class ClientIPC(threading.Thread):
    '''
    IPC client thread: connects to the IPC server on localhost, sends
    requests to it and stores the messages got from the server in the
    ``self.messages`` queue.
    '''

    def __init__(self, listenPort):
        '''
        Connects to the server right away, so this raises if it can not be reached
        @param listenPort: TCP port (on localhost) where the IPC server listens
        '''
        super(ClientIPC, self).__init__()
        self.port = listenPort
        self.running = False
        self.clientSocket = None
        self.messages = six.moves.queue.Queue(32)  # @UndefinedVariable

        self.connect()

    def stop(self):
        '''
        Asks the thread to finish; loops check the flag at least on every socket/queue timeout
        '''
        self.running = False

    def getMessage(self):
        '''
        Waits for the next message from server
        @return: (msgId, data) tuple, or None if the client is not (or no longer) running
        '''
        while self.running:
            try:
                return self.messages.get(timeout=1)
            except six.moves.queue.Empty:  # @UndefinedVariable
                continue

        return None

    def sendRequestMessage(self, msg, data=None):
        '''
        Sends a request to the server
        @param msg: Request id (one of REQ_*)
        @param data: Request payload (bytes or text, text is encoded as utf-8). None means no data
        '''
        logger.debug('Sending request for msg: {}({}), {}'.format(msg, REV_DICT.get(msg), data))
        if data is None:
            data = b''

        if isinstance(data, six.text_type):  # Convert to bytes if necessary
            data = data.encode('utf-8')

        length = len(data)
        # Request id + 2 bytes length (little endian) + data; requests carry no MAGIC header
        msg = six.int2byte(msg) + six.int2byte(length & 0xFF) + six.int2byte(length >> 8) + data
        self.clientSocket.sendall(msg)

    def sendLogin(self, username, language):
        '''
        Sends a REQ_LOGIN request, with data in the form "username,language"
        '''
        self.sendRequestMessage(REQ_LOGIN, username+','+language)

    def sendLogout(self, username):
        '''
        Sends a REQ_LOGOUT request with the user name as data
        '''
        self.sendRequestMessage(REQ_LOGOUT, username)

    def sendMessage(self, module, message, data=None):
        '''
        Sends a message "message" with data (data will be encoded as json, so ensure that it is serializable)
        @param module: Module that will receive this message
        @param message: Message to send. This message is "customized", and understand by modules
        @param data: Data to be send as message companion
        '''
        msg = '\0'.join((module, message, json.dumps(data)))
        self.sendRequestMessage(REQ_MESSAGE, msg)

    def messageReceived(self):
        '''
        Override this method to automatically get notified on new message
        received. Message is at self.messages queue
        '''
        pass

    def receiveBytes(self, number):
        '''
        Reads exactly "number" bytes from the server
        @param number: Number of bytes to read
        @return: The bytes read, or None if the connection got closed or the client was stopped meanwhile
        '''
        msg = b''
        while self.running and len(msg) < number:
            try:
                buf = self.clientSocket.recv(number - len(msg))
                if buf == b'':
                    # Server closed the connection
                    logger.debug('Buf {}, msg {}({})'.format(buf, msg, REV_DICT.get(msg)))
                    self.running = False
                    break
                msg += buf
            except socket.timeout:
                pass  # Timeout only lets us check the running flag again

        if self.running is False:
            logger.debug('Not running, returning None')
            return None
        return msg

    def connect(self):
        '''
        Opens the connection with the IPC server
        '''
        self.clientSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.clientSocket.connect(('localhost', self.port))
        self.clientSocket.settimeout(2)  # Static, custom socket timeout of 2 seconds for local connection (no network)

    def run(self):
        '''
        Main loop: looks for the MAGIC header, reads the message that follows
        it, puts it in self.messages and invokes messageReceived().
        The socket is closed when the loop ends, whatever the reason.
        '''
        self.running = True

        while self.running:
            try:
                msg = b''
                # We look for magic message header
                while self.running:  # Wait for MAGIC
                    try:
                        buf = self.clientSocket.recv(len(MAGIC) - len(msg))
                        if buf == b'':
                            self.running = False
                            break
                        msg += buf
                        if len(msg) != len(MAGIC):
                            continue  # Do not have message
                        if msg != MAGIC:  # Skip first byte and continue searching
                            msg = msg[1:]
                            continue
                        break
                    except socket.timeout:  # Timeout is here so we can get stop thread
                        continue

                if self.running is False:
                    break

                # We have the magic header, now we get message basic data (msg + datalen)
                header = self.receiveBytes(3)
                if header is None:  # Must be checked before converting: bytearray(None) raises
                    continue
                header = bytearray(header)  # So indexing gives ints on both python 2 and 3

                msgId = header[0]
                dataLen = header[1] + (header[2] << 8)
                if msgId not in (MSG_LOGOFF, MSG_MESSAGE, MSG_POPUP, MSG_SCRIPT):
                    raise Exception('Invalid message id: {}'.format(msgId))

                data = self.receiveBytes(dataLen)
                if data is None:
                    continue

                self.messages.put((msgId, data))
                self.messageReceived()

            except socket.error as e:
                logger.error('Communication with server got an error: {}'.format(toUnicode(e.strerror)))
                self.running = False
                break  # Leave the loop (instead of returning) so the socket gets closed below
            except Exception as e:
                tb = traceback.format_exc()
                logger.error('Error: {}, trace: {}'.format(e, tb))

        try:
            self.clientSocket.close()
        except Exception:
            pass  # If can't close, nothing happens, just end thread
423
Note: See TracBrowser for help on using the repository browser.