1import os 2import re 3import socket 4import subprocess 5import time 6from shutil import copyfile 7from threading import Event, Thread 8 9import ttfw_idf 10from tiny_test_fw import DUT, Utility 11 12stop_sock_listener = Event() 13stop_io_listener = Event() 14sock = None 15client_address = None 16manual_test = False 17 18 19def io_listener(dut1): 20 global sock 21 global client_address 22 data = b'' 23 while not stop_io_listener.is_set(): 24 try: 25 data = dut1.expect(re.compile(r'PacketOut:\[([a-fA-F0-9]+)\]'), timeout=5) 26 except DUT.ExpectTimeout: 27 continue 28 if data != () and data[0] != b'': 29 packet_data = data[0] 30 print('Packet_data>{}<'.format(packet_data)) 31 response = bytearray.fromhex(packet_data.decode()) 32 print('Sending to socket:') 33 packet = ' '.join(format(x, '02x') for x in bytearray(response)) 34 print('Packet>{}<'.format(packet)) 35 if client_address is not None: 36 sock.sendto(response, ('127.0.0.1', 7777)) 37 38 39def sock_listener(dut1): 40 global sock 41 global client_address 42 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 43 sock.settimeout(5) 44 server_address = '0.0.0.0' 45 server_port = 7771 46 server = (server_address, server_port) 47 sock.bind(server) 48 try: 49 while not stop_sock_listener.is_set(): 50 try: 51 payload, client_address = sock.recvfrom(1024) 52 packet = ' '.join(format(x, '02x') for x in bytearray(payload)) 53 print('Received from address {}, data {}'.format(client_address, packet)) 54 dut1.write(str.encode(packet)) 55 except socket.timeout: 56 pass 57 finally: 58 sock.close() 59 sock = None 60 61 62@ttfw_idf.idf_example_test(env_tag='Example_WIFI') 63def lwip_test_suite(env, extra_data): 64 global stop_io_listener 65 global stop_sock_listener 66 """ 67 steps: | 68 1. Rebuilds test suite with esp32_netsuite.ttcn 69 2. Starts listeners on stdout and socket 70 3. Execute ttcn3 test suite 71 4. Collect result from ttcn3 72 """ 73 dut1 = env.get_dut('net_suite', 'examples/system/network_tests', dut_class=ttfw_idf.ESP32DUT) 74 # check and log bin size 75 binary_file = os.path.join(dut1.app.binary_path, 'net_suite.bin') 76 bin_size = os.path.getsize(binary_file) 77 ttfw_idf.log_performance('net_suite', '{}KB'.format(bin_size // 1024)) 78 ttfw_idf.check_performance('net_suite', bin_size // 1024, dut1.TARGET) 79 dut1.start_app() 80 thread1 = Thread(target=sock_listener, args=(dut1, )) 81 thread2 = Thread(target=io_listener, args=(dut1, )) 82 if not manual_test: 83 # Variables refering to esp32 ttcn test suite 84 TTCN_SRC = 'esp32_netsuite.ttcn' 85 TTCN_CFG = 'esp32_netsuite.cfg' 86 # System Paths 87 netsuite_path = os.getenv('NETSUITE_PATH') 88 netsuite_src_path = os.path.join(netsuite_path, 'src') 89 test_dir = os.path.dirname(os.path.realpath(__file__)) 90 # Building the suite 91 print('Rebuilding the test suite') 92 print('-------------------------') 93 # copy esp32 specific files to ttcn net-suite dir 94 copyfile(os.path.join(test_dir, TTCN_SRC), os.path.join(netsuite_src_path, TTCN_SRC)) 95 copyfile(os.path.join(test_dir, TTCN_CFG), os.path.join(netsuite_src_path, TTCN_CFG)) 96 proc = subprocess.Popen(['bash', '-c', 'cd ' + netsuite_src_path + ' && source make.sh'], 97 cwd=netsuite_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 98 output = proc.stdout.read() 99 print('Note: First build step we expect failure (titan/net_suite build system not suitable for multijob make)') 100 print(output) 101 proc = subprocess.Popen(['bash', '-c', 'cd ' + netsuite_src_path + ' && make'], 102 cwd=netsuite_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 103 print('Note: This time all dependencies shall be generated -- multijob make shall pass') 104 output = proc.stdout.read() 105 print(output) 106 # Executing the test suite 107 thread1.start() 108 thread2.start() 109 time.sleep(2) 110 print('Executing the test suite') 111 print('------------------------') 112 proc = subprocess.Popen(['ttcn3_start', os.path.join(netsuite_src_path,'test_suite'), os.path.join(netsuite_src_path, TTCN_CFG)], 113 stdout=subprocess.PIPE) 114 output = proc.stdout.read() 115 print(output) 116 print('Collecting results') 117 print('------------------') 118 verdict_stats = re.search('(Verdict statistics:.*)', output) 119 if verdict_stats: 120 verdict_stats = verdict_stats.group(1) 121 else: 122 verdict_stats = b'' 123 verdict = re.search('Overall verdict: pass', output) 124 if verdict: 125 print('Test passed!') 126 Utility.console_log(verdict_stats, 'green') 127 else: 128 Utility.console_log(verdict_stats, 'red') 129 raise ValueError('Test failed with: {}'.format(verdict_stats)) 130 else: 131 try: 132 # Executing the test suite 133 thread1.start() 134 thread2.start() 135 time.sleep(2) 136 while True: 137 time.sleep(0.5) 138 except KeyboardInterrupt: 139 pass 140 print('Executing done, waiting for tests to finish') 141 print('-------------------------------------------') 142 stop_io_listener.set() 143 stop_sock_listener.set() 144 thread1.join() 145 thread2.join() 146 147 148if __name__ == '__main__': 149 print('Manual execution, please build and start ttcn in a separate console') 150 manual_test = True 151 lwip_test_suite() 152