1from __future__ import print_function, unicode_literals
2
3import os
4import random
5import re
6import socket
7import string
8from threading import Event, Thread
9
10import ttfw_idf
11from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket
12from tiny_test_fw import Utility
13
14
def get_my_ip():
    """Return the local IPv4 address the OS picks for outgoing traffic.

    A UDP socket is connected to a dummy address and the socket's own
    address is read back with ``getsockname``. If anything goes wrong the
    loopback address ``'127.0.0.1'`` is returned instead.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # The target address does not even have to be reachable.
        probe.connect(('10.255.255.255', 1))
        return probe.getsockname()[0]
    except Exception:
        return '127.0.0.1'
    finally:
        # Runs on both the success and the fallback path.
        probe.close()
26
27
class TestEcho(WebSocket):
    """Connection handler that echoes every received message back to its sender."""

    def handleConnected(self):
        """Log the peer address when the connection callback fires."""
        print('Connection from: {}'.format(self.address))

    def handleMessage(self):
        """Send the received payload straight back and log what was sent."""
        payload = self.data
        self.sendMessage(payload)
        print('Server sent: {}'.format(payload))

    def handleClose(self):
        """Log the peer address when the close callback fires."""
        print('{} closed the connection'.format(self.address))
39
40
41# Simple Websocket server for testing purposes
class Websocket(object):
    """Echo websocket server served from a background thread, for test use.

    The server starts listening as soon as the object is constructed and is
    shut down when the ``with`` block is left::

        with Websocket(4455) as ws:
            ws.send_data('hello')

    :param port: TCP port to listen on (all interfaces)
    """

    def __init__(self, port):
        self.port = port
        self.exit_event = Event()
        # Create the server here, not in the worker thread: a bind failure
        # (e.g. port already in use) is then raised to the caller instead of
        # silently killing the thread, and ``self.server`` is guaranteed to
        # exist before ``send_data`` can be called.
        self.server = SimpleWebSocketServer('', self.port, TestEcho)
        self.thread = Thread(target=self.run)
        self.thread.start()

    def send_data(self, data):
        """Send ``data`` to every client currently connected to the server."""
        # Iterate over a snapshot: the server thread adds and removes entries
        # in ``connections`` while it is serving, and iterating the live dict
        # could raise "dictionary changed size during iteration".
        for conn in list(self.server.connections.values()):
            conn.sendMessage(data)

    def run(self):
        """Thread body: serve requests until ``exit_event`` is set, then close the server."""
        try:
            while not self.exit_event.is_set():
                self.server.serveonce()
        finally:
            # Release the listening socket and any open client connections so
            # the port is free again once the test is over.
            self.server.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Ask the server thread to stop and wait up to 10 seconds for it."""
        self.exit_event.set()
        self.thread.join(10)
        if self.thread.is_alive():
            Utility.console_log('Thread cannot be joined', 'orange')
67
68
def test_echo(dut):
    """Wait until the DUT reports a connection and five echoed ``hello`` messages.

    :param dut: device under test; only its ``expect`` method is used here
    """
    dut.expect('WEBSOCKET_EVENT_CONNECTED')
    echo_pattern = re.compile(r'Received=hello (\d)')
    remaining = 5
    while remaining:
        dut.expect(echo_pattern, timeout=30)
        remaining -= 1
    print('All echos received')
74
75
def test_close(dut):
    """Wait for the DUT to log a received close message and print its code.

    :param dut: device under test; only its ``expect`` method is used here
    """
    close_pattern = re.compile(r'WEBSOCKET: Received closed message with code=(\d*)')
    groups = dut.expect(close_pattern, timeout=60)
    print('Received close frame with code {}'.format(groups[0]))
