1from __future__ import print_function, unicode_literals 2 3import os 4import random 5import re 6import socket 7import string 8from threading import Event, Thread 9 10import ttfw_idf 11from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket 12from tiny_test_fw import Utility 13 14 15def get_my_ip(): 16 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 17 try: 18 # doesn't even have to be reachable 19 s.connect(('10.255.255.255', 1)) 20 IP = s.getsockname()[0] 21 except Exception: 22 IP = '127.0.0.1' 23 finally: 24 s.close() 25 return IP 26 27 28class TestEcho(WebSocket): 29 30 def handleMessage(self): 31 self.sendMessage(self.data) 32 print('Server sent: {}'.format(self.data)) 33 34 def handleConnected(self): 35 print('Connection from: {}'.format(self.address)) 36 37 def handleClose(self): 38 print('{} closed the connection'.format(self.address)) 39 40 41# Simple Websocket server for testing purposes 42class Websocket(object): 43 44 def send_data(self, data): 45 for nr, conn in self.server.connections.items(): 46 conn.sendMessage(data) 47 48 def run(self): 49 self.server = SimpleWebSocketServer('', self.port, TestEcho) 50 while not self.exit_event.is_set(): 51 self.server.serveonce() 52 53 def __init__(self, port): 54 self.port = port 55 self.exit_event = Event() 56 self.thread = Thread(target=self.run) 57 self.thread.start() 58 59 def __enter__(self): 60 return self 61 62 def __exit__(self, exc_type, exc_value, traceback): 63 self.exit_event.set() 64 self.thread.join(10) 65 if self.thread.is_alive(): 66 Utility.console_log('Thread cannot be joined', 'orange') 67 68 69def test_echo(dut): 70 dut.expect('WEBSOCKET_EVENT_CONNECTED') 71 for i in range(0, 5): 72 dut.expect(re.compile(r'Received=hello (\d)'), timeout=30) 73 print('All echos received') 74 75 76def test_close(dut): 77 code = dut.expect(re.compile(r'WEBSOCKET: Received closed message with code=(\d*)'), timeout=60)[0] 78 print('Received close frame with code {}'.format(code)) 79 80 81def test_recv_long_msg(dut, websocket, msg_len, repeats): 82 send_msg = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(msg_len)) 83 84 for _ in range(repeats): 85 websocket.send_data(send_msg) 86 87 recv_msg = '' 88 while len(recv_msg) < msg_len: 89 # Filter out color encoding 90 match = dut.expect(re.compile(r'Received=([a-zA-Z0-9]*).*\n'), timeout=30)[0] 91 recv_msg += match 92 93 if recv_msg == send_msg: 94 print('Sent message and received message are equal') 95 else: 96 raise ValueError('DUT received string do not match sent string, \nexpected: {}\nwith length {}\ 97 \nreceived: {}\nwith length {}'.format(send_msg, len(send_msg), recv_msg, len(recv_msg))) 98 99 100@ttfw_idf.idf_example_test(env_tag='Example_EthKitV1') 101def test_examples_protocol_websocket(env, extra_data): 102 """ 103 steps: 104 1. obtain IP address 105 2. connect to uri specified in the config 106 3. send and receive data 107 """ 108 dut1 = env.get_dut('websocket', 'examples/protocols/websocket', dut_class=ttfw_idf.ESP32DUT) 109 # check and log bin size 110 binary_file = os.path.join(dut1.app.binary_path, 'websocket_example.bin') 111 bin_size = os.path.getsize(binary_file) 112 ttfw_idf.log_performance('websocket_bin_size', '{}KB'.format(bin_size // 1024)) 113 114 try: 115 if 'CONFIG_WEBSOCKET_URI_FROM_STDIN' in dut1.app.get_sdkconfig(): 116 uri_from_stdin = True 117 else: 118 uri = dut1.app.get_sdkconfig()['CONFIG_WEBSOCKET_URI'].strip('"') 119 uri_from_stdin = False 120 121 except Exception: 122 print('ENV_TEST_FAILURE: Cannot find uri settings in sdkconfig') 123 raise 124 125 # start test 126 dut1.start_app() 127 128 if uri_from_stdin: 129 server_port = 4455 130 with Websocket(server_port) as ws: 131 uri = 'ws://{}:{}'.format(get_my_ip(), server_port) 132 print('DUT connecting to {}'.format(uri)) 133 dut1.expect('Please enter uri of websocket endpoint', timeout=30) 134 dut1.write(uri) 135 test_echo(dut1) 136 # Message length should exceed DUT's buffer size to test fragmentation, default is 1024 byte 137 test_recv_long_msg(dut1, ws, 2000, 3) 138 test_close(dut1) 139 140 else: 141 print('DUT connecting to {}'.format(uri)) 142 test_echo(dut1) 143 144 145if __name__ == '__main__': 146 test_examples_protocol_websocket() 147