1# Copyright (c) 2023 Nordic Semiconductor ASA
3# SPDX-License-Identifier: Apache-2.0
5from __future__ import annotations
7import logging
8import re
9import time
11from dataclasses import dataclass, field
12from inspect import signature
14from twister_harness.device.device_adapter import DeviceAdapter
15from twister_harness.exceptions import TwisterHarnessTimeoutException
17logger = logging.getLogger(__name__)
20class Shell:
21    """
22    Helper class that provides methods used to interact with shell application.
23    """
25    def __init__(
26        self, device: DeviceAdapter, prompt: str = 'uart:~$', timeout: float | None = None
27    ) -> None:
28        self._device: DeviceAdapter = device
29        self.prompt: str = prompt
30        self.base_timeout: float = timeout or device.base_timeout
32    def wait_for_prompt(self, timeout: float | None = None) -> bool:
33        """
34        Send every 0.5 second "enter" command to the device until shell prompt
35        statement will occur (return True) or timeout will be exceeded (return
36        False).
37        """
38        timeout = timeout or self.base_timeout
39        timeout_time = time.time() + timeout
40        self._device.clear_buffer()
41        while time.time() < timeout_time:
42            self._device.write(b'\n')
43            try:
44                line = self._device.readline(timeout=0.5, print_output=False)
45            except TwisterHarnessTimeoutException:
46                # ignore read timeout and try to send enter once again
47                continue
48            if self.prompt in line:
49                logger.debug('Got prompt')
50                return True
51        return False
53    def exec_command(
54        self, command: str, timeout: float | None = None, print_output: bool = True
55    ) -> list[str]:
56        """
57        Send shell command to a device and return response. Passed command
58        is extended by double enter sings - first one to execute this command
59        on a device, second one to receive next prompt what is a signal that
60        execution was finished. Method returns printout of the executed command.
61        """
62        timeout = timeout or self.base_timeout
63        command_ext = f'{command}\n\n'
64        regex_prompt = re.escape(self.prompt)
65        regex_command = f'.*{re.escape(command)}'
66        self._device.clear_buffer()
67        self._device.write(command_ext.encode())
68        lines: list[str] = []
69        # wait for device command print - it should be done immediately after sending command to device
70        lines.extend(
71            self._device.readlines_until(
72                regex=regex_command, timeout=1.0, print_output=print_output
73            )
74        )
75        # wait for device command execution
76        lines.extend(
77            self._device.readlines_until(
78                regex=regex_prompt, timeout=timeout, print_output=print_output
79            )
80        )
81        return lines
83    def get_filtered_output(self, command_lines: list[str]) -> list[str]:
84        """
85        Filter out prompts and log messages
87        Take the output of exec_command, which can contain log messages and command prompts,
88        and filter them to obtain only the command output.
90        Example:
91            >>> # equivalent to `lines = shell.exec_command("kernel version")`
92            >>> lines = [
93            >>>    'uart:~$',                    # filter prompts
94            >>>    'Zephyr version 3.6.0',       # keep this line
95            >>>    'uart:~$ <dbg> debug message' # filter log messages
96            >>> ]
97            >>> filtered_output = shell.get_filtered_output(output)
98            >>> filtered_output
99            ['Zephyr version 3.6.0']
101        :param command_lines: List of strings i.e. the output of `exec_command`.
102        :return: A list of strings containing, excluding prompts and log messages.
103        """
104        regex_filter = re.compile(
105            '|'.join([re.escape(self.prompt), '<dbg>', '<inf>', '<wrn>', '<err>'])
106        )
107        return list(filter(lambda l: not regex_filter.search(l), command_lines))
111class ShellMCUbootArea:
112    name: str
113    version: str
114    image_size: str
115    magic: str = 'unset'
116    swap_type: str = 'none'
117    copy_done: str = 'unset'
118    image_ok: str = 'unset'
120    @classmethod
121    def from_kwargs(cls, **kwargs) -> ShellMCUbootArea:
122        cls_fields = {field for field in signature(cls).parameters}
123        native_args = {}
124        for name, val in kwargs.items():
125            if name in cls_fields:
126                native_args[name] = val
127        return cls(**native_args)
131class ShellMCUbootCommandParsed:
132    """
133    Helper class to keep data from `mcuboot` shell command.
134    """
136    areas: list[ShellMCUbootArea] = field(default_factory=list)
138    @classmethod
139    def create_from_cmd_output(cls, cmd_output: list[str]) -> ShellMCUbootCommandParsed:
140        """
141        Factory to create class from the output of `mcuboot` shell command.
142        """
143        areas: list[dict] = []
144        re_area = re.compile(r'(.+ area.*):\s*$')
145        re_key = re.compile(r'(?P<key>.+):(?P<val>.+)')
146        for line in cmd_output:
147            if m := re_area.search(line):
148                areas.append({'name': m.group(1)})
149            elif areas:
150                if m := re_key.search(line):
151                    areas[-1][m.group('key').strip().replace(' ', '_')] = m.group('val').strip()
152        data_areas: list[ShellMCUbootArea] = []
153        for area in areas:
154            data_areas.append(ShellMCUbootArea.from_kwargs(**area))
156        return cls(data_areas)