79
80
def test_recv_long_msg(dut, websocket, msg_len, repeats):
    """Send a random alphanumeric message to the DUT and check it is logged back intact.

    The same message is sent ``repeats`` times. After each send, the
    ``Received=...`` lines printed by the DUT are concatenated until at least
    ``msg_len`` characters have been collected, and the result is compared
    with what was sent.

    :param dut: device under test; only its ``expect`` method is used here
    :param websocket: object whose ``send_data`` method delivers the message
    :param msg_len: number of characters in the generated message
    :param repeats: how many times the message is sent and verified
    :raises ValueError: if the collected text differs from the sent message
    """
    alphabet = string.ascii_uppercase + string.ascii_lowercase + string.digits
    send_msg = ''.join(random.choice(alphabet) for _ in range(msg_len))
    # Only the alphanumeric payload is captured; whatever follows it on the
    # line (e.g. color encoding) is consumed by the trailing ``.*``.
    recv_pattern = re.compile(r'Received=([a-zA-Z0-9]*).*\n')

    for _ in range(repeats):
        websocket.send_data(send_msg)

        # The DUT may log one message in several pieces, so keep collecting
        # until enough characters have arrived.
        chunks = []
        received_len = 0
        while received_len < msg_len:
            chunk = dut.expect(recv_pattern, timeout=30)[0]
            chunks.append(chunk)
            received_len += len(chunk)
        recv_msg = ''.join(chunks)

        if recv_msg != send_msg:
            # Adjacent literals instead of a backslash continuation inside the
            # string, which would embed the source indentation in the message.
            raise ValueError(
                'DUT received string does not match sent string,'
                '\nexpected: {}\nwith length {}'
                '\nreceived: {}\nwith length {}'.format(send_msg, len(send_msg), recv_msg, len(recv_msg)))
        print('Sent message and received message are equal')
98
99
@ttfw_idf.idf_example_test(env_tag='Example_EthKitV1')
def test_examples_protocol_websocket(env, extra_data):
    """
    steps:
      1. obtain IP address
      2. connect to uri specified in the config
      3. send and receive data
    """
    dut = env.get_dut('websocket', 'examples/protocols/websocket', dut_class=ttfw_idf.ESP32DUT)

    # Log the size of the firmware image as a performance figure.
    bin_path = os.path.join(dut.app.binary_path, 'websocket_example.bin')
    size_kb = os.path.getsize(bin_path) // 1024
    ttfw_idf.log_performance('websocket_bin_size', '{}KB'.format(size_kb))

    # Work out whether the DUT asks for the endpoint on stdin or uses the URI
    # stored in sdkconfig; any lookup failure is reported and re-raised.
    try:
        uri_from_stdin = 'CONFIG_WEBSOCKET_URI_FROM_STDIN' in dut.app.get_sdkconfig()
        if not uri_from_stdin:
            uri = dut.app.get_sdkconfig()['CONFIG_WEBSOCKET_URI'].strip('"')
    except Exception:
        print('ENV_TEST_FAILURE: Cannot find uri settings in sdkconfig')
        raise

    # start test
    dut.start_app()

    if not uri_from_stdin:
        # URI comes from sdkconfig: only the echo exchange is checked.
        print('DUT connecting to {}'.format(uri))
        test_echo(dut)
        return

    # URI is entered on stdin: serve a local echo server and point the DUT at it.
    server_port = 4455
    with Websocket(server_port) as ws:
        uri = 'ws://{}:{}'.format(get_my_ip(), server_port)
        print('DUT connecting to {}'.format(uri))
        dut.expect('Please enter uri of websocket endpoint', timeout=30)
        dut.write(uri)
        test_echo(dut)
        # Message length should exceed DUT's buffer size to test fragmentation, default is 1024 byte
        test_recv_long_msg(dut, ws, 2000, 3)
        test_close(dut)
144
# Run the example test when this file is executed as a script.
# NOTE(review): the call passes no arguments, so `env` and `extra_data` are
# presumably supplied by the `ttfw_idf.idf_example_test` decorator — confirm.
if __name__ == '__main__':
    test_examples_protocol_websocket()
147