# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import abc
import logging
import subprocess

from twister_harness.device.device_adapter import DeviceAdapter
from twister_harness.device.utils import log_command, terminate_process
from twister_harness.exceptions import TwisterHarnessException
from twister_harness.twister_harness_config import DeviceConfig

logger = logging.getLogger(__name__)


class BinaryAdapterBase(DeviceAdapter, abc.ABC):
    """
    Base adapter for "devices" that are run as a local subprocess.

    Subclasses only have to implement :meth:`generate_command`, which sets
    ``self.command`` to the argument list passed to ``subprocess.Popen``.
    """

    def __init__(self, device_config: DeviceConfig) -> None:
        """
        :param device_config: device configuration
        """
        super().__init__(device_config)
        self._process: subprocess.Popen | None = None
        # Keyword arguments forwarded verbatim to subprocess.Popen.
        # stderr is merged into stdout, so a single stream carries all output.
        self.process_kwargs: dict = {
            'stdout': subprocess.PIPE,
            'stderr': subprocess.STDOUT,
            'stdin': subprocess.PIPE,
            'env': self.env,
            'cwd': device_config.app_build_dir,
        }

    @abc.abstractmethod
    def generate_command(self) -> None:
        """Generate and set command which will be used during running device."""

    def _flash_and_run(self) -> None:
        """Start the binary; there is no separate flashing step here."""
        self._run_subprocess()

    def _run_subprocess(self) -> None:
        """
        Start ``self.command`` as a subprocess and keep its handle.

        :raises TwisterHarnessException: if the command is empty or the
            subprocess could not be started
        """
        if not self.command:
            msg = 'Run command is empty, please verify if it was generated properly.'
            logger.error(msg)
            raise TwisterHarnessException(msg)
        log_command(logger, 'Running command', self.command, level=logging.DEBUG)
        try:
            self._process = subprocess.Popen(self.command, **self.process_kwargs)
        except subprocess.SubprocessError as exc:
            msg = f'Running subprocess failed due to SubprocessError {exc}'
            logger.error(msg)
            raise TwisterHarnessException(msg) from exc
        except FileNotFoundError as exc:
            msg = f'Running subprocess failed due to file not found: {exc.filename}'
            logger.error(msg)
            raise TwisterHarnessException(msg) from exc
        except Exception as exc:
            # Any other start-up failure is reported through the harness
            # exception type as well, with the original exception chained.
            msg = f'Running subprocess failed {exc}'
            logger.error(msg)
            raise TwisterHarnessException(msg) from exc

    def _connect_device(self) -> None:
        """
        This method was implemented only to imitate standard connect behavior
        like in Serial class.
        """

    def _disconnect_device(self) -> None:
        """
        This method was implemented only to imitate standard disconnect behavior
        like in serial connection.
        """

    def _close_device(self) -> None:
        """Terminate subprocess"""
        self._stop_subprocess()

    def _stop_subprocess(self) -> None:
        """
        Stop the subprocess (if any) and forget its handle.

        If the process is still running it is asked to terminate and then
        awaited for up to ``self.base_timeout``. Should it still be alive
        after that, it is killed and awaited once more so that it is neither
        left running nor left unreaped.

        :raises subprocess.TimeoutExpired: if the process does not exit even
            after being killed
        """
        if self._process is None:
            # subprocess already stopped
            return
        return_code: int | None = self._process.poll()
        if return_code is None:
            terminate_process(self._process)
            try:
                return_code = self._process.wait(self.base_timeout)
            except subprocess.TimeoutExpired:
                # The termination request was not honoured in time; escalate
                # to kill() and wait again instead of leaking the process.
                logger.warning(
                    'Subprocess did not exit within %s seconds after termination, killing it',
                    self.base_timeout,
                )
                self._process.kill()
                return_code = self._process.wait(self.base_timeout)
        self._process = None
        logger.debug('Running subprocess finished with return code %s', return_code)

    def _read_device_output(self) -> bytes:
        """
        Read one line from the subprocess output stream.

        Must only be called while a subprocess handle exists.
        """
        return self._process.stdout.readline()

    def _write_to_device(self, data: bytes) -> None:
        """
        Write ``data`` to the subprocess stdin and flush it immediately.

        Must only be called while a subprocess handle exists.
        """
        self._process.stdin.write(data)
        self._process.stdin.flush()

    def _flush_device_output(self) -> None:
        """Flush the subprocess output stream, but only while it is running."""
        if self.is_device_running():
            self._process.stdout.flush()

    def is_device_running(self) -> bool:
        """Return true if the run flag is set and the subprocess is alive."""
        return self._device_run.is_set() and self._is_binary_running()

    def _is_binary_running(self) -> bool:
        """Return true if a subprocess exists and has not exited yet."""
        if self._process is None or self._process.poll() is not None:
            return False
        return True

    def is_device_connected(self) -> bool:
        """Return true if device is connected."""
        return self.is_device_running() and self._device_connected.is_set()

    def _clear_internal_resources(self) -> None:
        """Reset base-class resources and drop the subprocess handle."""
        super()._clear_internal_resources()
        self._process = None


class NativeSimulatorAdapter(BinaryAdapterBase):
    """Simulator adapter to run `zephyr.exe` simulation"""

    def generate_command(self) -> None:
        """Set command to run."""
        self.command = [str(self.device_config.app_build_dir / 'zephyr' / 'zephyr.exe')]


class UnitSimulatorAdapter(BinaryAdapterBase):
    """Simulator adapter to run unit tests"""

    def generate_command(self) -> None:
        """Set command to run."""
        self.command = [str(self.device_config.app_build_dir / 'testbinary')]


class CustomSimulatorAdapter(BinaryAdapterBase):
    """Simulator adapter which runs the build directory's `run` target via west"""

    def generate_command(self) -> None:
        """Set command to run."""
        self.command = [self.west, 'build', '-d', str(self.device_config.app_build_dir), '-t', 'run']