1# Copyright (c) 2024 Vestas Wind Systems A/S
2#
3# SPDX-License-Identifier: Apache-2.0
4
5"""
6Test suites for testing Zephyr CAN <=> host CAN.
7"""
8
9import logging
10import pytest
11
12import can
13from can import BusABC, CanProtocol
14
15# RX/TX timeout in seconds
16TIMEOUT = 1.0
17
18logger = logging.getLogger(__name__)
19
20@pytest.mark.parametrize('msg', [
21    pytest.param(
22        can.Message(arbitration_id=0x10,
23                    is_extended_id=False),
24        id='std_id_dlc_0'
25    ),
26    pytest.param(
27        can.Message(arbitration_id=0x20,
28                    data=[0xaa, 0xbb, 0xcc, 0xdd],
29                    is_extended_id=False),
30        id='std_id_dlc_4'
31    ),
32    pytest.param(
33        can.Message(arbitration_id=0x30,
34                    data=[0xee, 0xff, 0xee, 0xff, 0xee, 0xff, 0xee, 0xff],
35                    is_extended_id=True),
36        id='ext_id_dlc_8'
37    ),
38    pytest.param(
39        can.Message(arbitration_id=0x40,
40                    data=[0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09,
41                          0x10, 0x11],
42                    is_fd=True, is_extended_id=False),
43        id='std_id_fdf_dlc_9'
44    ),
45    pytest.param(
46        can.Message(arbitration_id=0x50,
47                    data=[0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09,
48                          0x10, 0x11],
49                    is_fd=True, bitrate_switch=True, is_extended_id=False),
50        id='std_id_fdf_brs_dlc_9'
51    ),
52])
53class TestCanRxTx():
54    """
55    Class for testing CAN RX/TX between Zephyr DUT and host.
56    """
57
58    @staticmethod
59    def check_rx(tx: can.Message, rx: can.Message) -> None:
60        """Check if received message matches transmitted message."""
61        # pylint: disable-next=unused-variable
62        __tracebackhide__ = True
63
64        if rx is None:
65            pytest.fail('no message received')
66
67        if not tx.equals(rx, timestamp_delta=None, check_channel=False,
68                         check_direction=False):
69            pytest.fail(f'rx message "{rx}" not equal to tx message "{tx}"')
70
71    @staticmethod
72    def skip_if_unsupported(can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
73        """Skip test if message format is not supported by both DUT and host."""
74        if msg.is_fd:
75            if can_dut.protocol == CanProtocol.CAN_20:
76                pytest.skip('CAN FD not supported by DUT')
77            if can_host.protocol == CanProtocol.CAN_20:
78                pytest.skip('CAN FD not supported by host')
79
80    def test_dut_to_host(self, can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
81        """Test DUT to host communication."""
82        self.skip_if_unsupported(can_dut, can_host, msg)
83
84        can_dut.send(msg, timeout=TIMEOUT)
85        rx = can_host.recv(timeout=TIMEOUT)
86        self.check_rx(msg, rx)
87
88    def test_host_to_dut(self, can_dut: BusABC, can_host: BusABC, msg: can.Message) -> None:
89        """Test host to DUT communication."""
90        self.skip_if_unsupported(can_dut, can_host, msg)
91
92        can_host.send(msg, timeout=TIMEOUT)
93        rx = can_dut.recv(timeout=TIMEOUT)
94        self.check_rx(msg, rx)
95