source: admin/Sources/Clients/ogagent/src/opengnsys/service.py @ 4e12ae6

918-git-images-111dconfigfileconfigure-oglivegit-imageslgromero-new-oglivemainmaint-cronmount-efivarfsmultivmmultivm-ogboot-installerogClonningEngineogboot-installer-jenkinsoglive-ipv6test-python-scriptsticket-301ticket-50ticket-50-oldticket-577ticket-585ticket-611ticket-612ticket-693ticket-700ubu24tplunification2use-local-agent-oglivevarios-instalacionwebconsole3
Last change on this file since 4e12ae6 was c3e7c06, checked in by ramon <ramongomez@…>, 9 years ago

#718: Integrar código fuente de agente OGAgent en rama de desarrollo.

git-svn-id: https://opengnsys.es/svn/branches/version1.1@4865 a21b9725-9963-47de-94b9-378ad31fedc9

  • Property mode set to 100644
File size: 8.9 KB
Line 
1# -*- coding: utf-8 -*-
2#
3# Copyright (c) 2014 Virtual Cable S.L.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without modification,
7# are permitted provided that the following conditions are met:
8#
9#    * Redistributions of source code must retain the above copyright notice,
10#      this list of conditions and the following disclaimer.
11#    * Redistributions in binary form must reproduce the above copyright notice,
12#      this list of conditions and the following disclaimer in the documentation
13#      and/or other materials provided with the distribution.
14#    * Neither the name of Virtual Cable S.L. nor the names of its contributors
15#      may be used to endorse or promote products derived from this software
16#      without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29'''
30@author: Adolfo Gómez, dkmaster at dkmon dot com
31'''
32from __future__ import unicode_literals
33
34from .log import logger
35from .config import readConfig
36from .utils import exceptionToMessage
37
38from . import ipc
39from . import httpserver
40from .loader import loadModules
41
42import socket
43import time
44import json
45import six
46
47IPC_PORT = 10398
48
49
50class CommonService(object):
51    isAlive = True
52    ipc = None
53    httpServer = None
54    modules = None
55   
56    def __init__(self):
57        logger.info('----------------------------------------')
58        logger.info('Initializing OpenGnsys Agent')
59       
60        # Read configuration file before proceding & ensures minimal config is there
61
62        self.config = readConfig()
63
64        # Get opengnsys section as dict       
65        cfg = dict(self.config.items('opengnsys'))
66   
67        # Set up log level
68        logger.setLevel(cfg.get('log', 'INFO'))
69
70       
71        logger.debug('Loaded configuration from opengnsys.cfg:')
72        for section in self.config.sections():
73            logger.debug('Section {} = {}'.format(section, self.config.items(section)))
74           
75   
76        if logger.logger.isWindows():
77            # Logs will also go to windows event log for services
78            logger.logger.serviceLogger = True
79           
80        self.address = (cfg.get('address', '0.0.0.0'), int(cfg.get('port', '10997')))
81        self.ipcport = int(cfg.get('ipc_port', IPC_PORT))
82       
83        self.timeout = int(cfg.get('timeout', '20'))
84       
85        logger.debug('Socket timeout: {}'.format(self.timeout))
86        socket.setdefaulttimeout(self.timeout)
87       
88        # Now load modules
89        self.modules = loadModules(self)
90        logger.debug('Modules: {}'.format(list(v.name for v in self.modules)))
91       
92    def stop(self):
93        '''
94        Requests service termination
95        '''
96        self.isAlive = False
97       
98    # ********************************
99    # * Internal messages processors *
100    # ********************************
101    def notifyLogin(self, username):
102        for v in self.modules:
103            try:
104                logger.debug('Notifying login of user {} to module {}'.format(username, v.name))
105                v.onLogin(username)
106            except Exception as e:
107                logger.error('Got exception {} processing login message on {}'.format(e, v.name))
108   
109    def notifyLogout(self, username):
110        for v in self.modules:
111            try:
112                logger.debug('Notifying logout of user {} to module {}'.format(username, v.name))
113                v.onLogout(username)
114            except Exception as e:
115                logger.error('Got exception {} processing logout message on {}'.format(e, v.name))
116               
117    def notifyMessage(self, data):
118        module, message, data = data.split('\0')
119        for v in self.modules:
120            if v.name == module:  # Case Sensitive!!!!
121                try:
122                    logger.debug('Notifying message {} to module {} with json data {}'.format(message, v.name, data))
123                    v.processClientMessage(message, json.loads(data))
124                    return
125                except Exception as e:
126                    logger.error('Got exception {} processing generic message on {}'.format(e, v.name))
127
128        logger.error('Module {} not found, messsage {} not sent'.format(module, message))
129                     
130
131    def clientMessageProcessor(self, msg, data):
132        '''
133        Callback, invoked from IPC, on its own thread (not the main thread).
134        This thread will "block" communication with agent untill finished, but this should be no problem
135        '''
136        logger.debug('Got message {}'.format(msg))
137       
138        if msg == ipc.REQ_LOGIN:
139            self.notifyLogin(data)
140        elif msg == ipc.REQ_LOGOUT:
141            self.notifyLogout(data)
142        elif msg == ipc.REQ_MESSAGE:
143            self.notifyMessage(data)
144
145    def initialize(self):
146        # ******************************************
147        # * Initialize listeners, modules, etc...
148        # ******************************************
149       
150        if six.PY3 is False:
151            import threading
152            threading._DummyThread._Thread__stop = lambda x: 42
153       
154        logger.debug('Starting IPC listener at {}'.format(IPC_PORT))
155        self.ipc = ipc.ServerIPC(self.ipcport, clientMessageProcessor=self.clientMessageProcessor)
156        self.ipc.start()
157
158        # And http threaded server
159        self.httpServer = httpserver.HTTPServerThread(self.address, self)
160        self.httpServer.start()
161       
162        # And lastly invoke modules activation
163        validMods = []
164        for mod in self.modules:
165            try:
166                logger.debug('Activating module {}'.format(mod.name))
167                mod.activate()
168                validMods.append(mod)
169            except Exception as e:
170                logger.exception()
171                logger.error("Activation of {} failed: {}".format(mod.name, exceptionToMessage(e)))
172       
173        self.modules[:] = validMods  # copy instead of assignment
174       
175        logger.debug('Modules after activation: {}'.format(list(v.name for v in self.modules)))
176
177    def terminate(self):
178        # First invoke deactivate on modules
179        for mod in reversed(self.modules):
180            try:
181                logger.debug('Deactivating module {}'.format(mod.name))
182                mod.deactivate()
183            except Exception as e:
184                logger.exception()
185                logger.error("Deactivation of {} failed: {}".format(mod.name, exceptionToMessage(e)))
186       
187        # Remove IPC threads
188        if self.ipc is not None:
189            try:
190                self.ipc.stop()
191            except Exception:
192                logger.error('Couln\'t stop ipc server')
193               
194        if self.httpServer is not None:
195            try:
196                self.httpServer.stop()
197            except Exception:
198                logger.error('Couln\'t stop RESTApi server')
199
200        self.notifyStop()
201
202    # ****************************************
203    # Methods that CAN BE overridden by agents
204    # ****************************************
205    def doWait(self, miliseconds):
206        '''
207        Invoked to wait a bit
208        CAN be OVERRIDDEN
209        '''
210        time.sleep(float(miliseconds) / 1000)
211
212    def notifyStop(self):
213        '''
214        Overridden to log stop
215        '''
216        logger.info('Service is being stopped')
217       
218    # ***************************************************
219    # * Helpers, convenient methods to facilitate comms *
220    # ***************************************************
221    def sendClientMessage(self, toModule, message, data):
222        '''
223        Sends a message to the clients using IPC
224        The data is converted to json, so ensure that it is serializable.
225        All IPC is asynchronous, so if you expect a response, this will be sent by client using another message
226       
227        @param toModule: Module that will receive this message
228        @param message: Message to send
229        @param data: data to send
230        '''
231        self.ipc.sendMessageMessage('\0'.join((toModule, message, json.dumps(data))))
232       
233    def sendScriptMessage(self, script):
234        '''
235        Sends an script to be executed by client
236        '''
237        self.ipc.sendScriptMessage(script)
238       
239    def sendLogoffMessage(self):
240        '''
241        Sends a logoff message to client
242        '''
243        self.ipc.sendLoggofMessage()
Note: See TracBrowser for help on using the repository browser.