source: admin/Sources/Clients/ogagent/src/opengnsys/ipc.py @ 1e8645b

918-git-images-111dconfigfileconfigure-oglivegit-imageslgromero-new-oglivemainmaint-cronmount-efivarfsmultivmmultivm-ogboot-installerogClonningEngineogboot-installer-jenkinsoglive-ipv6test-python-scriptsticket-301ticket-50ticket-50-oldticket-577ticket-585ticket-611ticket-612ticket-693ticket-700ubu24tplunification2use-local-agent-oglivevarios-instalacionwebconsole3
Last change on this file since 1e8645b was c3e7c06, checked in by ramon <ramongomez@…>, 9 years ago

#718: Integrar código fuente de agente OGAgent en rama de desarrollo.

git-svn-id: https://opengnsys.es/svn/branches/version1.1@4865 a21b9725-9963-47de-94b9-378ad31fedc9

  • Property mode set to 100644
File size: 15.6 KB
Line 
1# -*- coding: utf-8 -*-
2#
3# Copyright (c) 2014 Virtual Cable S.L.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without modification,
7# are permitted provided that the following conditions are met:
8#
9#    * Redistributions of source code must retain the above copyright notice,
10#      this list of conditions and the following disclaimer.
11#    * Redistributions in binary form must reproduce the above copyright notice,
12#      this list of conditions and the following disclaimer in the documentation
13#      and/or other materials provided with the distribution.
14#    * Neither the name of Virtual Cable S.L. nor the names of its contributors
15#      may be used to endorse or promote products derived from this software
16#      without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28'''
29@author: Adolfo Gómez, dkmaster at dkmon dot com
30'''
31from __future__ import unicode_literals
32
33import socket
34import threading
35import six
36import traceback
37import json
38
39from opengnsys.utils import toUnicode
40from opengnsys.log import logger
41
42# The IPC Server will wait for connections from clients
43# Clients will open socket, and wait for data from server
44# The messages sent (from server) will be the following (subject to future changes):
45#     Message_id     Data               Action
46#    ------------  --------         --------------------------
47#    MSG_LOGOFF     None            Logout user from session
48#    MSG_MESSAGE    message,level   Display a message with level (INFO, WARN, ERROR, FATAL)     # TODO: Include level, right now only has message
#    MSG_SCRIPT     python script   Execute a specific python script INSIDE CLIENT environment (this message is not sent right now)
50# The messages received (sent from client) will be the following:
51#     Message_id       Data               Action
52#    ------------    --------         --------------------------
#    REQ_LOGOUT       username        Logout user from session
#    REQ_MESSAGE      module, message and JSON data (joined by NUL characters)   Custom message addressed to a module
#    REQ_LOGIN        username        Login of user into session
56#
57# All messages are in the form:
58# BYTE
59#  0           1-2                        3 4 ...
60# MSG_ID   DATA_LENGTH (little endian)    Data (can be 0 length)
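# Messages sent by the server carry a previous "MAGIC" header in front of each message;
# requests sent by clients start directly with MSG_ID (no MAGIC header)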
62
# Messages pushed by the server to the connected clients
MSG_LOGOFF = 0xA1  # Ask the client to log the user off
MSG_MESSAGE = 0xB2
MSG_SCRIPT = 0xC3

# Requests sent by the clients to the server
REQ_MESSAGE = 0xD4
REQ_LOGIN = 0xE5
REQ_LOGOUT = 0xF6

# Message id -> symbolic name, only used to make debug logs readable
REV_DICT = dict((
    (MSG_LOGOFF, 'MSG_LOGOFF'),
    (MSG_MESSAGE, 'MSG_MESSAGE'),
    (MSG_SCRIPT, 'MSG_SCRIPT'),
    (REQ_LOGIN, 'REQ_LOGIN'),
    (REQ_LOGOUT, 'REQ_LOGOUT'),
    (REQ_MESSAGE, 'REQ_MESSAGE'),
))

# Header that precedes every server message: ASCII "OGA" padded with a zero byte on the right
MAGIC = b'OGA\x00'
84
85
# States for client processor (internal to the request parser, never sent over the wire)
ST_SECOND_BYTE = 0x01  # First length byte read, waiting for the second one
ST_RECEIVING = 0x02  # Length known, reading the data bytes
ST_PROCESS_MESSAGE = 0x03  # Must differ from ST_RECEIVING (both were 0x02) so the states can be told apart
90
91
class ClientProcessor(threading.Thread):
    '''
    Thread that serves one client connection accepted by the IPC server.

    Every loop iteration first reads (non-blocking, one byte at a time) the request data
    available on the socket, and then waits up to one second for a queued outgoing
    message, which is sent to the client prefixed by MAGIC.
    '''

    def __init__(self, parent, clientSocket):
        '''
        @param parent: Owner object; its "clientMessageProcessor" attribute (a callable or None) receives decoded requests
        @param clientSocket: Connected socket of the client served by this thread
        '''
        # Name the class explicitly: super(self.__class__, self) recurses forever if this class is subclassed
        super(ClientProcessor, self).__init__()
        self.parent = parent
        self.clientSocket = clientSocket
        self.running = False
        self.messages = six.moves.queue.Queue(32)  # Outgoing (msgId, data) tuples @UndefinedVariable

    def stop(self):
        '''
        Asks the thread to finish; "run" checks the flag on every loop iteration
        '''
        logger.debug('Stoping client processor')
        self.running = False

    def processRequest(self, msg, data):
        '''
        Hands a fully received request to the parent's message processor (if any)
        @param msg: Request id (one of the REQ_* values)
        @param data: Request payload as bytes, or None when the request carries no data
        '''
        logger.debug('Got Client message {}={}'.format(msg, REV_DICT.get(msg)))
        if self.parent.clientMessageProcessor is not None:
            self.parent.clientMessageProcessor(msg, data)

    def run(self):
        '''
        Thread main loop: parses incoming requests and sends queued messages until
        stopped, the client disconnects or sending fails. Closes the socket on exit.
        '''
        self.running = True
        self.clientSocket.setblocking(0)

        state = None
        recv_msg = None
        recv_data = None
        msg_len = 0
        while self.running:
            try:
                counter = 1024
                while counter > 0:  # So we process at least the incoming queue every XX bytes readed
                    counter -= 1
                    b = self.clientSocket.recv(1)
                    if b == b'':
                        # Client disconnected
                        self.running = False
                        break
                    buf = six.byte2int(b)
                    if state is None:
                        if buf in (REQ_MESSAGE, REQ_LOGIN, REQ_LOGOUT):
                            logger.debug('State set to {}'.format(buf))
                            state = buf
                            recv_msg = buf
                            continue  # Get next byte
                        else:
                            logger.debug('Got unexpected data {}'.format(buf))
                    elif state in (REQ_MESSAGE, REQ_LOGIN, REQ_LOGOUT):
                        logger.debug('First length byte is {}'.format(buf))
                        msg_len = buf
                        state = ST_SECOND_BYTE
                        continue
                    elif state == ST_SECOND_BYTE:
                        msg_len += buf << 8
                        logger.debug('Second length byte is {}, len is {}'.format(buf, msg_len))
                        if msg_len == 0:
                            # Reset the parser BEFORE dispatching, so a handler that raises
                            # cannot leave it stuck in the middle of a message
                            state = None
                            self.processRequest(recv_msg, None)
                            break
                        state = ST_RECEIVING
                        recv_data = b''
                        continue
                    elif state == ST_RECEIVING:
                        recv_data += six.int2byte(buf)
                        msg_len -= 1
                        if msg_len == 0:
                            # Same as above: leave the parser clean before calling the handler
                            payload = recv_data
                            recv_data = None
                            state = None
                            self.processRequest(recv_msg, payload)
                            break
                    else:
                        logger.debug('Got invalid message from request: {}, state: {}'.format(buf, state))
            except socket.error:
                # If no data is present, no problem at all, pass to check messages
                pass
            except Exception as e:
                tb = traceback.format_exc()
                logger.error('Error: {}, trace: {}'.format(e, tb))

            if self.running is False:
                break

            try:
                msg = self.messages.get(block=True, timeout=1)
            except six.moves.queue.Empty:  # No message got in time @UndefinedVariable
                continue

            logger.debug('Got message {}={}'.format(msg, REV_DICT.get(msg[0])))

            try:
                m = msg[1] if msg[1] is not None else b''
                length = len(m)
                # Wire format: MAGIC, message id, data length (2 bytes, little endian), data
                data = MAGIC + six.int2byte(msg[0]) + six.int2byte(length & 0xFF) + six.int2byte(length >> 8) + m
                try:
                    self.clientSocket.sendall(data)
                except socket.error as e:
                    # Send data error
                    logger.debug('Socket connection is no more available: {}'.format(e.args))
                    self.running = False
            except Exception as e:
                logger.error('Invalid message in queue: {}'.format(e))

        logger.debug('Client processor stopped')
        try:
            self.clientSocket.close()
        except Exception:
            pass  # If can't close, nothing happens, just end thread
195
196
class ServerIPC(threading.Thread):
    '''
    IPC server thread: accepts TCP connections on localhost and starts one
    ClientProcessor thread for each connected client.
    '''

    def __init__(self, listenPort, clientMessageProcessor=None):
        '''
        @param listenPort: Port (on localhost) where the server will listen
        @param clientMessageProcessor: Optional callable, invoked as (msg, data) for every request received from a client
        '''
        # Name the class explicitly: super(self.__class__, self) recurses forever if this class is subclassed
        super(ServerIPC, self).__init__()
        self.port = listenPort
        self.running = False
        self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.threads = []
        self.clientMessageProcessor = clientMessageProcessor

    def stop(self):
        '''
        Stops the client threads and the server itself, and waits for the client threads to end
        '''
        logger.debug('Stopping Server IPC')
        self.running = False
        for t in self.threads:
            t.stop()
        # Dummy connection, its only purpose is to make the blocking accept() at "run" return
        wakeup = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            wakeup.connect(('localhost', self.port))
        finally:
            wakeup.close()
        self.serverSocket.close()

        for t in self.threads:
            t.join()

    def sendMessage(self, msgId, msgData):
        '''
        Notify message to all listening threads
        @param msgId: Message id (one of the MSG_* values)
        @param msgData: Message payload; text is encoded as utf8 before being queued
        '''
        logger.debug('Sending message {}({}),{} to all clients'.format(msgId, REV_DICT.get(msgId), msgData))

        # Convert to bytes so length is correctly calculated
        if isinstance(msgData, six.text_type):
            msgData = msgData.encode('utf8')

        for t in self.threads:
            if t.is_alive():  # isAlive() was removed in Python 3.9, is_alive() exists since 2.6
                logger.debug('Sending to {}'.format(t))
                t.messages.put((msgId, msgData))

    def sendLoggofMessage(self):
        '''
        Sends a MSG_LOGOFF message (with empty data) to all clients
        '''
        self.sendMessage(MSG_LOGOFF, '')

    def sendMessageMessage(self, message):
        '''
        Sends a MSG_MESSAGE message to all clients
        @param message: Data of the message
        '''
        self.sendMessage(MSG_MESSAGE, message)

    def sendScriptMessage(self, script):
        '''
        Sends a MSG_SCRIPT message to all clients
        @param script: Data of the message
        '''
        self.sendMessage(MSG_SCRIPT, script)

    def cleanupFinishedThreads(self):
        '''
        Cleans up current threads list
        '''
        aliveThreads = []
        for t in self.threads:
            if t.is_alive():  # isAlive() was removed in Python 3.9, is_alive() exists since 2.6
                logger.debug('Thread {} is alive'.format(t))
                aliveThreads.append(t)
        # Update in place, so other references to the list see the cleaned content
        self.threads[:] = aliveThreads

    def run(self):
        '''
        Thread main loop: binds the listening socket and serves connections until "stop" is invoked
        '''
        self.running = True

        self.serverSocket.bind(('localhost', self.port))
        self.serverSocket.setblocking(1)
        self.serverSocket.listen(4)

        while True:
            try:
                (clientSocket, address) = self.serverSocket.accept()
                # Stop processing if thread is mean to stop
                if self.running is False:
                    # This is the wake up connection done by "stop" (or a late client), release it
                    try:
                        clientSocket.close()
                    except Exception:
                        pass
                    break
                logger.debug('Got connection from {}'.format(address))

                self.cleanupFinishedThreads()  # House keeping

                logger.debug('Starting new thread, current: {}'.format(self.threads))
                t = ClientProcessor(self, clientSocket)
                self.threads.append(t)
                t.start()
            except Exception as e:
                logger.error('Got an exception on Server ipc thread: {}'.format(e))
                # Once stop has been requested, a failing accept() (i.e. closed socket) must
                # end the thread instead of looping forever
                if self.running is False:
                    break
277
278
class ClientIPC(threading.Thread):
    '''
    IPC client thread: connects to the IPC server on localhost, stores every message
    received from it at the "messages" queue and allows sending requests to it.
    '''

    def __init__(self, listenPort):
        '''
        Creates the client and connects to the server right away
        @param listenPort: Port (on localhost) where the IPC server is listening
        '''
        super(ClientIPC, self).__init__()
        self.port = listenPort
        self.running = False
        self.clientSocket = None
        self.messages = six.moves.queue.Queue(32)  # Received (msgId, data) tuples @UndefinedVariable

        self.connect()

    def stop(self):
        '''
        Asks the thread to finish; loops check the flag at least every socket/queue timeout
        '''
        self.running = False

    def getMessage(self):
        '''
        Waits for a received message while the client is running
        @return: The next (msgId, data) tuple, or None if the client is not running
        '''
        while self.running:
            try:
                return self.messages.get(timeout=1)
            except six.moves.queue.Empty:  # @UndefinedVariable
                continue

        return None

    def sendRequestMessage(self, msg, data=None):
        '''
        Sends a request to the server
        @param msg: Request id (one of the REQ_* values)
        @param data: Request payload (bytes or text, text is encoded as utf-8). None means no data
        '''
        logger.debug('Sending request for msg: {}({}), {}'.format(msg, REV_DICT.get(msg), data))
        if data is None:
            data = b''

        if isinstance(data, six.text_type):  # Convert to bytes if necessary
            data = data.encode('utf-8')

        length = len(data)
        # Wire format: request id, data length (2 bytes, little endian), data
        msg = six.int2byte(msg) + six.int2byte(length & 0xFF) + six.int2byte(length >> 8) + data
        self.clientSocket.sendall(msg)

    def sendLogin(self, username):
        '''
        Sends a REQ_LOGIN request
        @param username: Data of the request
        '''
        self.sendRequestMessage(REQ_LOGIN, username)

    def sendLogout(self, username):
        '''
        Sends a REQ_LOGOUT request
        @param username: Data of the request
        '''
        self.sendRequestMessage(REQ_LOGOUT, username)

    def sendMessage(self, module, message, data=None):
        '''
        Sends a message "message" with data (data will be encoded as json, so ensure that it is serializable)
        @param module: Module that will receive this message
        @param message: Message to send. This message is "customized", and understand by modules
        @param data: Data to be send as message companion
        '''
        msg = '\0'.join((module, message, json.dumps(data)))
        self.sendRequestMessage(REQ_MESSAGE, msg)

    def messageReceived(self):
        '''
        Override this method to automatically get notified on new message
        received. Message is at self.messages queue
        '''
        pass

    def receiveBytes(self, number):
        '''
        Reads exactly "number" bytes from the server
        @param number: Number of bytes to read
        @return: The bytes read, or None if the client is not running (stopped or server disconnected)
        '''
        msg = b''
        while self.running and len(msg) < number:
            try:
                buf = self.clientSocket.recv(number - len(msg))
                if buf == b'':
                    # Empty read means that the server closed the connection
                    logger.debug('Buf {}, msg {}({})'.format(buf, msg, REV_DICT.get(msg)))
                    self.running = False
                    break
                msg += buf
            except socket.timeout:
                pass  # Timeout is here so the running flag is checked periodically

        if self.running is False:
            logger.debug('Not running, returning None')
            return None
        return msg

    def connect(self):
        '''
        Opens the connection with the IPC server at localhost
        '''
        self.clientSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.clientSocket.connect(('localhost', self.port))
        self.clientSocket.settimeout(2)  # Static, custom socket timeout of 2 seconds for local connection (no network)

    def run(self):
        '''
        Thread main loop: looks for the MAGIC header, reads the message that follows it,
        queues it and invokes "messageReceived". Closes the socket on exit.
        '''
        self.running = True

        while self.running:
            try:
                msg = b''
                # We look for magic message header
                while self.running:  # Wait for MAGIC
                    try:
                        buf = self.clientSocket.recv(len(MAGIC) - len(msg))
                        if buf == b'':
                            self.running = False
                            break
                        msg += buf
                        if len(msg) != len(MAGIC):
                            continue  # Do not have message
                        if msg != MAGIC:  # Skip first byte and continue searching
                            msg = msg[1:]
                            continue
                        break
                    except socket.timeout:  # Timeout is here so we can get stop thread
                        continue

                if self.running is False:
                    break

                # We have the magic header, now we get message basic data (msg + datalen)
                header = self.receiveBytes(3)
                if header is None:
                    # Must be checked BEFORE building the bytearray: bytearray(None) raises TypeError
                    continue
                header = bytearray(header)  # So indexing yields integers on python 2 and 3

                msgId = header[0]
                dataLen = header[1] + (header[2] << 8)
                if msgId not in (MSG_LOGOFF, MSG_MESSAGE, MSG_SCRIPT):
                    raise Exception('Invalid message id: {}'.format(msgId))

                data = self.receiveBytes(dataLen)
                if data is None:
                    continue

                self.messages.put((msgId, data))
                self.messageReceived()

            except socket.error as e:
                logger.error('Communication with server got an error: {}'.format(toUnicode(e.strerror)))
                self.running = False
                break  # Not "return", so the socket gets closed below
            except Exception as e:
                tb = traceback.format_exc()
                logger.error('Error: {}, trace: {}'.format(e, tb))

        try:
            self.clientSocket.close()
        except Exception:
            pass  # If can't close, nothing happens, just end thread
416
Note: See TracBrowser for help on using the repository browser.