1# Copyright (c) 2023 Nordic Semiconductor ASA
2#
3# SPDX-License-Identifier: Apache-2.0
4
5from __future__ import annotations
6
7import abc
8import logging
9import os
10import queue
11import re
12import shutil
13import threading
14import time
15from datetime import datetime
16from pathlib import Path
17from serial import SerialException
18
19from twister_harness.exceptions import (
20    TwisterHarnessException,
21    TwisterHarnessTimeoutException,
22)
23from twister_harness.twister_harness_config import DeviceConfig
24
25logger = logging.getLogger(__name__)
26
27
28class DeviceAdapter(abc.ABC):
29    """
30    This class defines a common interface for all device types (hardware,
31    simulator, QEMU) used in tests to gathering device output and send data to
32    it.
33    """
34
35    def __init__(self, device_config: DeviceConfig) -> None:
36        """
37        :param device_config: device configuration
38        """
39        self.device_config: DeviceConfig = device_config
40        self.base_timeout: float = device_config.base_timeout
41        self._device_read_queue: queue.Queue = queue.Queue()
42        self._reader_thread: threading.Thread | None = None
43        self._device_run: threading.Event = threading.Event()
44        self._device_connected: threading.Event = threading.Event()
45        self.command: list[str] = []
46        self._west: str | None = None
47
48        self.handler_log_path: Path = device_config.build_dir / 'handler.log'
49        self._log_files: list[Path] = [self.handler_log_path]
50
51    def __repr__(self) -> str:
52        return f'{self.__class__.__name__}()'
53
54    @property
55    def env(self) -> dict[str, str]:
56        env = os.environ.copy()
57        return env
58
59    def launch(self) -> None:
60        """
61        Start by closing previously running application (no effect if not
62        needed). Then, flash and run test application. Finally, start an
63        internal reader thread capturing an output from a device.
64        """
65        self.close()
66        self._clear_internal_resources()
67
68        if not self.command:
69            self.generate_command()
70            if self.device_config.extra_test_args:
71                self.command.extend(self.device_config.extra_test_args.split())
72
73        if self.device_config.type != 'hardware':
74            self._flash_and_run()
75            self._device_run.set()
76            self._start_reader_thread()
77            self.connect()
78            return
79
80        self._device_run.set()
81        self._start_reader_thread()
82
83        if self.device_config.flash_before:
84            # For hardware devices with shared USB or software USB, connect after flashing.
85            # Retry for up to 10 seconds for USB-CDC based devices to enumerate.
86            self._flash_and_run()
87            self.connect(retry_s = 10)
88        else:
89            # On hardware, flash after connecting to COM port, otherwise some messages
90            # from target can be lost.
91            self.connect()
92            self._flash_and_run()
93
94    def close(self) -> None:
95        """Disconnect, close device and close reader thread."""
96        if not self._device_run.is_set():
97            # device already closed
98            return
99        self.disconnect()
100        self._close_device()
101        self._device_run.clear()
102        self._join_reader_thread()
103
104    def connect(self, retry_s: int = 0) -> None:
105        """Connect to device - allow for output gathering."""
106        if self.is_device_connected():
107            logger.debug('Device already connected')
108            return
109        if not self.is_device_running():
110            msg = 'Cannot connect to not working device'
111            logger.error(msg)
112            raise TwisterHarnessException(msg)
113
114        if retry_s > 0:
115            retry_cycles = retry_s * 10
116            for i in range(retry_cycles):
117                try:
118                    self._connect_device()
119                    break
120                except SerialException:
121                    if i == retry_cycles - 1:
122                        raise
123                    time.sleep(0.1)
124        else:
125            self._connect_device()
126
127        self._device_connected.set()
128
129    def disconnect(self) -> None:
130        """Disconnect device - block output gathering."""
131        if not self.is_device_connected():
132            logger.debug("Device already disconnected")
133            return
134        self._disconnect_device()
135        self._device_connected.clear()
136
137    def readline(self, timeout: float | None = None, print_output: bool = True) -> str:
138        """
139        Read line from device output. If timeout is not provided, then use
140        base_timeout.
141        """
142        timeout = timeout or self.base_timeout
143        if self.is_device_connected() or not self._device_read_queue.empty():
144            data = self._read_from_queue(timeout)
145        else:
146            msg = 'No connection to the device and no more data to read.'
147            logger.error(msg)
148            raise TwisterHarnessException('No connection to the device and no more data to read.')
149        if print_output:
150            logger.debug('#: %s', data)
151        return data
152
153    def readlines_until(
154            self,
155            regex: str | None = None,
156            num_of_lines: int | None = None,
157            timeout: float | None = None,
158            print_output: bool = True,
159    ) -> list[str]:
160        """
161        Read available output lines produced by device from internal buffer
162        until following conditions:
163
164        1. If regex is provided - read until regex regex is found in read
165           line (or until timeout).
166        2. If num_of_lines is provided - read until number of read lines is
167           equal to num_of_lines (or until timeout).
168        3. If none of above is provided - return immediately lines collected so
169           far in internal buffer.
170
171        If timeout is not provided, then use base_timeout.
172        """
173        timeout = timeout or self.base_timeout
174        if regex:
175            regex_compiled = re.compile(regex)
176        lines: list[str] = []
177        if regex or num_of_lines:
178            timeout_time: float = time.time() + timeout
179            while time.time() < timeout_time:
180                try:
181                    line = self.readline(0.1, print_output)
182                except TwisterHarnessTimeoutException:
183                    continue
184                lines.append(line)
185                if regex and regex_compiled.search(line):
186                    break
187                if num_of_lines and len(lines) == num_of_lines:
188                    break
189            else:
190                msg = 'Read from device timeout occurred'
191                logger.error(msg)
192                raise TwisterHarnessTimeoutException(msg)
193        else:
194            lines = self.readlines(print_output)
195        return lines
196
197    def readlines(self, print_output: bool = True) -> list[str]:
198        """
199        Read all available output lines produced by device from internal buffer.
200        """
201        lines: list[str] = []
202        while not self._device_read_queue.empty():
203            line = self.readline(0.1, print_output)
204            lines.append(line)
205        return lines
206
207    def clear_buffer(self) -> None:
208        """
209        Remove all available output produced by device from internal buffer
210        (queue).
211        """
212        self.readlines(print_output=False)
213
214    def write(self, data: bytes) -> None:
215        """Write data bytes to device."""
216        if not self.is_device_connected():
217            msg = 'No connection to the device'
218            logger.error(msg)
219            raise TwisterHarnessException(msg)
220        self._write_to_device(data)
221
222    def initialize_log_files(self, test_name: str = '') -> None:
223        """
224        Initialize log files (e.g. handler.log) by adding header with
225        information about performed test and current time.
226        """
227        for log_file_path in self._log_files:
228            with open(log_file_path, 'a+') as log_file:
229                log_file.write(f'\n==== Test {test_name} started at {datetime.now()} ====\n')
230
231    def _start_reader_thread(self) -> None:
232        self._reader_thread = threading.Thread(target=self._handle_device_output, daemon=True)
233        self._reader_thread.start()
234
235    def _handle_device_output(self) -> None:
236        """
237        This method is dedicated to run it in separate thread to read output
238        from device and put them into internal queue and save to log file.
239        """
240        with open(self.handler_log_path, 'a+') as log_file:
241            while self.is_device_running():
242                if self.is_device_connected():
243                    output = self._read_device_output().decode(errors='replace').strip()
244                    if output:
245                        self._device_read_queue.put(output)
246                        log_file.write(f'{output}\n')
247                        log_file.flush()
248                else:
249                    # ignore output from device
250                    self._flush_device_output()
251                    time.sleep(0.1)
252
253    def _read_from_queue(self, timeout: float) -> str:
254        """Read data from internal queue"""
255        try:
256            data: str | object = self._device_read_queue.get(timeout=timeout)
257        except queue.Empty as exc:
258            raise TwisterHarnessTimeoutException(f'Read from device timeout occurred ({timeout}s)') from exc
259        return data
260
261    def _join_reader_thread(self) -> None:
262        if self._reader_thread is not None:
263            self._reader_thread.join(self.base_timeout)
264        self._reader_thread = None
265
266    def _clear_internal_resources(self) -> None:
267        self._reader_thread = None
268        self._device_read_queue = queue.Queue()
269        self._device_run.clear()
270        self._device_connected.clear()
271
272    @property
273    def west(self) -> str:
274        """
275        Return a path to west or if not found - raise an error. Once found
276        west path is stored as internal property to save time of looking for it
277        in the next time.
278        """
279        if self._west is None:
280            self._west = shutil.which('west')
281            if self._west is None:
282                msg = 'west not found'
283                logger.error(msg)
284                raise TwisterHarnessException(msg)
285        return self._west
286
287    @abc.abstractmethod
288    def generate_command(self) -> None:
289        """
290        Generate and set command which will be used during flashing or running
291        device.
292        """
293
294    @abc.abstractmethod
295    def _flash_and_run(self) -> None:
296        """Flash and run application on a device."""
297
298    @abc.abstractmethod
299    def _connect_device(self) -> None:
300        """Connect with the device (e.g. via serial port)."""
301
302    @abc.abstractmethod
303    def _disconnect_device(self) -> None:
304        """Disconnect from the device (e.g. from serial port)."""
305
306    @abc.abstractmethod
307    def _close_device(self) -> None:
308        """Stop application"""
309
310    @abc.abstractmethod
311    def _read_device_output(self) -> bytes:
312        """
313        Read device output directly through serial, subprocess, FIFO, etc.
314        Even if device is not connected, this method has to return something
315        (e.g. empty bytes string). This assumption is made to maintain
316        compatibility between various adapters and their reading technique.
317        """
318
319    @abc.abstractmethod
320    def _write_to_device(self, data: bytes) -> None:
321        """Write to device directly through serial, subprocess, FIFO, etc."""
322
323    @abc.abstractmethod
324    def _flush_device_output(self) -> None:
325        """Flush device connection (serial, subprocess output, FIFO, etc.)"""
326
327    @abc.abstractmethod
328    def is_device_running(self) -> bool:
329        """Return true if application is running on device."""
330
331    @abc.abstractmethod
332    def is_device_connected(self) -> bool:
333        """Return true if device is connected."""
334