source: ogClient-Git/tests/units/server.py @ 355e6ca

Last change on this file since 355e6ca was cb9edc8, checked in by OpenGnSys Support Team <soporte-og@…>, 4 years ago

ogClient is AGPLv3+

Update license header in files.

  • Property mode set to 100644
File size: 1.5 KB
Line 
1#
2# Copyright (C) 2020-2021 Soleta Networks <info@soleta.eu>
3#
4# This program is free software: you can redistribute it and/or modify it under
5# the terms of the GNU Affero General Public License as published by the
6# Free Software Foundation; either version 3 of the License, or
7# (at your option) any later version.
8
9import socket
10import sys
11
class Server():
    """Single-connection TCP server used to talk to a client under test.

    The server binds to ``host:port``, accepts exactly one incoming
    connection and can then exchange text messages with that peer.
    """

    # JSON body of the probe request and the complete HTTP request that
    # carries it. The body is pure ASCII, so its character count equals
    # the byte count announced in Content-Length.
    _probe_json = '{"id": 0, "name": "test_local", "center": 0, "room": 0}'
    _probe_msg = 'POST /probe HTTP/1.0\r\nContent-Length: '+ \
                 str(len(_probe_json)) + \
                 '\r\nContent-Type: application/json\r\n\r\n' + _probe_json
    # Maximum number of bytes read by a single recv() call.
    _recv_buffer_size = 16384

    def __init__(self, host='127.0.0.1', port=1234):
        """Create the listening socket; nothing is bound yet.

        host -- address to bind to when connect() is called.
        port -- TCP port to bind to when connect() is called.
        """
        self.host = host
        self.port = port
        # Set by connect() once a peer has been accepted. Initialised here
        # so stop() is safe to call even if connect() never completed.
        self.conn = None
        self.addr = None
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    def connect(self, probe=True):
        """Bind, listen and block until one peer connects.

        probe -- when true, send the probe request to the accepted peer
                 and return its decoded reply; otherwise return None.

        Terminates via sys.exit() (SystemExit) if the bind fails.
        """
        try:
            self.sock.bind((self.host, self.port))
        except socket.error as msg:
            # socket.error is OSError on Python 3 and is not subscriptable;
            # read the error number and text from its attributes instead.
            print('Bind failed. Error Code : {} Message {}'.format(
                msg.errno, msg.strerror))
            sys.exit()

        self.sock.listen(10)
        self.conn, self.addr = self.sock.accept()
        if probe:
            self.send(self._probe_msg)
            return self.recv()

    def send(self, msg):
        """Encode *msg* and send all of it to the connected peer."""
        # sendall() keeps writing until every byte is sent; plain send()
        # may transmit only part of the buffer.
        self.conn.sendall(msg.encode())

    def recv(self):
        """Read up to _recv_buffer_size bytes and return them as UTF-8 text."""
        return self.conn.recv(self._recv_buffer_size).decode('utf-8')

    def stop(self):
        """Close the peer connection (if any) and the listening socket."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.sock.close()
Note: See TracBrowser for help on using the repository browser.