source: ogClient-Git/src/ogRest.py @ 1ab981a

Last change on this file since 1ab981a was 2465ef2, checked in by Jose M. Guisado <jguisado@…>, 3 years ago

#1065 Add event datagram socket

ogClient can receive events via a datagram socket opened at 55885.
This socket is only opened when in windows or linux mode, for
event reporting from within the system.

Events reported this way are sent back to ogServer via a 103 Early
Hints HTTP message. Information regarding the event is sent in the
response's payload.

  • Property mode set to 100644
File size: 12.3 KB
Line 
1#
2# Copyright (C) 2020-2021 Soleta Networks <info@soleta.eu>
3#
4# This program is free software: you can redistribute it and/or modify it under
5# the terms of the GNU Affero General Public License as published by the
6# Free Software Foundation; either version 3 of the License, or
7# (at your option) any later version.
8
9import threading
10import platform
11import time
12from enum import Enum
13import json
14import queue
15import sys
16import os
17import signal
18import logging
19from logging.handlers import SysLogHandler
20
21from src.restRequest import *
22
23LOGGER = logging.getLogger()
24
25class ThreadState(Enum):
26        IDLE = 0
27        BUSY = 1
28
29class jsonBody():
30        def __init__(self, dictionary=None):
31                if dictionary:
32                        self.jsontree = dictionary
33                else:
34                        self.jsontree = {}
35
36        def add_element(self, key, value):
37                self.jsontree[key] = value
38
39        def dump(self):
40                return json.dumps(self.jsontree)
41
42class restResponse():
43        def __init__(self, response, json_body=None):
44                self.msg = ''
45                if response == ogResponses.BAD_REQUEST:
46                        self.msg = 'HTTP/1.0 400 Bad Request'
47                elif response == ogResponses.IN_PROGRESS:
48                        self.msg = 'HTTP/1.0 202 Accepted'
49                elif response == ogResponses.OK:
50                        self.msg = 'HTTP/1.0 200 OK'
51                elif response == ogResponses.INTERNAL_ERR:
52                        self.msg = 'HTTP/1.0 500 Internal Server Error'
53                elif response == ogResponses.UNAUTHORIZED:
54                        self.msg = 'HTTP/1.0 401 Unauthorized'
55                elif response == ogResponses.SERVICE_UNAVAILABLE:
56                        self.msg = 'HTTP/1.0 503 Service Unavailable'
57                elif response == ogResponses.EARLY_HINTS:
58                        self.msg = 'HTTP/1.0 103 Early Hints'
59                else:
60                        return self.msg
61
62                if response in {ogResponses.OK, ogResponses.IN_PROGRESS}:
63                        LOGGER.info(self.msg[:ogRest.LOG_LENGTH])
64                else:
65                        LOGGER.warn(self.msg[:ogRest.LOG_LENGTH])
66
67                self.msg += '\r\n'
68
69                if json_body:
70                        self.msg += 'Content-Length: ' + str(len(json_body.dump()))
71                        self.msg += '\r\nContent-Type: application/json'
72                        self.msg += '\r\n\r\n' + json_body.dump()
73                else:
74                        self.msg += 'Content-Length: 0\r\n' \
75                                    'Content-Type: application/json\r\n\r\n'
76
77
78        def get(self):
79                return self.msg
80
81class ogThread():
82        def shellrun(client, request, ogRest):
83                if not request.getrun():
84                        response = restResponse(ogResponses.BAD_REQUEST)
85                        client.send(response.get())
86                        ogRest.state = ThreadState.IDLE
87                        return
88
89                try:
90                        shellout = ogRest.operations.shellrun(request, ogRest)
91                except ValueError as err:
92                        response = restResponse(ogResponses.INTERNAL_ERR)
93                        client.send(response.get())
94                        ogRest.state = ThreadState.IDLE
95                        return
96
97                if request.getEcho():
98                        json_body = jsonBody()
99                        json_body.add_element('out', shellout)
100                        response = restResponse(ogResponses.OK, json_body)
101                        client.send(response.get())
102                else:
103                        response = restResponse(ogResponses.OK)
104                        client.send(response.get())
105
106                ogRest.state = ThreadState.IDLE
107
108        def poweroff(ogRest):
109                time.sleep(2)
110                ogRest.operations.poweroff()
111
112        def reboot(ogRest):
113                ogRest.operations.reboot()
114
115        def session(client, request, ogRest):
116                try:
117                        ogRest.operations.session(request, ogRest)
118                except ValueError as err:
119                        response = restResponse(ogResponses.INTERNAL_ERR)
120                        client.send(response.get())
121                        ogRest.state = ThreadState.IDLE
122                        return
123
124                response = restResponse(ogResponses.OK)
125                client.send(response.get())
126                client.disconnect()
127
128        def software(client, request, path, ogRest):
129                try:
130                        software = ogRest.operations.software(request, path, ogRest)
131                except ValueError as err:
132                        response = restResponse(ogResponses.INTERNAL_ERR)
133                        client.send(response.get())
134                        ogRest.state = ThreadState.IDLE
135                        return
136
137                json_body = jsonBody()
138                json_body.add_element('partition', request.getPartition())
139                json_body.add_element('software', software)
140
141                response = restResponse(ogResponses.OK, json_body)
142                client.send(response.get())
143                ogRest.state = ThreadState.IDLE
144
145        def hardware(client, path, ogRest):
146                try:
147                        ogRest.operations.hardware(path, ogRest)
148                except ValueError as err:
149                        response = restResponse(ogResponses.INTERNAL_ERR)
150                        client.send(response.get())
151                        ogRest.state = ThreadState.IDLE
152                        return
153
154                json_body = jsonBody()
155                with open(path, 'r') as f:
156                        json_body.add_element('hardware', f.read())
157
158                response = restResponse(ogResponses.OK, json_body)
159                client.send(response.get())
160                ogRest.state = ThreadState.IDLE
161
162        def setup(client, request, ogRest):
163                try:
164                        out = ogRest.operations.setup(request, ogRest)
165                except ValueError as err:
166                        response = restResponse(ogResponses.INTERNAL_ERR)
167                        client.send(response.get())
168                        ogRest.state = ThreadState.IDLE
169                        return
170
171                json_body = jsonBody(out)
172
173                response = restResponse(ogResponses.OK, json_body)
174                client.send(response.get())
175                ogRest.state = ThreadState.IDLE
176
177        def image_restore(client, request, ogRest):
178                try:
179                        ogRest.operations.image_restore(request, ogRest)
180                except ValueError as err:
181                        response = restResponse(ogResponses.INTERNAL_ERR)
182                        client.send(response.get())
183                        ogRest.state = ThreadState.IDLE
184                        return
185
186                json_body = jsonBody()
187                json_body.add_element('disk', request.getDisk())
188                json_body.add_element('partition', request.getPartition())
189                json_body.add_element('image_id', request.getId())
190
191                response = restResponse(ogResponses.OK, json_body)
192                client.send(response.get())
193                ogRest.state = ThreadState.IDLE
194
195        def image_create(client, path, request, ogRest):
196                try:
197                        image_info = ogRest.operations.image_create(path,
198                                                                    request,
199                                                                    ogRest)
200                        software = ogRest.operations.software(request, path, ogRest)
201                except ValueError as err:
202                        response = restResponse(ogResponses.INTERNAL_ERR)
203                        client.send(response.get())
204                        ogRest.state = ThreadState.IDLE
205                        return
206
207                kibi = 1024
208                datasize = int(image_info['datasize']) * kibi
209
210                json_body = jsonBody()
211                json_body.add_element('disk', request.getDisk())
212                json_body.add_element('partition', request.getPartition())
213                json_body.add_element('code', request.getCode())
214                json_body.add_element('id', request.getId())
215                json_body.add_element('name', request.getName())
216                json_body.add_element('repository', request.getRepo())
217                json_body.add_element('software', software)
218                json_body.add_element('clonator', image_info['clonator'])
219                json_body.add_element('compressor', image_info['compressor'])
220                json_body.add_element('filesystem', image_info['filesystem'])
221                json_body.add_element('datasize', datasize)
222
223                response = restResponse(ogResponses.OK, json_body)
224                client.send(response.get())
225                ogRest.state = ThreadState.IDLE
226
227        def refresh(client, ogRest):
228                try:
229                        out = ogRest.operations.refresh(ogRest)
230                except ValueError as err:
231                        response = restResponse(ogResponses.INTERNAL_ERR)
232                        client.send(response.get())
233                        ogRest.state = ThreadState.IDLE
234                        return
235
236                json_body = jsonBody(out)
237
238                response = restResponse(ogResponses.OK, json_body)
239                client.send(response.get())
240                ogRest.state = ThreadState.IDLE
241
242class ogResponses(Enum):
243        BAD_REQUEST=0
244        IN_PROGRESS=1
245        OK=2
246        INTERNAL_ERR=3
247        UNAUTHORIZED=4
248        SERVICE_UNAVAILABLE=5
249        EARLY_HINTS=6
250
251class ogRest():
252        LOG_LENGTH = 32
253
254        def __init__(self, config):
255                self.proc = None
256                self.terminated = False
257                self.state = ThreadState.IDLE
258                self.CONFIG = config
259                self.mode = self.CONFIG['opengnsys']['mode']
260                self.samba_config = self.CONFIG['samba']
261
262                if self.mode == 'live':
263                        from src.live.ogOperations import OgLiveOperations
264                        self.operations = OgLiveOperations(self.CONFIG)
265                elif self.mode == 'virtual':
266                        from src.virtual.ogOperations import \
267                                OgVirtualOperations
268                        self.operations = OgVirtualOperations()
269                        threading.Thread(target=self.operations.check_vm_state_loop,
270                                         args=(self,)).start()
271                elif self.mode == 'linux':
272                        from src.linux.ogOperations import OgLinuxOperations
273                        self.operations = OgLinuxOperations()
274                elif self.mode == 'windows':
275                        from src.windows.ogOperations import OgWindowsOperations
276                        self.operations = OgWindowsOperations()
277                else:
278                        raise ValueError('Mode not supported.')
279
280        def process_request(self, request, client):
281                method = request.get_method()
282                URI = request.get_uri()
283
284                LOGGER.debug('%s%s', method, URI[:ogRest.LOG_LENGTH])
285
286                if (not "stop" in URI and
287                    not "reboot" in URI and
288                    not "poweroff" in URI and
289                    not "probe" in URI):
290                        if self.state == ThreadState.BUSY:
291                                LOGGER.warn('Request has been received '
292                                            'while ogClient is busy')
293                                response = restResponse(ogResponses.SERVICE_UNAVAILABLE)
294                                client.send(response.get())
295                                return
296                        else:
297                                self.state = ThreadState.BUSY
298
299                if ("GET" in method):
300                        if "hardware" in URI:
301                                self.process_hardware(client)
302                        elif ("software" in URI):
303                                self.process_software(client, request)
304                        elif ("run/schedule" in URI):
305                                self.process_schedule(client)
306                        elif "refresh" in URI:
307                                self.process_refresh(client)
308                        else:
309                                LOGGER.warn('Unsupported request: %s',
310                                            {URI[:ogRest.LOG_LENGTH]})
311                                response = restResponse(ogResponses.BAD_REQUEST)
312                                client.send(response.get())
313                                self.state = ThreadState.IDLE
314                elif ("POST" in method):
315                        if ("poweroff" in URI):
316                                self.process_poweroff(client)
317                        elif "probe" in URI:
318                                self.process_probe(client)
319                        elif ("reboot" in URI):
320                                self.process_reboot(client)
321                        elif ("shell/run" in URI):
322                                self.process_shellrun(client, request)
323                        elif ("session" in URI):
324                                self.process_session(client, request)
325                        elif ("setup" in URI):
326                                self.process_setup(client, request)
327                        elif ("image/restore" in URI):
328                                self.process_imagerestore(client, request)
329                        elif ("stop" in URI):
330                                self.process_stop(client)
331                        elif ("image/create" in URI):
332                                self.process_imagecreate(client, request)
333                        else:
334                                LOGGER.warn('Unsupported request: %s',
335                                            URI[:ogRest.LOG_LENGTH])
336                                response = restResponse(ogResponses.BAD_REQUEST)
337                                client.send(response.get())
338                                self.state = ThreadState.IDLE
339                else:
340                        response = restResponse(ogResponses.BAD_REQUEST)
341                        client.send(response.get())
342                        self.state = ThreadState.IDLE
343
344                return 0
345
346        def kill_process(self):
347                try:
348                        os.kill(self.proc.pid, signal.SIGTERM)
349                except:
350                        pass
351
352                time.sleep(2)
353                try:
354                        os.kill(self.proc.pid, signal.SIGKILL)
355                except:
356                        pass
357
358                self.state = ThreadState.IDLE
359
360        def process_reboot(self, client):
361                response = restResponse(ogResponses.IN_PROGRESS)
362                client.send(response.get())
363
364                if self.mode != 'virtual':
365                        client.disconnect()
366                        if self.state == ThreadState.BUSY:
367                                self.kill_process()
368
369                threading.Thread(target=ogThread.reboot, args=(self,)).start()
370
371        def process_poweroff(self, client):
372                response = restResponse(ogResponses.IN_PROGRESS)
373                client.send(response.get())
374
375                if self.mode != 'virtual':
376                        client.disconnect()
377                        if self.state == ThreadState.BUSY:
378                                self.kill_process()
379
380                threading.Thread(target=ogThread.poweroff, args=(self,)).start()
381
382        def process_probe(self, client):
383                try:
384                        status = self.operations.probe(self)
385                except:
386                        response = restResponse(ogResponses.INTERNAL_ERR)
387                        client.send(response.get())
388                        return
389
390                json_body = jsonBody()
391                for k, v in status.items():
392                        json_body.add_element(k, v)
393
394                if self.state != ThreadState.BUSY:
395                        response = restResponse(ogResponses.OK, json_body)
396                else:
397                        response = restResponse(ogResponses.IN_PROGRESS, json_body)
398
399                client.send(response.get())
400
401        def process_shellrun(self, client, request):
402                threading.Thread(target=ogThread.shellrun, args=(client, request, self,)).start()
403
404        def process_session(self, client, request):
405                threading.Thread(target=ogThread.session, args=(client, request, self,)).start()
406
407        def process_software(self, client, request):
408                path = '/tmp/CSft-' + client.ip + '-' + str(request.getPartition())
409                threading.Thread(target=ogThread.software, args=(client, request, path, self,)).start()
410
411        def process_hardware(self, client):
412                path = '/tmp/Chrd-' + client.ip
413                threading.Thread(target=ogThread.hardware, args=(client, path, self,)).start()
414
415        def process_schedule(self, client):
416                response = restResponse(ogResponses.OK)
417                client.send(response.get())
418                self.state = ThreadState.IDLE
419
420        def process_setup(self, client, request):
421                threading.Thread(target=ogThread.setup, args=(client, request, self,)).start()
422
423        def process_imagerestore(self, client, request):
424                threading.Thread(target=ogThread.image_restore, args=(client, request, self,)).start()
425
426        def process_stop(self, client):
427                client.disconnect()
428                if self.state == ThreadState.BUSY:
429                        self.kill_process()
430                        self.terminated = True
431
432                sys.exit(0)
433
434        def process_imagecreate(self, client, request):
435                path = '/tmp/CSft-' + client.ip + '-' + request.getPartition()
436                threading.Thread(target=ogThread.image_create, args=(client, path, request, self,)).start()
437
438        def process_refresh(self, client):
439                threading.Thread(target=ogThread.refresh, args=(client, self,)).start()
Note: See TracBrowser for help on using the repository browser.