1# Copyright (c) 2023 Nordic Semiconductor ASA 2# 3# SPDX-License-Identifier: Apache-2.0 4 5from __future__ import annotations 6 7import logging 8import time 9 10from twister_harness.device.fifo_handler import FifoHandler 11from twister_harness.device.binary_adapter import BinaryAdapterBase 12from twister_harness.exceptions import TwisterHarnessException 13from twister_harness.twister_harness_config import DeviceConfig 14 15logger = logging.getLogger(__name__) 16 17 18class QemuAdapter(BinaryAdapterBase): 19 def __init__(self, device_config: DeviceConfig) -> None: 20 super().__init__(device_config) 21 qemu_fifo_file_path = self.device_config.build_dir / 'qemu-fifo' 22 self._fifo_connection: FifoHandler = FifoHandler(qemu_fifo_file_path, self.base_timeout) 23 24 def generate_command(self) -> None: 25 """Set command to run.""" 26 self.command = [self.west, 'build', '-d', str(self.device_config.app_build_dir), '-t', 'run'] 27 if 'stdin' in self.process_kwargs: 28 self.process_kwargs.pop('stdin') 29 30 def _flash_and_run(self) -> None: 31 super()._flash_and_run() 32 self._create_fifo_connection() 33 34 def _create_fifo_connection(self) -> None: 35 self._fifo_connection.initiate_connection() 36 timeout_time: float = time.time() + self.base_timeout 37 while time.time() < timeout_time and self._is_binary_running(): 38 if self._fifo_connection.is_open: 39 return 40 time.sleep(0.1) 41 msg = 'Cannot establish communication with QEMU device.' 42 logger.error(msg) 43 raise TwisterHarnessException(msg) 44 45 def _stop_subprocess(self) -> None: 46 super()._stop_subprocess() 47 self._fifo_connection.disconnect() 48 49 def _read_device_output(self) -> bytes: 50 try: 51 output = self._fifo_connection.readline() 52 except (OSError, ValueError): 53 # emulation was probably finished and thus fifo file was closed too 54 output = b'' 55 return output 56 57 def _write_to_device(self, data: bytes) -> None: 58 self._fifo_connection.write(data) 59 self._fifo_connection.flush_write() 60 61 def _flush_device_output(self) -> None: 62 if self.is_device_running(): 63 self._fifo_connection.flush_read() 64 65 def is_device_connected(self) -> bool: 66 """Return true if device is connected.""" 67 return bool(super().is_device_connected() and self._fifo_connection.is_open) 68