source: ogAgent-Git/src/opengnsys/modules/server/OpenGnSys/__init__.py @ ddd54ee

oglive
Last change on this file since ddd54ee was ddd54ee, checked in by Ramón M. Gómez <ramongomez@…>, 5 years ago

#750: Using more descriptive status; new route POST /command to launch command/script in a callback thread that returns all data to a server route.

  • Property mode set to 100644
File size: 17.9 KB
Line 
1# -*- coding: utf-8 -*-
2#
3# Copyright (c) 2014 Virtual Cable S.L.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without modification,
7# are permitted provided that the following conditions are met:
8#
9#    * Redistributions of source code must retain the above copyright notice,
10#      this list of conditions and the following disclaimer.
11#    * Redistributions in binary form must reproduce the above copyright notice,
12#      this list of conditions and the following disclaimer in the documentation
13#      and/or other materials provided with the distribution.
14#    * Neither the name of Virtual Cable S.L. nor the names of its contributors
15#      may be used to endorse or promote products derived from this software
16#      without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28"""
29@author: Ramón M. Gómez, ramongomez at us dot es
30"""
31from __future__ import unicode_literals
32
33import os
34import random
35import shutil
36import string
37import threading
38import time
39import urllib
40
41from opengnsys import REST
42from opengnsys import operations
43from opengnsys.log import logger
44from opengnsys.scriptThread import ScriptExecutorThread
45from opengnsys.workers import ServerWorker
46from six.moves.urllib import parse
47
48
49
50# Check authorization header decorator
51def check_secret(fnc):
52    """
53    Decorator to check for received secret key and raise exception if it isn't valid.
54    """
55    def wrapper(*args, **kwargs):
56        try:
57            this, path, get_params, post_params, server = args  # @UnusedVariable
58            if this.random == server.headers['Authorization']:
59                fnc(*args, **kwargs)
60            else:
61                raise Exception('Unauthorized operation')
62        except Exception as e:
63            logger.error(e)
64            raise Exception(e)
65
66    return wrapper
67
68
69# Error handler decorator.
70def catch_background_error(fnc):
71    def wrapper(*args, **kwargs):
72        this = args[0]
73        try:
74            fnc(*args, **kwargs)
75        except Exception as e:
76            this.REST.sendMessage('error?id={}'.format(kwargs.get('requestId', 'error')), {'error': '{}'.format(e)})
77    return wrapper
78
79
80def check_locked_partition(sync=False):
81    """
82    Decorator to check if a partition is locked
83    """
84    def outer(fnc):
85        def wrapper(*args, **kwargs):
86            part_id = 'None'
87            try:
88                this, path, get_params, post_params, server = args  # @UnusedVariable
89                part_id = post_params['disk'] + post_params['part']
90                if this.locked.get(part_id, False):
91                    this.locked[part_id] = True
92                    fnc(*args, **kwargs)
93                else:
94                    return 'partition locked'
95            except Exception as e:
96                this.locked[part_id] = False
97                return 'error {}'.format(e)
98            finally:
99                if sync is True:
100                    this.locked[part_id] = False
101            logger.debug('Lock status: {} {}'.format(fnc, this.locked))
102        return wrapper
103    return outer
104
105
106class OpenGnSysWorker(ServerWorker):
107    name = 'opengnsys'
108    interface = None  # Bound interface for OpenGnsys
109    REST = None  # REST object
110    logged_in = False  # User session flag
111    locked = {}
112    random = None     # Random string for secure connections
113    length = 32       # Random string length
114
115    def onActivation(self):
116        """
117        Sends OGAgent activation notification to OpenGnsys server
118        """
119        t = 0
120        # Generate random secret to send on activation
121        self.random = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(self.length))
122        # Ensure cfg has required configuration variables or an exception will be thrown
123        url = self.service.config.get('opengnsys', 'remote')
124        if operations.os_type == 'ogLive' and 'oglive' in os.environ:
125            # Replacing server IP if its running on ogLive clinet
126            logger.debug('Activating on ogLive client, new server is {}'.format(os.environ['oglive']))
127            url = parse.urlsplit(url)._replace(netloc=os.environ['oglive']).geturl()
128        self.REST = REST(url)
129        # Get network interfaces until they are active or timeout (5 minutes)
130        for t in range(0, 300):
131            try:
132                self.interface = list(operations.getNetworkInfo())[0]  # Get first network interface
133            except Exception as e:
134                # Wait 1 sec. and retry
135                time.sleep(1)
136            finally:
137                # Exit loop if interface is active
138                if self.interface:
139                    if t > 0:
140                        logger.debug("Fetch connection data after {} tries".format(t))
141                    break
142        # Raise error after timeout
143        if not self.interface:
144            raise e
145        # Loop to send initialization message
146        for t in range(0, 100):
147            try:
148                try:
149                    self.REST.sendMessage('ogagent/started', {'mac': self.interface.mac, 'ip': self.interface.ip,
150                                                              'secret': self.random, 'ostype': operations.os_type,
151                                                              'osversion': operations.os_version})
152                    break
153                except:
154                    # Trying to initialize on alternative server, if defined
155                    # (used in "exam mode" from the University of Seville)
156                    self.REST = REST(self.service.config.get('opengnsys', 'altremote'))
157                    self.REST.sendMessage('ogagent/started', {'mac': self.interface.mac, 'ip': self.interface.ip,
158                                                              'secret': self.random, 'ostype': operations.os_type,
159                                                              'osversion': operations.os_version, 'alt_url': True})
160                    break
161            except:
162                time.sleep(3)
163        # Raise error after timeout
164        if 0 < t < 100:
165            logger.debug('Successful connection after {} tries'.format(t))
166        elif t == 100:
167            raise Exception('Initialization error: Cannot connect to remote server')
168        # Delete marking files
169        for f in ['ogboot.me', 'ogboot.firstboot', 'ogboot.secondboot']:
170            try:
171                os.remove(os.sep + f)
172            except OSError:
173                pass
174        # Copy file "HostsFile.FirstOctetOfIPAddress" to "HostsFile", if it exists
175        # (used in "exam mode" from the University of Seville)
176        hosts_file = os.path.join(operations.get_etc_path(), 'hosts')
177        new_hosts_file = hosts_file + '.' + self.interface.ip.split('.')[0]
178        if os.path.isfile(new_hosts_file):
179            shutil.copyfile(new_hosts_file, hosts_file)
180
181    def onDeactivation(self):
182        """
183        Sends OGAgent stopping notification to OpenGnsys server
184        """
185        logger.debug('onDeactivation')
186        self.REST.sendMessage('ogagent/stopped', {'mac': self.interface.mac, 'ip': self.interface.ip,
187                                                  'ostype': operations.os_type, 'osversion': operations.os_version})
188
189    def processClientMessage(self, message, data):
190        logger.debug('Got OpenGnsys message from client: {}, data {}'.format(message, data))
191
192    def onLogin(self, data):
193        """
194        Sends session login notification to OpenGnsys server
195        """
196        user, sep, language = data.partition(',')
197        logger.debug('Received login for {} with language {}'.format(user, language))
198        self.logged_in = True
199        self.REST.sendMessage('ogagent/loggedin', {'ip': self.interface.ip, 'user': user, 'language': language,
200                                                   'ostype': operations.os_type, 'osversion': operations.os_version})
201
202    def onLogout(self, user):
203        """
204        Sends session logout notification to OpenGnsys server
205        """
206        logger.debug('Received logout for {}'.format(user))
207        self.logged_in = False
208        self.REST.sendMessage('ogagent/loggedout', {'ip': self.interface.ip, 'user': user})
209
210    def process_ogclient(self, path, get_params, post_params, server):
211        """
212        This method can be overridden to provide your own message processor, or better you can
213        implement a method that is called exactly as "process_" + path[0] (module name has been removed from path
214        array) and this default processMessage will invoke it
215        * Example:
216            Imagine this invocation url (no matter if GET or POST): http://example.com:9999/Sample/mazinger/Z
217            The HTTP Server will remove "Sample" from path, parse arguments and invoke this method as this:
218            module.processMessage(["mazinger","Z"], get_params, post_params)
219
220            This method will process "mazinger", and look for a "self" method that is called "process_mazinger",
221            and invoke it this way:
222               return self.process_mazinger(["Z"], get_params, post_params)
223
224            In the case path is empty (that is, the path is composed only by the module name, like in
225            "http://example.com/Sample", the "process" method will be invoked directly
226
227            The methods must return data that can be serialized to json (i.e. Objects are not serializable to json,
228            basic type are)
229        """
230        if not path:
231            return "ok"
232        try:
233            operation = getattr(self, 'ogclient_' + path[0])
234        except Exception:
235            raise Exception('Message processor for "{}" not found'.format(path[0]))
236        return operation(path[1:], get_params, post_params)
237
238    def process_status(self, path, get_params, post_params, server):
239        """
240        Returns client status (OS type or execution status) and login status
241        :param path:
242        :param get_params:
243        :param post_params:
244        :param server:
245        :return: JSON object {"status": "status_code", "loggedin": boolean}
246        """
247        res = {'loggedin': self.loggedin}
248        try:
249            res['status'] = operations.os_type.lower()
250        except KeyError:
251            res['status'] = ''
252        # Check if OpenGnsys Client is busy
253        if res['status'] == 'oglive' and self.locked:
254            res['status'] = 'busy'
255        return res
256
257    @check_secret
258    def process_reboot(self, path, get_params, post_params, server):
259        """
260        Launches a system reboot operation
261        :param path:
262        :param get_params:
263        :param post_params:
264        :param server: authorization header
265        :return: JSON object {"op": "launched"}
266        """
267        logger.debug('Received reboot operation')
268
269        # Rebooting thread
270        def rebt():
271            operations.reboot()
272        threading.Thread(target=rebt).start()
273        return {'op': 'launched'}
274
275    @check_secret
276    def process_poweroff(self, path, get_params, post_params, server):
277        """
278        Launches a system power off operation
279        :param path:
280        :param get_params:
281        :param post_params:
282        :param server: authorization header
283        :return: JSON object {"op": "launched"}
284        """
285        logger.debug('Received poweroff operation')
286
287        # Powering off thread
288        def pwoff():
289            time.sleep(2)
290            operations.poweroff()
291        threading.Thread(target=pwoff).start()
292        return {'op': 'launched'}
293
294    @check_secret
295    def process_script(self, path, get_params, post_params, server):
296        """
297        Processes an script execution (script should be encoded in base64)
298        :param path:
299        :param get_params:
300        :param post_params: JSON object {"script": "commands"}
301        :param server: authorization header
302        :return: JSON object {"op": "launched"}
303        """
304        logger.debug('Processing script request')
305        # Decoding script
306        script = urllib.unquote(post_params.get('script').decode('base64')).decode('utf8')
307        script = 'import subprocess; subprocess.check_output("""{}""",shell=True)'.format(script)
308        # Executing script.
309        if post_params.get('client', 'false') == 'false':
310            thr = ScriptExecutorThread(script)
311            thr.start()
312        else:
313            self.sendClientMessage('script', {'code': script})
314        return {'op': 'launched'}
315
316    @check_secret
317    def process_logoff(self, path, get_params, post_params, server):
318        """
319        Closes user session
320        """
321        logger.debug('Received logoff operation')
322        # Sending log off message to OGAgent client
323        self.sendClientMessage('logoff', {})
324        return {'op': 'sent to client'}
325
326    @check_secret
327    def process_popup(self, path, get_params, post_params, server):
328        """
329        Shows a message popup on the user's session
330        """
331        logger.debug('Received message operation')
332        # Sending popup message to OGAgent client
333        self.sendClientMessage('popup', post_params)
334        return {'op': 'launched'}
335
336    def process_client_popup(self, params):
337        self.REST.sendMessage('popup_done', params)
338
339    def process_config(self, path, get_params, post_params, server):
340        """
341        Returns client configuration
342        :param path:
343        :param get_params:
344        :param post_params:
345        :param server:
346        :return: object
347        """
348        serialno = ''   # Serial number
349        storage = []    # Storage configuration
350        warnings = 0    # Number of warnings
351        logger.debug('Recieved getconfig operation')
352        self.checkSecret(server)
353        # Processing data
354        for row in operations.get_configuration().split(';'):
355            cols = row.split(':')
356            if len(cols) == 1:
357                if cols[0] != '':
358                    # Serial number
359                    serialno = cols[0]
360                else:
361                    # Skip blank rows
362                    pass
363            elif len(cols) == 7:
364                disk, npart, tpart, fs, opsys, size, usage = cols
365                try:
366                    if int(npart) == 0:
367                        # Disk information
368                        storage.append({'disk': int(disk), 'parttable': int(tpart), 'size': int(size)})
369                    else:
370                        # Partition information
371                        storage.append({'disk': int(disk), 'partition': int(npart), 'parttype': tpart,
372                                        'filesystem': fs, 'operatingsystem': opsys, 'size': int(size),
373                                        'usage': int(usage)})
374                except ValueError:
375                    logger.warn('Configuration parameter error: {}'.format(cols))
376                    warnings += 1
377            else:
378                # Logging warnings
379                logger.warn('Configuration data error: {}'.format(cols))
380                warnings += 1
381        # Returning configuration data and count of warnings
382        return {'serialno': serialno, 'storage': storage, 'warnings': warnings}
383
384    def task_command(self, code, route):
385        """
386        Task to execute a command
387        :param code: Code to execute
388        :param route: server REST route to return results (including its parameters)
389        """
390        (stat, out, err) = operations.exec_command(code)
391        self.REST.sendMessage(route, {'status': stat, 'output': out, 'error': err})
392
393    def process_command(self, path, get_params, post_params, server):
394        """
395        Launches a thread to executing a command
396        :param path: ignored
397        :param get_params: ignored
398        :param post_params: object with format {"id": OperationId, "code": "Code", url: "ReturnURL"}
399        :param server: ignored
400        :rtype: object with launching status
401        """
402        logger.debug('Recieved command operation with params: {}'.format(post_params))
403        self.checkSecret(server)
404        # Processing data
405        try:
406            code = post_params.get('code')
407            cmd_id = post_params.get('id')
408            route = '{}?id={}'.format(post_params.get('route'), cmd_id)
409            # Launching new thread
410            threading.Thread(target=self.task_command, args=(code, route)).start()
411        except Exception as e:
412            logger.error('Got exception {}'.format(e))
413            return {'error': e}
414        return {'op': 'launched'}
415
416    def process_hardware(self, path, get_params, post_params, server):
417        """
418        Returns client's hardware profile
419        :param path:
420        :param get_params:
421        :param post_params:
422        :param server:
423        """
424        data = []
425        logger.debug('Recieved hardware operation')
426        self.checkSecret(server)
427        # Processing data
428        try:
429            for comp in operations.get_hardware():
430                data.append({'component': comp.split('=')[0], 'value': comp.split('=')[1]})
431        except:
432            pass
433        # Return list of hardware components
434        return data
435
436    def process_software(self, path, get_params, post_params, server):
437        """
438        Returns software profile installed on an operating system
439        :param path:
440        :param get_params:
441        :param post_params:
442        :param server:
443        :return:
444        """
445        logger.debug('Recieved software operation with params: {}'.format(post_params))
446        return operations.get_software(post_params.get('disk'), post_params.get('part'))
Note: See TracBrowser for help on using the repository browser.