source: ogClient-Git/src/ogRest.py @ ad2d4a1

Last change on this file since ad2d4a1 was 2465ef2, checked in by Jose M. Guisado <jguisado@…>, 3 years ago

#1065 Add event datagram socket

ogClient can receive events via a datagram socket opened at 55885.
This socket is only opened when in windows or linux mode, for
event reporting from within the system.

Events reported this way are sent back to ogServer via a 103 Early
Hints HTTP message. Information regarding the event is sent in the
response's payload.

  • Property mode set to 100644
File size: 12.3 KB
RevLine 
#
# Copyright (C) 2020-2021 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.

import json
import logging
import os
import platform
import queue
import signal
import sys
import threading
import time
from enum import Enum
from logging.handlers import SysLogHandler

from src.restRequest import *

# Module-wide logger. getLogger() without a name returns the root logger, so
# records go to whatever handlers and level are configured on the root.
LOGGER = logging.getLogger()

class ThreadState(Enum):
    """Availability of the client for requests that claim the worker.

    ogRest.process_request answers 503 to most requests while the state is
    BUSY; the worker functions put it back to IDLE when they finish.
    """

    IDLE = 0
    BUSY = 1

class jsonBody():
    """Mutable JSON object used as the payload of a REST response."""

    def __init__(self, dictionary=None):
        """Wrap *dictionary*, or start from an empty object.

        A missing or falsy *dictionary* (None, ``{}``) yields a fresh empty
        dict. A non-empty one is stored by reference, not copied, so later
        add_element() calls are visible to the caller's dict as well.
        """
        self.jsontree = dictionary if dictionary else {}

    def add_element(self, key, value):
        """Set *key* to *value*, replacing any previous value."""
        self.jsontree[key] = value

    def dump(self):
        """Return the object serialised as a JSON string."""
        return json.dumps(self.jsontree)

class restResponse():
    """HTTP/1.0 response message selected by an ``ogResponses`` code."""

    def __init__(self, response, json_body=None):
        """Build the status line, headers and optional JSON payload.

        response:  ogResponses member that selects the status line.
        json_body: optional jsonBody whose dump() becomes the message body;
                   without it an empty body (Content-Length: 0) is sent.

        An unrecognised *response* leaves the message empty, so get()
        returns ''. The status line is logged, truncated to
        ogRest.LOG_LENGTH characters: at info level for OK and IN_PROGRESS,
        at warning level for everything else.
        """
        # Built on each call rather than as a class attribute because
        # ogResponses is defined further down in this module.
        status_lines = {
            ogResponses.BAD_REQUEST: 'HTTP/1.0 400 Bad Request',
            ogResponses.IN_PROGRESS: 'HTTP/1.0 202 Accepted',
            ogResponses.OK: 'HTTP/1.0 200 OK',
            ogResponses.INTERNAL_ERR: 'HTTP/1.0 500 Internal Server Error',
            ogResponses.UNAUTHORIZED: 'HTTP/1.0 401 Unauthorized',
            ogResponses.SERVICE_UNAVAILABLE:
                'HTTP/1.0 503 Service Unavailable',
            ogResponses.EARLY_HINTS: 'HTTP/1.0 103 Early Hints',
        }

        self.msg = status_lines.get(response, '')
        if not self.msg:
            # __init__ must return None; returning the (empty) message here
            # would raise TypeError instead of producing an empty response.
            return

        if response in {ogResponses.OK, ogResponses.IN_PROGRESS}:
            LOGGER.info(self.msg[:ogRest.LOG_LENGTH])
        else:
            LOGGER.warning(self.msg[:ogRest.LOG_LENGTH])

        self.msg += '\r\n'

        if json_body:
            # Serialise once. json.dumps escapes non-ASCII by default, so
            # the character count equals the byte count of the payload.
            payload = json_body.dump()
            self.msg += 'Content-Length: ' + str(len(payload))
            self.msg += '\r\nContent-Type: application/json'
            self.msg += '\r\n\r\n' + payload
        else:
            self.msg += 'Content-Length: 0\r\n' \
                        'Content-Type: application/json\r\n\r\n'

    def get(self):
        """Return the complete response message as a string."""
        return self.msg

