1# Copyright (c) 2023 Nordic Semiconductor ASA
2#
3# SPDX-License-Identifier: Apache-2.0
4
5from __future__ import annotations
6
7import abc
8import logging
9import subprocess
10
11from twister_harness.device.device_adapter import DeviceAdapter
12from twister_harness.device.utils import log_command, terminate_process
13from twister_harness.exceptions import TwisterHarnessException
14from twister_harness.twister_harness_config import DeviceConfig
15
16logger = logging.getLogger(__name__)
17
18
19class BinaryAdapterBase(DeviceAdapter, abc.ABC):
20    def __init__(self, device_config: DeviceConfig) -> None:
21        """
22        :param twister_config: twister configuration
23        """
24        super().__init__(device_config)
25        self._process: subprocess.Popen | None = None
26        self.process_kwargs: dict = {
27            'stdout': subprocess.PIPE,
28            'stderr': subprocess.STDOUT,
29            'stdin': subprocess.PIPE,
30            'env': self.env,
31            'cwd': device_config.app_build_dir,
32        }
33
34    @abc.abstractmethod
35    def generate_command(self) -> None:
36        """Generate and set command which will be used during running device."""
37
38    def _flash_and_run(self) -> None:
39        self._run_subprocess()
40
41    def _run_subprocess(self) -> None:
42        if not self.command:
43            msg = 'Run command is empty, please verify if it was generated properly.'
44            logger.error(msg)
45            raise TwisterHarnessException(msg)
46        log_command(logger, 'Running command', self.command, level=logging.DEBUG)
47        try:
48            self._process = subprocess.Popen(self.command, **self.process_kwargs)
49        except subprocess.SubprocessError as exc:
50            msg = f'Running subprocess failed due to SubprocessError {exc}'
51            logger.error(msg)
52            raise TwisterHarnessException(msg) from exc
53        except FileNotFoundError as exc:
54            msg = f'Running subprocess failed due to file not found: {exc.filename}'
55            logger.error(msg)
56            raise TwisterHarnessException(msg) from exc
57        except Exception as exc:
58            msg = f'Running subprocess failed {exc}'
59            logger.error(msg)
60            raise TwisterHarnessException(msg) from exc
61
62    def _connect_device(self) -> None:
63        """
64        This method was implemented only to imitate standard connect behavior
65        like in Serial class.
66        """
67
68    def _disconnect_device(self) -> None:
69        """
70        This method was implemented only to imitate standard disconnect behavior
71        like in serial connection.
72        """
73
74    def _close_device(self) -> None:
75        """Terminate subprocess"""
76        self._stop_subprocess()
77
78    def _stop_subprocess(self) -> None:
79        if self._process is None:
80            # subprocess already stopped
81            return
82        return_code: int | None = self._process.poll()
83        if return_code is None:
84            terminate_process(self._process)
85            return_code = self._process.wait(self.base_timeout)
86        self._process = None
87        logger.debug('Running subprocess finished with return code %s', return_code)
88
89    def _read_device_output(self) -> bytes:
90        return self._process.stdout.readline()
91
92    def _write_to_device(self, data: bytes) -> None:
93        self._process.stdin.write(data)
94        self._process.stdin.flush()
95
96    def _flush_device_output(self) -> None:
97        if self.is_device_running():
98            self._process.stdout.flush()
99
100    def is_device_running(self) -> bool:
101        return self._device_run.is_set() and self._is_binary_running()
102
103    def _is_binary_running(self) -> bool:
104        if self._process is None or self._process.poll() is not None:
105            return False
106        return True
107
108    def is_device_connected(self) -> bool:
109        """Return true if device is connected."""
110        return self.is_device_running() and self._device_connected.is_set()
111
112    def _clear_internal_resources(self) -> None:
113        super()._clear_internal_resources()
114        self._process = None
115
116
117class NativeSimulatorAdapter(BinaryAdapterBase):
118    """Simulator adapter to run `zephyr.exe` simulation"""
119
120    def generate_command(self) -> None:
121        """Set command to run."""
122        self.command = [str(self.device_config.app_build_dir / 'zephyr' / 'zephyr.exe')]
123
124
125class UnitSimulatorAdapter(BinaryAdapterBase):
126    """Simulator adapter to run unit tests"""
127
128    def generate_command(self) -> None:
129        """Set command to run."""
130        self.command = [str(self.device_config.app_build_dir / 'testbinary')]
131
132
133class CustomSimulatorAdapter(BinaryAdapterBase):
134    def generate_command(self) -> None:
135        """Set command to run."""
136        self.command = [self.west, 'build', '-d', str(self.device_config.app_build_dir), '-t', 'run']
137