#
# Copyright (C) 2020-2021 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
8 | |
---|
9 | import socket |
---|
10 | import sys |
---|
11 | |
---|
class Server():
    """Blocking, single-connection TCP server.

    The server binds to ``host:port``, accepts exactly one incoming
    connection and then exchanges text messages with that peer through
    :meth:`send` and :meth:`recv`.  Optionally, right after the connection
    is accepted, a fixed HTTP ``POST /probe`` request is sent to the peer
    and its reply is returned.
    """

    # JSON body carried by the probe request.
    _probe_json = '{"id": 0, "name": "test_local", "center": 0, "room": 0}'
    # Complete HTTP/1.0 request; Content-Length is derived from the body so
    # the two cannot drift apart.
    _probe_msg = 'POST /probe HTTP/1.0\r\nContent-Length: ' + \
                 str(len(_probe_json)) + \
                 '\r\nContent-Type: application/json\r\n\r\n' + _probe_json
    # Maximum number of bytes read by a single recv() call.
    _recv_buffer_size = 16384

    def __init__(self, host='127.0.0.1', port=1234):
        """Create the listening socket (it is not bound yet).

        Args:
            host: IPv4 address to bind to in :meth:`connect`.
            port: TCP port to bind to in :meth:`connect`.
        """
        self.host = host
        self.port = port
        # Populated by connect(); kept as None until a peer is accepted so
        # that stop() is safe to call at any time.
        self.conn = None
        self.addr = None
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    def connect(self, probe=True):
        """Bind, listen and block until one peer connects.

        Args:
            probe: When true, send the probe request to the accepted peer
                and return what it answers.

        Returns:
            The decoded probe response when ``probe`` is true, otherwise
            ``None``.

        Raises:
            SystemExit: If the socket cannot be bound; a diagnostic is
                printed first.
        """
        try:
            self.sock.bind((self.host, self.port))
        except socket.error as msg:
            # socket.error is OSError on Python 3 and is not subscriptable;
            # read the error code and text from its attributes instead.
            print('Bind failed. Error Code : ' + str(msg.errno) + ' Message '
                  + str(msg.strerror))
            # Do not leak the descriptor on the failure path.
            self.sock.close()
            sys.exit()

        self.sock.listen(10)
        self.conn, self.addr = self.sock.accept()
        if probe:
            self.send(self._probe_msg)
            return self.recv()

    def send(self, msg):
        """Encode ``msg`` (str) and transmit it to the connected peer."""
        # sendall() keeps writing until the whole buffer is sent, whereas
        # send() may transmit only part of it.
        self.conn.sendall(msg.encode())

    def recv(self):
        """Return the data from one recv() call, decoded as UTF-8.

        At most ``_recv_buffer_size`` bytes are read, so a longer message
        may need several calls.
        """
        return self.conn.recv(self._recv_buffer_size).decode('utf-8')

    def stop(self):
        """Close the peer connection (if any) and the listening socket."""
        if self.conn is not None:
            self.conn.close()
        self.sock.close()