1import os
2import re
3import socket
4from collections.abc import Callable
5from threading import Thread
6
7import tiny_test_fw
8import ttfw_idf
9from scapy.all import Ether, raw
10from ttfw_idf import TestFormat
11
12try:
13    import typing  # noqa: F401 # pylint: disable=unused-import
14except ImportError:
15    pass
16
17
def configure_eth_if(func):         # type: (typing.Any) -> typing.Any
    """Decorator that supplies ``func`` with a raw socket bound to a host Ethernet interface.

    The wrapper lists ``/sys/class/net/``, picks the first entry whose name
    starts with ``eth``, ``enp`` or ``eno``, opens an ``AF_PACKET``/``SOCK_RAW``
    socket with protocol ``0x2222``, binds it to that interface and calls
    ``func(so, *args, **kwargs)``.

    :param func: callable that takes the bound socket as its first positional argument
    :return: wrapper accepting the remaining arguments of ``func``; it returns
             whatever ``func`` returns
    :raises Exception: raised by the wrapper when no matching interface exists
    """
    def inner(*args, **kwargs):     # type: (typing.Any, typing.Any) -> typing.Any
        # try to determine which interface to use
        netifs = os.listdir('/sys/class/net/')
        print('detected interfaces: ' + str(netifs))
        # first interface with a matching name prefix, or '' when there is none
        target_if = next((netif for netif in netifs if netif.startswith(('eth', 'enp', 'eno'))), '')
        if target_if == '':
            raise Exception('no network interface found')
        print('Use ' + target_if + ' for testing')

        # Use the socket as a context manager so that it is closed even when
        # bind() or func() raises (e.g. on a receive timeout); previously the
        # socket was only closed on the success path.
        with socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0x2222) as so:
            so.bind((target_if, 0))
            return func(so, *args, **kwargs)

    return inner
39
40
@configure_eth_if
def check_eth_recv_packet(so):    # type: (socket.socket) -> None
    """Receive one frame and verify that its payload is an incrementing byte ramp.

    :param so: raw socket supplied by ``configure_eth_if``
    :raises Exception: ``'Packet content mismatch'`` when any of the first 1010
                       payload bytes differs from the low 8 bits of its offset
    """
    so.settimeout(10)
    frame = Ether(so.recv(1024))
    for offset in range(1010):
        # byte at each offset must equal the offset truncated to 8 bits
        expected = offset & 0xff
        if frame.load[offset] != expected:
            raise Exception('Packet content mismatch')
51
52
@configure_eth_if
def send_eth_packet(so, mac):    # type: (socket.socket, str) -> None
    """Send one frame of type 0x2222 carrying a 1010-byte incrementing payload.

    :param so: raw socket supplied by ``configure_eth_if``
    :param mac: destination MAC address of the frame
    """
    so.settimeout(10)
    # payload byte k holds the low 8 bits of k
    ramp = bytearray(k & 0xff for k in range(1010))
    header = Ether(dst=mac, src=so.getsockname()[4], type=0x2222)
    so.send(raw(header / raw(ramp)))
64
65
@configure_eth_if
def recv_resp_poke(so, i):    # type: (socket.socket, int) -> None
    """Receive one frame and, if it is a poke with index ``i``, send the response back.

    :param so: raw socket supplied by ``configure_eth_if``
    :param i: value expected in the second payload byte of the poke frame
    :raises Exception: ``'Missed Poke Packet'`` when a poke frame carries a different index
    """
    so.settimeout(10)
    frame = Ether(so.recv(60))

    # A poke is a 0x2222 frame whose first payload byte is 0xfa.
    # NOTE(review): any other frame is ignored without error - confirm this is intended.
    if not (frame.type == 0x2222 and frame.load[0] == 0xfa):
        return

    if frame.load[1] != i:
        raise Exception('Missed Poke Packet')

    # turn the received frame around: reply to its sender from our own address
    frame.dst = frame.src
    frame.src = so.getsockname()[4]
    frame.load = bytes.fromhex('fb')    # POKE_RESP code
    so.send(raw(frame))
81
82
@configure_eth_if
def traffic_gen(so, mac, enabled):  # type: (socket.socket, str, Callable) -> None
    """Keep sending the same dummy frame to ``mac`` for as long as ``enabled()`` returns 1.

    :param so: raw socket supplied by ``configure_eth_if``
    :param mac: destination MAC address of the generated frames
    :param enabled: zero-argument callable polled before every send
    """
    # DUMMY_TRAFFIC code followed by 1485 zero bytes
    body = bytes.fromhex('ff') + bytes(1485)
    frame = Ether(dst=mac, src=so.getsockname()[4], type=0x2222) / raw(body)
    while True:
        if enabled() != 1:
            break
        so.send(raw(frame))
