# Copyright (c) 2024 Vestas Wind Systems A/S # # SPDX-License-Identifier: Apache-2.0 """ Zephyr CAN shell module support for providing a python-can bus interface for testing. """ import logging import re from can import BusABC, CanProtocol, Message from can.exceptions import CanInitializationError, CanOperationError from can.typechecking import CanFilters from twister_harness import DeviceAdapter, Shell logger = logging.getLogger(__name__) class CanShellBus(BusABC): # pylint: disable=abstract-method """ A CAN interface using the Zephyr CAN shell module. """ def __init__( self, dut: DeviceAdapter, shell: Shell, channel: str, can_filters: CanFilters | None = None, **kwargs, ) -> None: self._dut = dut self._shell = shell self._device = channel self._is_filtered = False self._filter_ids = [] self.channel_info = f'Zephyr CAN shell, device "{self._device}"' mode = 'normal' if 'fd' in self._get_capabilities(): self._can_protocol = CanProtocol.CAN_FD mode += ' fd' else: self._can_protocol = CanProtocol.CAN_20 self._set_mode(mode) self._start() super().__init__(channel=channel, can_filters=can_filters, **kwargs) def _retval(self): """Get return value of last shell command.""" return int(self._shell.get_filtered_output(self._shell.exec_command('retval'))[0]) def _get_capabilities(self) -> list[str]: cmd = f'can show {self._device}' lines = self._shell.get_filtered_output(self._shell.exec_command(cmd)) regex_compiled = re.compile(r'capabilities:\s+(?P.*)') for line in lines: m = regex_compiled.match(line) if m: return m.group('caps').split() raise CanOperationError('capabilities not found') def _set_mode(self, mode: str) -> None: self._shell.exec_command(f'can mode {self._device} {mode}') retval = self._retval() if retval != 0: raise CanOperationError(f'failed to set mode "{mode}" (err {retval})') def _start(self): self._shell.exec_command(f'can start {self._device}') retval = self._retval() if retval != 0: raise CanInitializationError(f'failed to start (err {retval})') def _stop(self): self._shell.exec_command(f'can stop {self._device}') def send(self, msg: Message, timeout: float | None = None) -> None: logger.debug('sending: %s', msg) cmd = f'can send {self._device}' cmd += ' -e' if msg.is_extended_id else '' cmd += ' -r' if msg.is_remote_frame else '' cmd += ' -f' if msg.is_fd else '' cmd += ' -b' if msg.bitrate_switch else '' if msg.is_extended_id: cmd += f' {msg.arbitration_id:08x}' else: cmd += f' {msg.arbitration_id:03x}' if msg.data: cmd += ' ' + msg.data.hex(' ', 1) lines = self._shell.exec_command(cmd) regex_compiled = re.compile(r'enqueuing\s+CAN\s+frame\s+#(?P\d+)') frame_num = None for line in lines: m = regex_compiled.match(line) if m: frame_num = m.group('id') break if frame_num is None: raise CanOperationError('frame not enqueued') tx_regex = r'CAN\s+frame\s+#' + frame_num + r'\s+successfully\s+sent' self._dut.readlines_until(regex=tx_regex, timeout=timeout) def _add_filter(self, can_id: int, can_mask: int, extended: bool) -> None: """Add RX filter.""" cmd = f'can filter add {self._device}' cmd += ' -e' if extended else '' if extended: cmd += f' {can_id:08x}' cmd += f' {can_mask:08x}' else: cmd += f' {can_id:03x}' cmd += f' {can_mask:03x}' lines = self._shell.exec_command(cmd) regex_compiled = re.compile(r'filter\s+ID:\s+(?P\d+)') for line in lines: m = regex_compiled.match(line) if m: filter_id = int(m.group('id')) self._filter_ids.append(filter_id) return raise CanOperationError('filter_id not found') def _remove_filter(self, filter_id: int) -> None: """Remove RX filter.""" if filter_id in self._filter_ids: self._filter_ids.remove(filter_id) self._shell.exec_command(f'can filter remove {self._device} {filter_id}') retval = self._retval() if retval != 0: raise CanOperationError(f'failed to remove filter ID {filter_id} (err {retval})') def _remove_all_filters(self) -> None: """Remove all RX filters.""" for filter_id in self._filter_ids[:]: self._remove_filter(filter_id) def _apply_filters(self, filters: CanFilters | None) -> None: self._remove_all_filters() if filters: self._is_filtered = True else: # Accept all frames if no hardware filters provided filters = [ {'can_id': 0x0, 'can_mask': 0x0}, {'can_id': 0x0, 'can_mask': 0x0, 'extended': True}, ] self._is_filtered = False for can_filter in filters: can_id = can_filter['can_id'] can_mask = can_filter['can_mask'] extended = can_filter.get('extended', False) self._add_filter(can_id, can_mask, extended) def _recv_internal(self, timeout: float | None) -> tuple[Message | None, bool]: frame_regex = ( r'.*' + re.escape(self._device) + r'\s+(?P\S)(?P\S)\s+(?P\d+)\s+' + r'\[(?P\d+)\]\s*(?P[a-z0-9 ]*)' ) lines = self._dut.readlines_until(regex=frame_regex, timeout=timeout) msg = None regex_compiled = re.compile(frame_regex) for line in lines: m = regex_compiled.match(line) if m: can_id = int(m.group('can_id'), 16) ext = len(m.group('can_id')) == 8 dlc = int(m.group('dlc')) fd = len(m.group('dlc')) == 2 brs = m.group('brs') == 'B' esi = m.group('esi') == 'P' data = bytearray.fromhex(m.group('data')) msg = Message( arbitration_id=can_id, is_extended_id=ext, data=data, dlc=dlc, is_fd=fd, bitrate_switch=brs, error_state_indicator=esi, channel=self._device, check=True, ) logger.debug('received: %s', msg) return msg, self._is_filtered def shutdown(self) -> None: if not self._is_shutdown: super().shutdown() self._stop() self._remove_all_filters()