# Copyright (c) 2024 Vestas Wind Systems A/S
#
# SPDX-License-Identifier: Apache-2.0

"""
Test suites for testing Zephyr CAN <=> host CAN.
"""

import logging
import pytest

import can
from can import BusABC, CanProtocol

# RX/TX timeout in seconds
TIMEOUT = 1.0

logger = logging.getLogger(__name__)


@pytest.mark.parametrize('msg', [
    # Classic CAN, standard (11-bit) identifier, no payload.
    pytest.param(
        can.Message(
            arbitration_id=0x10,
            is_extended_id=False,
        ),
        id='std_id_dlc_0',
    ),
    # Classic CAN, standard identifier, 4 payload bytes.
    pytest.param(
        can.Message(
            arbitration_id=0x20,
            data=[0xaa, 0xbb, 0xcc, 0xdd],
            is_extended_id=False,
        ),
        id='std_id_dlc_4',
    ),
    # Classic CAN, extended identifier, 8 payload bytes.
    pytest.param(
        can.Message(
            arbitration_id=0x30,
            data=[0xee, 0xff, 0xee, 0xff, 0xee, 0xff, 0xee, 0xff],
            is_extended_id=True,
        ),
        id='ext_id_dlc_8',
    ),
    # CAN FD frame, standard identifier, 12 payload bytes.
    pytest.param(
        can.Message(
            arbitration_id=0x40,
            data=[
                0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
                0x06, 0x07, 0x08, 0x09, 0x10, 0x11,
            ],
            is_fd=True,
            is_extended_id=False,
        ),
        id='std_id_fdf_dlc_9',
    ),
    # CAN FD frame with bit rate switching, standard identifier, 12 payload bytes.
    pytest.param(
        can.Message(
            arbitration_id=0x50,
            data=[
                0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
                0x06, 0x07, 0x08, 0x09, 0x10, 0x11,
            ],
            is_fd=True,
            bitrate_switch=True,
            is_extended_id=False,
        ),
        id='std_id_fdf_brs_dlc_9',
    ),
])
class TestCanRxTx():
    """
    CAN RX/TX test cases exchanging frames between the Zephyr DUT and the host.

    Every test in this class runs once per parametrized ``msg``.
    """

    @staticmethod
    def check_rx(tx: can.Message, rx: can.Message) -> None:
        """
        Fail the running test unless ``rx`` matches ``tx``.

        ``rx`` may be ``None`` (nothing received), which is reported as a
        failure. Timestamp, channel and direction are left out of the
        comparison.
        """
        # pylint: disable-next=unused-variable
        __tracebackhide__ = True

        if rx is None:
            pytest.fail('no message received')

        frames_match = tx.equals(
            rx,
            timestamp_delta=None,
            check_channel=False,
            check_direction=False,
        )
        if not frames_match:
            pytest.fail(f'rx message "{rx}" not equal to tx message "{tx}"')

    @staticmethod
    def skip_if_unsupported(can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
        """
        Skip the running test when ``msg`` is a CAN FD frame and either bus
        reports the ``CanProtocol.CAN_20`` protocol.
        """
        if not msg.is_fd:
            # Non-FD frames are never skipped.
            return

        # The DUT is checked first, so its skip reason wins when both lack CAN FD.
        if can_dut.protocol == CanProtocol.CAN_20:
            pytest.skip('CAN FD not supported by DUT')

        if can_host.protocol == CanProtocol.CAN_20:
            pytest.skip('CAN FD not supported by host')

    def test_dut_to_host(self, can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
        """Send ``msg`` from the DUT and verify that the host receives it."""
        self.skip_if_unsupported(can_dut, can_host, msg)

        can_dut.send(msg, timeout=TIMEOUT)
        self.check_rx(msg, can_host.recv(timeout=TIMEOUT))

    def test_host_to_dut(self, can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
        """Send ``msg`` from the host and verify that the DUT receives it."""
        self.skip_if_unsupported(can_dut, can_host, msg)

        can_host.send(msg, timeout=TIMEOUT)
        self.check_rx(msg, can_dut.recv(timeout=TIMEOUT))