93
94
def test_component_ut_esp_eth(env, appname):  # type: (tiny_test_fw.Env, str) -> None
    """Drive the esp_eth test app through its test cases and check each Unity result.

    The cases are run in this order: ``start_and_stop``, ``get_set_mac``,
    ``ethernet_broadcast_transmit``, ``recv_pkt`` and ``start_stop_stress_test``.
    The last three need the host-side helpers of this file to receive or send
    frames while the DUT is executing the case.

    :param env: test environment used to obtain the DUT
    :param appname: passed to ``env.get_dut`` as ``app_config_name``
    """
    menu_prompt = "Enter next test, or 'enter' to see menu"
    # group 1: everything printed before the MAC line, group 2: the DUT MAC address
    dut_mac_regex = re.compile(
        r'([\s\S]*)'
        r'DUT MAC: ([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})')

    dut = env.get_dut('esp_eth', 'components/esp_eth/test_apps', app_config_name=appname)
    dut.start_app()

    def wait_for_menu():  # type: () -> typing.Any
        # block until the app asks for the next test and hand back the captured output
        return dut.expect(menu_prompt, full_stdout=True)

    def check_unity_output(output):  # type: (typing.Any) -> None
        ttfw_idf.ComponentUTResult.parse_result(output, test_format=TestFormat.UNITY_BASIC)

    # --- start_and_stop ---
    log = dut.expect('Press ENTER to see the list of tests', full_stdout=True)
    dut.write('"start_and_stop"')
    log += wait_for_menu()
    check_unity_output(log)

    # --- get_set_mac ---
    dut.write('"get_set_mac"')
    log = wait_for_menu()
    check_unity_output(log)

    # --- ethernet_broadcast_transmit: the host verifies the frame content ---
    dut.write('"ethernet_broadcast_transmit"')
    check_eth_recv_packet()
    log = wait_for_menu()
    check_unity_output(log)

    # --- recv_pkt: the host sends one frame per destination address kind ---
    dut.write('"recv_pkt"')
    match = dut.expect(dut_mac_regex, timeout=10)
    log = match[0]
    dut_mac = match[1]
    send_eth_packet('ff:ff:ff:ff:ff:ff')      # broadcast frame
    send_eth_packet('01:00:00:00:00:00')      # multicast frame
    send_eth_packet(dut_mac)   # unicast frame
    log += wait_for_menu()
    check_unity_output(log)

    # --- start_stop_stress_test ---
    dut.write('"start_stop_stress_test"')
    match = dut.expect(dut_mac_regex, timeout=10)
    dut_mac = match[1]

    # Start/stop under heavy Tx traffic
    for tx_i in range(10):
        recv_resp_poke(tx_i)

    # Start/stop under heavy Rx traffic
    # The generator thread polls this flag through the closure below; clearing
    # it in the ``finally`` clause lets the thread finish so join() can return.
    keep_sending = 1
    generator = Thread(target=traffic_gen, args=(dut_mac, lambda: keep_sending))
    generator.start()
    try:
        for rx_i in range(10):
            recv_resp_poke(rx_i)
    finally:
        keep_sending = 0
        generator.join()

    log = wait_for_menu()
    check_unity_output(log)
146
147
@ttfw_idf.idf_component_unit_test(env_tag='COMPONENT_UT_IP101', target=['esp32'])
def test_component_ut_esp_eth_ip101(env, _):  # type: (tiny_test_fw.Env, typing.Any) -> None
    """Run the esp_eth component unit tests using the ``ip101`` app configuration."""
    test_component_ut_esp_eth(env, appname='ip101')
151
152
@ttfw_idf.idf_component_unit_test(env_tag='COMPONENT_UT_LAN8720', target=['esp32'])
def test_component_ut_esp_eth_lan8720(env, _):  # type: (tiny_test_fw.Env, typing.Any) -> None
    """Run the esp_eth component unit tests using the ``lan8720`` app configuration."""
    test_component_ut_esp_eth(env, appname='lan8720')
156
157
if __name__ == '__main__':
    # run both PHY variants one after the other, IP101 first
    for run_test in (test_component_ut_esp_eth_ip101, test_component_ut_esp_eth_lan8720):
        run_test()
161