class ogThread():
    """Worker functions that ogRest runs on background threads.

    The class is only a namespace: every function is static and receives
    the objects it needs explicitly. The parameter named ``ogRest`` is the
    ogRest *instance* handling the request (it shadows the class name inside
    these functions). Each worker replies through ``client.send`` and,
    except where noted, puts ``ogRest.state`` back to IDLE when it is done.
    """

    @staticmethod
    def _finish(client, ogRest, code, json_body=None):
        """Send the response for *code*, then mark the client idle."""
        client.send(restResponse(code, json_body).get())
        ogRest.state = ThreadState.IDLE

    @staticmethod
    def shellrun(client, request, ogRest):
        """Run the request's shell command; reply with its output if asked.

        Replies 400 when request.getrun() is empty and 500 when the
        operation raises ValueError. The output is included under 'out'
        only when request.getEcho() is truthy.
        """
        if not request.getrun():
            ogThread._finish(client, ogRest, ogResponses.BAD_REQUEST)
            return

        try:
            shellout = ogRest.operations.shellrun(request, ogRest)
        except ValueError:
            ogThread._finish(client, ogRest, ogResponses.INTERNAL_ERR)
            return

        json_body = None
        if request.getEcho():
            json_body = jsonBody()
            json_body.add_element('out', shellout)

        ogThread._finish(client, ogRest, ogResponses.OK, json_body)

    @staticmethod
    def poweroff(ogRest):
        """Power the machine off after a two second delay."""
        # NOTE(review): presumably the delay lets the 202 reply sent by
        # process_poweroff leave the machine first -- confirm.
        time.sleep(2)
        ogRest.operations.poweroff()

    @staticmethod
    def reboot(ogRest):
        """Reboot the machine."""
        ogRest.operations.reboot()

    @staticmethod
    def session(client, request, ogRest):
        """Start the requested session, reply 200 and disconnect.

        Replies 500 and marks the client idle when the operation raises
        ValueError.
        """
        try:
            ogRest.operations.session(request, ogRest)
        except ValueError:
            ogThread._finish(client, ogRest, ogResponses.INTERNAL_ERR)
            return

        # NOTE(review): unlike the other workers the state is not reset to
        # IDLE on success; this looks deliberate (the connection is dropped
        # right after), so it is kept -- confirm.
        response = restResponse(ogResponses.OK)
        client.send(response.get())
        client.disconnect()

    @staticmethod
    def software(client, request, path, ogRest):
        """Reply with the software inventory of the request's partition."""
        try:
            software = ogRest.operations.software(request, path, ogRest)
        except ValueError:
            ogThread._finish(client, ogRest, ogResponses.INTERNAL_ERR)
            return

        json_body = jsonBody()
        json_body.add_element('partition', request.getPartition())
        json_body.add_element('software', software)

        ogThread._finish(client, ogRest, ogResponses.OK, json_body)

    @staticmethod
    def hardware(client, path, ogRest):
        """Run the hardware inventory and reply with the file it produced.

        The operation is expected to leave its report at *path*; the file
        content is returned verbatim under 'hardware'.
        """
        try:
            ogRest.operations.hardware(path, ogRest)
        except ValueError:
            ogThread._finish(client, ogRest, ogResponses.INTERNAL_ERR)
            return

        try:
            with open(path, 'r') as report:
                inventory = report.read()
        except (OSError, ValueError):
            # A missing or undecodable report must not kill the thread:
            # that would leave the client BUSY forever and unanswered.
            LOGGER.exception('Cannot read hardware inventory')
            ogThread._finish(client, ogRest, ogResponses.INTERNAL_ERR)
            return

        json_body = jsonBody()
        json_body.add_element('hardware', inventory)

        ogThread._finish(client, ogRest, ogResponses.OK, json_body)

    @staticmethod
    def setup(client, request, ogRest):
        """Run the setup operation and reply with the dict it returns."""
        try:
            out = ogRest.operations.setup(request, ogRest)
        except ValueError:
            ogThread._finish(client, ogRest, ogResponses.INTERNAL_ERR)
            return

        ogThread._finish(client, ogRest, ogResponses.OK, jsonBody(out))

    @staticmethod
    def image_restore(client, request, ogRest):
        """Restore an image; reply with the disk, partition and image id."""
        try:
            ogRest.operations.image_restore(request, ogRest)
        except ValueError:
            ogThread._finish(client, ogRest, ogResponses.INTERNAL_ERR)
            return

        json_body = jsonBody()
        json_body.add_element('disk', request.getDisk())
        json_body.add_element('partition', request.getPartition())
        json_body.add_element('image_id', request.getId())

        ogThread._finish(client, ogRest, ogResponses.OK, json_body)

    @staticmethod
    def image_create(client, path, request, ogRest):
        """Create an image and reply with its metadata and software list.

        The reply echoes the request fields and adds the software
        inventory plus the 'clonator', 'compressor', 'filesystem' and
        'datasize' entries of the image information.
        """
        try:
            image_info = ogRest.operations.image_create(path,
                                                        request,
                                                        ogRest)
            software = ogRest.operations.software(request, path, ogRest)
        except ValueError:
            ogThread._finish(client, ogRest, ogResponses.INTERNAL_ERR)
            return

        kibi = 1024
        try:
            # NOTE(review): presumably 'datasize' is reported in KiB and
            # converted to bytes here -- confirm against the operation.
            datasize = int(image_info['datasize']) * kibi
            clonator = image_info['clonator']
            compressor = image_info['compressor']
            filesystem = image_info['filesystem']
        except (KeyError, TypeError, ValueError):
            # Malformed image information must produce a 500 reply rather
            # than kill the thread and leave the client BUSY.
            LOGGER.exception('Malformed image information')
            ogThread._finish(client, ogRest, ogResponses.INTERNAL_ERR)
            return

        json_body = jsonBody()
        json_body.add_element('disk', request.getDisk())
        json_body.add_element('partition', request.getPartition())
        json_body.add_element('code', request.getCode())
        json_body.add_element('id', request.getId())
        json_body.add_element('name', request.getName())
        json_body.add_element('repository', request.getRepo())
        json_body.add_element('software', software)
        json_body.add_element('clonator', clonator)
        json_body.add_element('compressor', compressor)
        json_body.add_element('filesystem', filesystem)
        json_body.add_element('datasize', datasize)

        ogThread._finish(client, ogRest, ogResponses.OK, json_body)

    @staticmethod
    def refresh(client, ogRest):
        """Run the refresh operation and reply with the dict it returns."""
        try:
            out = ogRest.operations.refresh(ogRest)
        except ValueError:
            ogThread._finish(client, ogRest, ogResponses.INTERNAL_ERR)
            return

        ogThread._finish(client, ogRest, ogResponses.OK, jsonBody(out))

