1import os
2import re
3import socket
4import subprocess
5import time
6from shutil import copyfile
7from threading import Event, Thread
8
9import ttfw_idf
10from tiny_test_fw import DUT, Utility
11
# Flags polled by the two listener loops; setting one asks that loop to exit.
stop_io_listener = Event()
stop_sock_listener = Event()

# Shared between the listener threads: `sock` is the UDP socket created by
# sock_listener(), `client_address` is the peer of the last datagram it
# received. Both are None until sock_listener() fills them in.
sock = client_address = None

# Switched to True by the `__main__` entry point; selects the manual branch
# of lwip_test_suite() instead of the automated build-and-run branch.
manual_test = False
17
18
def io_listener(dut1):
    """Forward packets printed by the DUT to the test suite over UDP.

    Loops until ``stop_io_listener`` is set. Each iteration waits up to five
    seconds for a ``PacketOut:[<hex digits>]`` match on ``dut1``, converts the
    hex digits to raw bytes, logs them, and sends them to ``127.0.0.1:7777``
    through the module-level ``sock``. Nothing is sent until ``client_address``
    has been set, or when no socket is currently open.

    :param dut1: object providing ``expect(pattern, timeout=...)`` that raises
        ``DUT.ExpectTimeout`` when nothing matched in time.
    :raises ValueError: from ``bytearray.fromhex`` if the matched text is not
        a whole number of hex byte pairs.
    """
    # Compile once; the pattern never changes between iterations.
    packet_pattern = re.compile(r'PacketOut:\[([a-fA-F0-9]+)\]')
    while not stop_io_listener.is_set():
        try:
            data = dut1.expect(packet_pattern, timeout=5)
        except DUT.ExpectTimeout:
            # Nothing arrived in time; loop so the stop flag is re-checked.
            continue
        if not data or not data[0]:
            continue
        packet_data = data[0]
        print('Packet_data>{}<'.format(packet_data))
        # NOTE(review): whether expect() yields bytes or str groups is not
        # visible from this file, so both are accepted here.
        if isinstance(packet_data, bytes):
            packet_data = packet_data.decode()
        response = bytearray.fromhex(packet_data)
        print('Sending to socket:')
        packet = ' '.join(format(x, '02x') for x in response)
        print('Packet>{}<'.format(packet))
        # sock_listener() resets the global to None when it shuts down, so
        # take one snapshot instead of reading the global twice.
        out_sock = sock
        if client_address is not None and out_sock is not None:
            out_sock.sendto(response, ('127.0.0.1', 7777))
37
38
def sock_listener(dut1):
    """Forward UDP datagrams received on port 7771 to the DUT.

    Creates the module-level ``sock`` (UDP, bound to ``0.0.0.0:7771``, five
    second receive timeout) and loops until ``stop_sock_listener`` is set.
    Every received datagram is rendered as space-separated two-digit hex
    bytes, logged, and written to ``dut1``. The sender of the latest datagram
    is stored in the module-level ``client_address``.

    The socket is always closed and ``sock`` reset to ``None`` on exit, also
    when configuring or binding the socket fails.

    :param dut1: object providing ``write(data)``.
    """
    global sock
    global client_address
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # Configure and bind inside the try block so that a failure here
        # (for example the port already being in use) still closes the socket.
        sock.settimeout(5)
        sock.bind(('0.0.0.0', 7771))
        while not stop_sock_listener.is_set():
            try:
                payload, client_address = sock.recvfrom(1024)
            except socket.timeout:
                # Periodic wake-up so the stop flag is re-checked.
                continue
            packet = ' '.join(format(x, '02x') for x in bytearray(payload))
            print('Received from address {}, data {}'.format(client_address, packet))
            dut1.write(str.encode(packet))
    finally:
        sock.close()
        sock = None
60
61
def _run_and_capture(cmd, cwd=None, capture_stderr=False):
    """Run ``cmd`` to completion and return what it wrote to stdout as text.

    :param cmd: argument list passed to ``subprocess.Popen``.
    :param cwd: working directory for the child, or ``None`` to inherit.
    :param capture_stderr: when true, stderr is piped and discarded instead of
        being inherited from this process.
    :return: the child's stdout decoded as UTF-8, undecodable bytes replaced.
    """
    proc = subprocess.Popen(cmd, cwd=cwd, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE if capture_stderr else None)
    # communicate() drains every pipe and waits for the child. Reading only
    # stdout while stderr is also a pipe can block forever once the child has
    # filled the stderr pipe buffer.
    output, _ = proc.communicate()
    return output.decode('utf-8', 'replace')


