# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import abc
import logging
import os
import queue
import re
import shutil
import threading
import time
from datetime import datetime
from pathlib import Path
from serial import SerialException

from twister_harness.exceptions import (
    TwisterHarnessException,
    TwisterHarnessTimeoutException,
)
from twister_harness.twister_harness_config import DeviceConfig

logger = logging.getLogger(__name__)


class DeviceAdapter(abc.ABC):
    """
    This class defines a common interface for all device types (hardware,
    simulator, QEMU) used in tests to gather device output and send data to
    it.

    Concrete adapters implement the abstract ``_*`` hooks and the
    ``is_device_running`` / ``is_device_connected`` predicates; this base
    class provides the launch/close life cycle, a background reader thread
    that moves device output into an internal queue, and the public read and
    write helpers built on top of that queue.
    """

    def __init__(self, device_config: DeviceConfig) -> None:
        """
        :param device_config: device configuration
        """
        self.device_config: DeviceConfig = device_config
        self.base_timeout: float = device_config.base_timeout
        # Lines read by the reader thread wait here until a test consumes them.
        self._device_read_queue: queue.Queue = queue.Queue()
        self._reader_thread: threading.Thread | None = None
        self._device_run: threading.Event = threading.Event()
        self._device_connected: threading.Event = threading.Event()
        self.command: list[str] = []
        # Cached result of looking up the ``west`` executable (see ``west``).
        self._west: str | None = None

        self.handler_log_path: Path = device_config.build_dir / 'handler.log'
        self._log_files: list[Path] = [self.handler_log_path]

    def __repr__(self) -> str:
        return f'{self.__class__.__name__}()'

    @property
    def env(self) -> dict[str, str]:
        """Return a copy of the current process environment."""
        env = os.environ.copy()
        return env

    def launch(self) -> None:
        """
        Start by closing previously running application (no effect if not
        needed). Then, flash and run test application. Finally, start an
        internal reader thread capturing an output from a device.
        """
        self.close()
        self._clear_internal_resources()

        if not self.command:
            self.generate_command()
            # Extra arguments are appended only to a freshly generated
            # command, so launching again does not duplicate them.
            if self.device_config.extra_test_args:
                self.command.extend(self.device_config.extra_test_args.split())

        if self.device_config.type != 'hardware':
            self._flash_and_run()
            self._device_run.set()
            self._start_reader_thread()
            self.connect()
            return

        self._device_run.set()
        self._start_reader_thread()

        if self.device_config.flash_before:
            # For hardware devices with shared USB or software USB, connect after flashing.
            # Retry for up to 10 seconds for USB-CDC based devices to enumerate.
            self._flash_and_run()
            self.connect(retry_s=10)
        else:
            # On hardware, flash after connecting to COM port, otherwise some messages
            # from target can be lost.
            self.connect()
            self._flash_and_run()

    def close(self) -> None:
        """Disconnect, close device and close reader thread."""
        if not self._device_run.is_set():
            # device already closed
            return
        self.disconnect()
        self._close_device()
        self._device_run.clear()
        self._join_reader_thread()

    def connect(self, retry_s: int = 0) -> None:
        """
        Connect to device - allow for output gathering.

        :param retry_s: when greater than zero, keep retrying the connection
            every 0.1 s for about this many seconds whenever it fails with
            ``SerialException``; the last failure is re-raised
        :raises TwisterHarnessException: if the device is not running
        """
        if self.is_device_connected():
            logger.debug('Device already connected')
            return
        if not self.is_device_running():
            msg = 'Cannot connect to not working device'
            logger.error(msg)
            raise TwisterHarnessException(msg)

        if retry_s > 0:
            retry_cycles = retry_s * 10  # one attempt per 0.1 s sleep below
            for i in range(retry_cycles):
                try:
                    self._connect_device()
                    break
                except SerialException:
                    if i == retry_cycles - 1:
                        # out of attempts - let the caller see the error
                        raise
                    time.sleep(0.1)
        else:
            self._connect_device()

        self._device_connected.set()

    def disconnect(self) -> None:
        """Disconnect device - block output gathering."""
        if not self.is_device_connected():
            logger.debug("Device already disconnected")
            return
        self._disconnect_device()
        self._device_connected.clear()

    def readline(self, timeout: float | None = None, print_output: bool = True) -> str:
        """
        Read line from device output. If timeout is not provided, then use
        base_timeout.

        Data still waiting in the internal queue can be read even after the
        device has been disconnected.

        :param timeout: seconds to wait for a line; a falsy value (``None``
            or ``0``) falls back to ``base_timeout``
        :param print_output: log the returned line at debug level
        :raises TwisterHarnessTimeoutException: if no line arrives in time
        :raises TwisterHarnessException: if the device is not connected and
            the internal queue is empty
        """
        timeout = timeout or self.base_timeout
        if not (self.is_device_connected() or not self._device_read_queue.empty()):
            msg = 'No connection to the device and no more data to read.'
            logger.error(msg)
            raise TwisterHarnessException(msg)
        data = self._read_from_queue(timeout)
        if print_output:
            logger.debug('#: %s', data)
        return data

    def readlines_until(
        self,
        regex: str | None = None,
        num_of_lines: int | None = None,
        timeout: float | None = None,
        print_output: bool = True,
    ) -> list[str]:
        """
        Read available output lines produced by device from internal buffer
        until following conditions:

        1. If regex is provided - read until regex is found in read
           line (or until timeout).
        2. If num_of_lines is provided - read until number of read lines is
           equal to num_of_lines (or until timeout).
        3. If none of above is provided - return immediately lines collected so
           far in internal buffer.

        If timeout is not provided, then use base_timeout.

        :raises TwisterHarnessTimeoutException: if regex or num_of_lines was
            given and the condition was not met before the timeout expired
        """
        timeout = timeout or self.base_timeout
        if not (regex or num_of_lines):
            return self.readlines(print_output)

        pattern = re.compile(regex) if regex else None
        lines: list[str] = []
        # A monotonic clock keeps the deadline immune to wall-clock changes.
        deadline: float = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                # Poll in short slices so the overall deadline is honoured.
                line = self.readline(0.1, print_output)
            except TwisterHarnessTimeoutException:
                continue
            lines.append(line)
            if pattern is not None and pattern.search(line):
                break
            if num_of_lines and len(lines) == num_of_lines:
                break
        else:
            # Loop ran out of time without hitting either stop condition.
            msg = 'Read from device timeout occurred'
            logger.error(msg)
            raise TwisterHarnessTimeoutException(msg)
        return lines

    def readlines(self, print_output: bool = True) -> list[str]:
        """
        Read all available output lines produced by device from internal buffer.
        """
        lines: list[str] = []
        while not self._device_read_queue.empty():
            line = self.readline(0.1, print_output)
            lines.append(line)
        return lines

    def clear_buffer(self) -> None:
        """
        Remove all available output produced by device from internal buffer
        (queue).
        """
        self.readlines(print_output=False)

    def write(self, data: bytes) -> None:
        """
        Write data bytes to device.

        :raises TwisterHarnessException: if the device is not connected
        """
        if not self.is_device_connected():
            msg = 'No connection to the device'
            logger.error(msg)
            raise TwisterHarnessException(msg)
        self._write_to_device(data)

    def initialize_log_files(self, test_name: str = '') -> None:
        """
        Initialize log files (e.g. handler.log) by adding header with
        information about performed test and current time.
        """
        for log_file_path in self._log_files:
            # errors='replace': a test name the locale encoding cannot
            # represent must not abort log initialization.
            with open(log_file_path, 'a+', errors='replace') as log_file:
                log_file.write(f'\n==== Test {test_name} started at {datetime.now()} ====\n')

    def _start_reader_thread(self) -> None:
        """Start the daemon thread running ``_handle_device_output``."""
        self._reader_thread = threading.Thread(target=self._handle_device_output, daemon=True)
        self._reader_thread.start()

    def _handle_device_output(self) -> None:
        """
        This method is dedicated to run it in separate thread to read output
        from device and put them into internal queue and save to log file.
        """
        # Device bytes are decoded with errors='replace', which can yield
        # U+FFFD; opening the log with errors='replace' as well keeps a
        # non-UTF-8 locale encoding from raising UnicodeEncodeError and
        # silently killing this thread.
        with open(self.handler_log_path, 'a+', errors='replace') as log_file:
            while self.is_device_running():
                if self.is_device_connected():
                    output = self._read_device_output().decode(errors='replace').strip()
                    if output:
                        self._device_read_queue.put(output)
                        log_file.write(f'{output}\n')
                        log_file.flush()
                else:
                    # ignore output from device
                    self._flush_device_output()
                    time.sleep(0.1)

    def _read_from_queue(self, timeout: float) -> str:
        """
        Read data from internal queue.

        :raises TwisterHarnessTimeoutException: if the queue stays empty for
            ``timeout`` seconds
        """
        try:
            data: str = self._device_read_queue.get(timeout=timeout)
        except queue.Empty as exc:
            raise TwisterHarnessTimeoutException(f'Read from device timeout occurred ({timeout}s)') from exc
        return data

    def _join_reader_thread(self) -> None:
        """Wait up to ``base_timeout`` for the reader thread, then drop it."""
        if self._reader_thread is not None:
            self._reader_thread.join(self.base_timeout)
        self._reader_thread = None

    def _clear_internal_resources(self) -> None:
        """Reset thread handle, read queue and state events before a launch."""
        self._reader_thread = None
        self._device_read_queue = queue.Queue()
        self._device_run.clear()
        self._device_connected.clear()

    @property
    def west(self) -> str:
        """
        Return a path to west or if not found - raise an error. Once found
        west path is stored as internal property to save time of looking for it
        in the next time.

        :raises TwisterHarnessException: if west cannot be found
        """
        if self._west is None:
            self._west = shutil.which('west')
            if self._west is None:
                msg = 'west not found'
                logger.error(msg)
                raise TwisterHarnessException(msg)
        return self._west

    @abc.abstractmethod
    def generate_command(self) -> None:
        """
        Generate and set command which will be used during flashing or running
        device.
        """

    @abc.abstractmethod
    def _flash_and_run(self) -> None:
        """Flash and run application on a device."""

    @abc.abstractmethod
    def _connect_device(self) -> None:
        """Connect with the device (e.g. via serial port)."""

    @abc.abstractmethod
    def _disconnect_device(self) -> None:
        """Disconnect from the device (e.g. from serial port)."""

    @abc.abstractmethod
    def _close_device(self) -> None:
        """Stop application"""

    @abc.abstractmethod
    def _read_device_output(self) -> bytes:
        """
        Read device output directly through serial, subprocess, FIFO, etc.
        Even if device is not connected, this method has to return something
        (e.g. empty bytes string). This assumption is made to maintain
        compatibility between various adapters and their reading technique.
        """

    @abc.abstractmethod
    def _write_to_device(self, data: bytes) -> None:
        """Write to device directly through serial, subprocess, FIFO, etc."""

    @abc.abstractmethod
    def _flush_device_output(self) -> None:
        """Flush device connection (serial, subprocess output, FIFO, etc.)"""

    @abc.abstractmethod
    def is_device_running(self) -> bool:
        """Return true if application is running on device."""

    @abc.abstractmethod
    def is_device_connected(self) -> bool:
        """Return true if device is connected."""