1# Copyright (c) 2023 Nordic Semiconductor ASA
2#
3# SPDX-License-Identifier: Apache-2.0
4
5from __future__ import annotations
6
7import logging
8import time
9
10from twister_harness.device.fifo_handler import FifoHandler
11from twister_harness.device.binary_adapter import BinaryAdapterBase
12from twister_harness.exceptions import TwisterHarnessException
13from twister_harness.twister_harness_config import DeviceConfig
14
15logger = logging.getLogger(__name__)
16
17
18class QemuAdapter(BinaryAdapterBase):
19    def __init__(self, device_config: DeviceConfig) -> None:
20        super().__init__(device_config)
21        qemu_fifo_file_path = self.device_config.build_dir / 'qemu-fifo'
22        self._fifo_connection: FifoHandler = FifoHandler(qemu_fifo_file_path, self.base_timeout)
23
24    def generate_command(self) -> None:
25        """Set command to run."""
26        self.command = [self.west, 'build', '-d', str(self.device_config.app_build_dir), '-t', 'run']
27        if 'stdin' in self.process_kwargs:
28            self.process_kwargs.pop('stdin')
29
30    def _flash_and_run(self) -> None:
31        super()._flash_and_run()
32        self._create_fifo_connection()
33
34    def _create_fifo_connection(self) -> None:
35        self._fifo_connection.initiate_connection()
36        timeout_time: float = time.time() + self.base_timeout
37        while time.time() < timeout_time and self._is_binary_running():
38            if self._fifo_connection.is_open:
39                return
40            time.sleep(0.1)
41        msg = 'Cannot establish communication with QEMU device.'
42        logger.error(msg)
43        raise TwisterHarnessException(msg)
44
45    def _stop_subprocess(self) -> None:
46        super()._stop_subprocess()
47        self._fifo_connection.disconnect()
48
49    def _read_device_output(self) -> bytes:
50        try:
51            output = self._fifo_connection.readline()
52        except (OSError, ValueError):
53            # emulation was probably finished and thus fifo file was closed too
54            output = b''
55        return output
56
57    def _write_to_device(self, data: bytes) -> None:
58        self._fifo_connection.write(data)
59        self._fifo_connection.flush_write()
60
61    def _flush_device_output(self) -> None:
62        if self.is_device_running():
63            self._fifo_connection.flush_read()
64
65    def is_device_connected(self) -> bool:
66        """Return true if device is connected."""
67        return bool(super().is_device_connected() and self._fifo_connection.is_open)
68