@ttfw_idf.idf_example_test(env_tag='Example_WIFI')
def lwip_test_suite(env, extra_data):
    """
    steps: |
      1. Rebuilds test suite with esp32_netsuite.ttcn
      2. Starts listeners on stdout and socket
      3. Execute ttcn3 test suite
      4. Collect result from ttcn3
    """
    # env: test environment; only env.get_dut() is used here.
    # extra_data: not used by this function.
    # Raises ValueError when NETSUITE_PATH is not set (automated mode) or when
    # the TTCN-3 output does not contain 'Overall verdict: pass'.
    dut1 = env.get_dut('net_suite', 'examples/system/network_tests', dut_class=ttfw_idf.ESP32DUT)
    # check and log bin size
    binary_file = os.path.join(dut1.app.binary_path, 'net_suite.bin')
    bin_size = os.path.getsize(binary_file)
    ttfw_idf.log_performance('net_suite', '{}KB'.format(bin_size // 1024))
    ttfw_idf.check_performance('net_suite', bin_size // 1024, dut1.TARGET)
    dut1.start_app()
    sock_thread = Thread(target=sock_listener, args=(dut1, ))
    io_thread = Thread(target=io_listener, args=(dut1, ))
    try:
        if not manual_test:
            # Variables referring to esp32 ttcn test suite
            TTCN_SRC = 'esp32_netsuite.ttcn'
            TTCN_CFG = 'esp32_netsuite.cfg'
            # System Paths
            netsuite_path = os.getenv('NETSUITE_PATH')
            if not netsuite_path:
                raise ValueError('NETSUITE_PATH environment variable is not set')
            netsuite_src_path = os.path.join(netsuite_path, 'src')
            test_dir = os.path.dirname(os.path.realpath(__file__))
            # Building the suite
            print('Rebuilding the test suite')
            print('-------------------------')
            # copy esp32 specific files to ttcn net-suite dir
            copyfile(os.path.join(test_dir, TTCN_SRC), os.path.join(netsuite_src_path, TTCN_SRC))
            copyfile(os.path.join(test_dir, TTCN_CFG), os.path.join(netsuite_src_path, TTCN_CFG))
            # Run the build steps with the source directory as working
            # directory rather than splicing the path into a shell command, so
            # paths containing spaces or shell metacharacters keep working.
            output = _run_and_capture(['bash', '-c', 'source make.sh'],
                                      cwd=netsuite_src_path, capture_stderr=True)
            print('Note: First build step we expect failure (titan/net_suite build system not suitable for multijob make)')
            print(output)
            print('Note: This time all dependencies shall be generated -- multijob make shall pass')
            output = _run_and_capture(['make'], cwd=netsuite_src_path, capture_stderr=True)
            print(output)
            # Executing the test suite
            sock_thread.start()
            io_thread.start()
            time.sleep(2)
            print('Executing the test suite')
            print('------------------------')
            output = _run_and_capture(['ttcn3_start',
                                       os.path.join(netsuite_src_path, 'test_suite'),
                                       os.path.join(netsuite_src_path, TTCN_CFG)])
            print(output)
            print('Collecting results')
            print('------------------')
            # `output` is text here, so the str patterns below are valid on
            # Python 3 (a str pattern cannot be applied to bytes).
            stats_match = re.search('(Verdict statistics:.*)', output)
            verdict_stats = stats_match.group(1) if stats_match else ''
            if re.search('Overall verdict: pass', output):
                print('Test passed!')
                Utility.console_log(verdict_stats, 'green')
            else:
                Utility.console_log(verdict_stats, 'red')
                raise ValueError('Test failed with: {}'.format(verdict_stats))
        else:
            try:
                # Executing the test suite
                sock_thread.start()
                io_thread.start()
                time.sleep(2)
                # Keep the listeners running until the operator presses Ctrl-C.
                while True:
                    time.sleep(0.5)
            except KeyboardInterrupt:
                pass
    finally:
        # Always stop the listeners, including on failure: they are non-daemon
        # threads and would otherwise keep the interpreter alive forever.
        print('Executing done, waiting for tests to finish')
        print('-------------------------------------------')
        stop_io_listener.set()
        stop_sock_listener.set()
        for listener in (sock_thread, io_thread):
            # join() raises RuntimeError on a thread that was never started.
            if listener.is_alive():
                listener.join()
146
147
if __name__ == '__main__':
    # Running the file directly selects the manual branch of
    # lwip_test_suite(): only the listeners are started and the TTCN-3 suite
    # has to be built and launched by hand.
    manual_test = True
    print('Manual execution, please build and start ttcn in a separate console')
    lwip_test_suite()
152