class ogResponses(Enum):
    """Response kinds that restResponse maps to an HTTP status line."""

    BAD_REQUEST = 0          # 400 Bad Request
    IN_PROGRESS = 1          # 202 Accepted
    OK = 2                   # 200 OK
    INTERNAL_ERR = 3         # 500 Internal Server Error
    UNAUTHORIZED = 4         # 401 Unauthorized
    SERVICE_UNAVAILABLE = 5  # 503 Service Unavailable
    EARLY_HINTS = 6          # 103 Early Hints

class ogRest():
    """Dispatch REST requests to the operations of the configured mode."""

    # Maximum number of characters of a URI or status line written to the log.
    LOG_LENGTH = 32

    def __init__(self, config):
        """Select the operations backend named by the configuration.

        config: mapping with an 'opengnsys' section holding 'mode' (one of
                'live', 'virtual', 'linux', 'windows') and a 'samba' section.

        Raises ValueError for any other mode. In virtual mode a thread
        running operations.check_vm_state_loop is started right away.
        """
        self.proc = None
        self.terminated = False
        self.state = ThreadState.IDLE
        self.CONFIG = config
        self.mode = self.CONFIG['opengnsys']['mode']
        self.samba_config = self.CONFIG['samba']

        # Backends are imported lazily so that only the selected mode's
        # module is loaded.
        if self.mode == 'live':
            from src.live.ogOperations import OgLiveOperations
            self.operations = OgLiveOperations(self.CONFIG)
        elif self.mode == 'virtual':
            from src.virtual.ogOperations import \
                OgVirtualOperations
            self.operations = OgVirtualOperations()
            threading.Thread(target=self.operations.check_vm_state_loop,
                             args=(self,)).start()
        elif self.mode == 'linux':
            from src.linux.ogOperations import OgLinuxOperations
            self.operations = OgLinuxOperations()
        elif self.mode == 'windows':
            from src.windows.ogOperations import OgWindowsOperations
            self.operations = OgWindowsOperations()
        else:
            raise ValueError('Mode not supported.')

    def _reject(self, client, claimed):
        """Reply 400; release the BUSY claim only if this request made it."""
        response = restResponse(ogResponses.BAD_REQUEST)
        client.send(response.get())
        if claimed:
            self.state = ThreadState.IDLE

    def process_request(self, request, client):
        """Route *request* to its handler by HTTP method and URI substring.

        URIs containing 'stop', 'reboot', 'poweroff' or 'probe' are served
        regardless of the current state. Any other request is answered 503
        while the client is BUSY (returning None), otherwise it marks the
        client BUSY before being dispatched. Unsupported methods or URIs
        are answered 400. Returns 0 once the request has been dispatched or
        rejected with 400.
        """
        method = request.get_method()
        URI = request.get_uri()

        LOGGER.debug('%s%s', method, URI[:ogRest.LOG_LENGTH])

        # True only when this very request moved the state to BUSY; a
        # rejected request must not clear a BUSY set by a running job.
        claimed = False
        always_served = ('stop', 'reboot', 'poweroff', 'probe')
        if not any(word in URI for word in always_served):
            if self.state == ThreadState.BUSY:
                LOGGER.warning('Request has been received '
                               'while ogClient is busy')
                response = restResponse(ogResponses.SERVICE_UNAVAILABLE)
                client.send(response.get())
                return
            self.state = ThreadState.BUSY
            claimed = True

        # Matching is by substring, so the order of the tests matters.
        if "GET" in method:
            if "hardware" in URI:
                self.process_hardware(client)
            elif "software" in URI:
                self.process_software(client, request)
            elif "run/schedule" in URI:
                self.process_schedule(client)
            elif "refresh" in URI:
                self.process_refresh(client)
            else:
                LOGGER.warning('Unsupported request: %s',
                               URI[:ogRest.LOG_LENGTH])
                self._reject(client, claimed)
        elif "POST" in method:
            if "poweroff" in URI:
                self.process_poweroff(client)
            elif "probe" in URI:
                self.process_probe(client)
            elif "reboot" in URI:
                self.process_reboot(client)
            elif "shell/run" in URI:
                self.process_shellrun(client, request)
            elif "session" in URI:
                self.process_session(client, request)
            elif "setup" in URI:
                self.process_setup(client, request)
            elif "image/restore" in URI:
                self.process_imagerestore(client, request)
            elif "stop" in URI:
                self.process_stop(client)
            elif "image/create" in URI:
                self.process_imagecreate(client, request)
            else:
                LOGGER.warning('Unsupported request: %s',
                               URI[:ogRest.LOG_LENGTH])
                self._reject(client, claimed)
        else:
            self._reject(client, claimed)

        return 0

    @staticmethod
    def _signal(pid, sig):
        """Send *sig* to *pid*, ignoring a process that cannot be signalled."""
        try:
            os.kill(pid, sig)
        except (OSError, TypeError, ValueError) as err:
            # Best effort: the process may already be gone.
            LOGGER.debug('Cannot send signal %s to pid %s: %s',
                         sig, pid, err)

    def kill_process(self):
        """Stop the process of the running operation and mark the client idle.

        Sends SIGTERM, waits two seconds and then sends SIGKILL. Failures
        to signal are ignored. When no process is recorded in self.proc
        only the state is reset.
        """
        pid = getattr(self.proc, 'pid', None)
        if pid is not None:
            self._signal(pid, signal.SIGTERM)
            time.sleep(2)
            # SIGKILL is not defined on Windows; fall back to SIGTERM there.
            self._signal(pid, getattr(signal, 'SIGKILL', signal.SIGTERM))

        self.state = ThreadState.IDLE

    def process_reboot(self, client):
        """Reply 202 and reboot on a background thread.

        Outside virtual mode the client is disconnected first and a running
        operation is killed.
        """
        response = restResponse(ogResponses.IN_PROGRESS)
        client.send(response.get())

        if self.mode != 'virtual':
            client.disconnect()
            if self.state == ThreadState.BUSY:
                self.kill_process()

        threading.Thread(target=ogThread.reboot, args=(self,)).start()

    def process_poweroff(self, client):
        """Reply 202 and power off on a background thread.

        Outside virtual mode the client is disconnected first and a running
        operation is killed.
        """
        response = restResponse(ogResponses.IN_PROGRESS)
        client.send(response.get())

        if self.mode != 'virtual':
            client.disconnect()
            if self.state == ThreadState.BUSY:
                self.kill_process()

        threading.Thread(target=ogThread.poweroff, args=(self,)).start()

    def process_probe(self, client):
        """Reply with the probe status: 200 when idle, 202 while busy.

        Any exception raised by the probe operation is answered with 500.
        """
        try:
            status = self.operations.probe(self)
        except Exception:
            response = restResponse(ogResponses.INTERNAL_ERR)
            client.send(response.get())
            return

        json_body = jsonBody()
        for k, v in status.items():
            json_body.add_element(k, v)

        if self.state != ThreadState.BUSY:
            response = restResponse(ogResponses.OK, json_body)
        else:
            response = restResponse(ogResponses.IN_PROGRESS, json_body)

        client.send(response.get())

    def process_shellrun(self, client, request):
        """Run ogThread.shellrun on a background thread."""
        threading.Thread(target=ogThread.shellrun,
                         args=(client, request, self,)).start()

    def process_session(self, client, request):
        """Run ogThread.session on a background thread."""
        threading.Thread(target=ogThread.session,
                         args=(client, request, self,)).start()

    def process_software(self, client, request):
        """Run ogThread.software with a per-client, per-partition path."""
        path = '/tmp/CSft-' + client.ip + '-' + str(request.getPartition())
        threading.Thread(target=ogThread.software,
                         args=(client, request, path, self,)).start()

    def process_hardware(self, client):
        """Run ogThread.hardware with a per-client report path."""
        path = '/tmp/Chrd-' + client.ip
        threading.Thread(target=ogThread.hardware,
                         args=(client, path, self,)).start()

    def process_schedule(self, client):
        """Reply 200 immediately and release the client."""
        response = restResponse(ogResponses.OK)
        client.send(response.get())
        self.state = ThreadState.IDLE

    def process_setup(self, client, request):
        """Run ogThread.setup on a background thread."""
        threading.Thread(target=ogThread.setup,
                         args=(client, request, self,)).start()

    def process_imagerestore(self, client, request):
        """Run ogThread.image_restore on a background thread."""
        threading.Thread(target=ogThread.image_restore,
                         args=(client, request, self,)).start()

    def process_stop(self, client):
        """Disconnect, kill any running operation and exit the process.

        self.terminated is set only when an operation was running.
        """
        client.disconnect()
        if self.state == ThreadState.BUSY:
            self.kill_process()
            self.terminated = True

        sys.exit(0)

    def process_imagecreate(self, client, request):
        """Run ogThread.image_create with a per-client, per-partition path."""
        # str() keeps this working when the partition is not a string, as
        # process_software already does for the same path.
        path = '/tmp/CSft-' + client.ip + '-' + str(request.getPartition())
        threading.Thread(target=ogThread.image_create,
                         args=(client, path, request, self,)).start()

    def process_refresh(self, client):
        """Run ogThread.refresh on a background thread."""
        threading.Thread(target=ogThread.refresh,
                         args=(client, self,)).start()
Note: See TracBrowser for help on using the repository browser.