source: ogClient-Git/tests/units/server.py @ fdd4ba5

Last change on this file since fdd4ba5 was 05b1088, checked in by Alvaro Neira Ayuso <alvaroneay@…>, 5 years ago

Include License header

  • Property mode set to 100644
File size: 1.4 KB
Line 
1#
2# Copyright (C) 2020 Soleta Networks <info@soleta.eu>
3#
4# This program is free software: you can redistribute it and/or modify it under
5# the terms of the GNU Affero General Public License as published by the
6# Free Software Foundation, version 3.
7#
8
9import socket
10import sys
11
# Default loopback endpoint for the test server.
# NOTE: PORT is a string here, whereas Server's own default ``port``
# argument is the integer 1234.
HOST, PORT = '127.0.0.1', '1234'
14
class Server():
    """Single-connection TCP server used to talk to a client under test.

    The server binds to ``host:port``, accepts exactly one connection and
    can optionally greet it with a canned ``POST /probe`` HTTP request.
    """

    # JSON payload carried by the probe request.
    _probe_json = '{"id": 0, "name": "test_local", "center": 0, "room": 0}'
    # Complete HTTP/1.0 probe request: headers, blank line, then the payload.
    _probe_msg = 'POST /probe HTTP/1.0\r\nContent-Length:'+ \
                 str(len(_probe_json)) + \
                 '\r\nContent-Type:application/json\r\n\r\n' + _probe_json

    def __init__(self, host='127.0.0.1', port=1234):
        """Create the listening socket (not yet bound).

        Args:
            host: Address to bind to when connect() is called.
            port: TCP port to bind to when connect() is called.
        """
        self.host = host
        self.port = port
        # Filled in by connect(); initialised here so that stop() is safe
        # to call even when no client was ever accepted.
        self.conn = None
        self.addr = None
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    def connect(self, probe=True):
        """Bind, listen and block until one client connects.

        Args:
            probe: When true, send the probe request to the new client and
                wait for its reply.

        Returns:
            The decoded reply to the probe when ``probe`` is true,
            otherwise None.

        Raises:
            SystemExit: With status 1 if the socket cannot be bound.
        """
        try:
            self.sock.bind((self.host, self.port))
        except socket.error as msg:
            # socket.error is OSError on Python 3 and cannot be indexed;
            # errno/strerror carry the same information.
            print('Bind failed. Error Code : ' + str(msg.errno) + ' Message '
                  + str(msg.strerror))
            # Non-zero status so the failure is not reported as success.
            sys.exit(1)

        self.sock.listen(10)
        self.conn, self.addr = self.sock.accept()
        if probe:
            self.send(self._probe_msg)
            return self.recv()
        return None

    def send(self, msg):
        """Encode ``msg`` and send all of it to the connected client."""
        # sendall() keeps writing until the whole payload is transmitted,
        # whereas send() may stop after a partial write.
        self.conn.sendall(msg.encode())

    def recv(self):
        """Read up to 1024 bytes from the client and decode them as UTF-8."""
        return self.conn.recv(1024).decode('utf-8')

    def stop(self):
        """Close the client connection (if any) and the listening socket."""
        if self.conn is not None:
            self.conn.close()
        self.sock.close()
Note: See TracBrowser for help on using the repository browser.