1# Copyright (c) 2023 Nordic Semiconductor ASA
2#
3# SPDX-License-Identifier: Apache-2.0
4
import logging
import os
import re
import subprocess
import time
from pathlib import Path
from typing import Generator
from unittest import mock

import pytest

from twister_harness.device.binary_adapter import (
    CustomSimulatorAdapter,
    NativeSimulatorAdapter,
    UnitSimulatorAdapter,
)
from twister_harness.exceptions import TwisterHarnessException, TwisterHarnessTimeoutException
from twister_harness.twister_harness_config import DeviceConfig
22
23
@pytest.fixture
def script_path(resources: Path) -> str:
    """Return the location of ``mock_script.py`` inside the ``resources`` directory as a string."""
    mock_script = resources / 'mock_script.py'
    return str(mock_script)
27
28
@pytest.fixture(name='device')
def fixture_device_adapter(tmp_path: Path) -> Generator[NativeSimulatorAdapter, None, None]:
    """
    Yield a NativeSimulatorAdapter configured with a freshly created build directory.

    The build directory is created as ``build_dir`` under pytest's ``tmp_path``.
    The adapter is closed during teardown regardless of the test result.
    """
    build_dir = tmp_path.joinpath('build_dir')
    build_dir.mkdir()
    config = DeviceConfig(build_dir=build_dir, type='native', base_timeout=5.0)
    adapter = NativeSimulatorAdapter(config)
    try:
        yield adapter
    finally:
        # to make sure all running processes are closed
        adapter.close()
38
39
@pytest.fixture(name='launched_device')
def fixture_launched_device_adapter(
    device: NativeSimulatorAdapter, script_path: str
) -> Generator[NativeSimulatorAdapter, None, None]:
    """
    Yield the ``device`` adapter after launching it with the mock script.

    The adapter command is set to run ``script_path`` with ``python3``.
    The adapter is closed during teardown, also when ``launch()`` itself fails.
    """
    launch_command = ['python3', script_path]
    device.command = launch_command
    try:
        device.launch()
        yield device
    finally:
        # to make sure all running processes are closed
        device.close()
50
51
def test_if_binary_adapter_runs_without_errors(launched_device: NativeSimulatorAdapter) -> None:
    """
    Read the output of the launched script until the 'Returns with code' line appears.

    After closing the adapter, verify that the expected line was read and that
    the handler log file exists and ends with the same two lines that were read.
    """
    output = launched_device.readlines_until(regex='Returns with code')
    launched_device.close()
    assert 'Readability counts.' in output

    log_path = launched_device.handler_log_path
    assert os.path.isfile(log_path)
    with open(log_path, 'r') as log_file:
        logged = [raw_line.strip() for raw_line in log_file]
    # the tail of the log file must agree with what was read from the device
    assert logged[-2:] == output[-2:]
65
66
def test_if_binary_adapter_finishes_after_timeout_while_there_is_no_data_from_subprocess(
    device: NativeSimulatorAdapter, script_path: str
) -> None:
    """
    Test if reading times out when there is no data on stdout, but subprocess is still running.

    The base timeout is lowered to 0.3 and the script is started with
    ``--long-sleep --sleep=5``. Reading must raise TwisterHarnessTimeoutException;
    after ``close()`` the adapter holds no process and the handler log must not
    contain the 'End of script' line.
    """
    device.base_timeout = 0.3
    device.command = ['python3', script_path, '--long-sleep', '--sleep=5']
    device.launch()

    expected_error = 'Read from device timeout occurred'
    with pytest.raises(TwisterHarnessTimeoutException, match=expected_error):
        device.readlines_until(regex='Returns with code')

    device.close()
    assert device._process is None

    with open(device.handler_log_path, 'r') as log_file:
        logged = [raw_line.strip() for raw_line in log_file]
    # this message should not be printed because script has been terminated due to timeout
    assert 'End of script' not in logged, 'Script has not been terminated before end'
82
83
def test_if_binary_adapter_raises_exception_empty_command(device: NativeSimulatorAdapter) -> None:
    """Check that ``_flash_and_run()`` raises TwisterHarnessException when the command list is empty."""
    device.command = []
    exception_msg = 'Run command is empty, please verify if it was generated properly.'
    # pytest applies ``match`` as a regular expression, so the literal text is
    # escaped to keep '.' from matching an arbitrary character
    with pytest.raises(TwisterHarnessException, match=re.escape(exception_msg)):
        device._flash_and_run()
89
90
@mock.patch('subprocess.Popen', side_effect=subprocess.SubprocessError(1, 'Exception message'))
def test_if_binary_adapter_raises_exception_when_subprocess_raised_subprocess_error(
    patched_popen, device: NativeSimulatorAdapter
) -> None:
    """
    Check that a SubprocessError raised by the patched ``subprocess.Popen`` is
    reported as TwisterHarnessException carrying the original message.
    """
    device.command = ['echo', 'TEST']
    expected_error = 'Exception message'
    with pytest.raises(TwisterHarnessException, match=expected_error):
        device._flash_and_run()
98
99
@mock.patch('subprocess.Popen', side_effect=FileNotFoundError(1, 'File not found', 'fake_file.txt'))
def test_if_binary_adapter_raises_exception_file_not_found(
    patched_popen, device: NativeSimulatorAdapter
) -> None:
    """
    Check that a FileNotFoundError raised by the patched ``subprocess.Popen`` is
    reported as TwisterHarnessException whose message names the missing file.
    """
    device.command = ['echo', 'TEST']
    # ``match`` is a regular expression; escape the file name so that its '.'
    # is compared literally instead of matching any character
    with pytest.raises(TwisterHarnessException, match=re.escape('fake_file.txt')):
        device._flash_and_run()
107
108
@mock.patch('subprocess.Popen', side_effect=Exception(1, 'Raised other exception'))
def test_if_binary_adapter_raises_exception_when_subprocess_raised_an_error(
    patched_popen, device: NativeSimulatorAdapter
) -> None:
    """
    Check that a generic Exception raised by the patched ``subprocess.Popen`` is
    reported as TwisterHarnessException carrying the original message.
    """
    device.command = ['echo', 'TEST']
    expected_error = 'Raised other exception'
    with pytest.raises(TwisterHarnessException, match=expected_error):
        device._flash_and_run()
116
117
def test_if_binary_adapter_connect_disconnect_print_warnings_properly(
    caplog: pytest.LogCaptureFixture, launched_device: NativeSimulatorAdapter
) -> None:
    """
    Check that redundant ``connect()``/``disconnect()`` calls are reported in the log at DEBUG level.

    A launched device starts connected, so calling ``connect()`` again must log
    'Device already connected'. After one ``disconnect()`` the device is no
    longer connected and a second ``disconnect()`` must log
    'Device already disconnected'.
    """

    def assert_logged_at_debug(message: str) -> None:
        """Require at least one captured record with ``message``, all of them at DEBUG level."""
        levels = [record.levelname for record in caplog.records if record.message == message]
        # without this check an absent record would silently skip the level assertion
        assert levels, f'No log record with message {message!r}'
        assert set(levels) == {'DEBUG'}

    device = launched_device
    assert device._device_connected.is_set() and device.is_device_connected()
    caplog.set_level(logging.DEBUG)

    device.connect()
    already_connected_msg = 'Device already connected'
    assert already_connected_msg in caplog.text
    assert_logged_at_debug(already_connected_msg)

    device.disconnect()
    assert not device._device_connected.is_set() and not device.is_device_connected()

    device.disconnect()
    already_disconnected_msg = 'Device already disconnected'
    assert already_disconnected_msg in caplog.text
    assert_logged_at_debug(already_disconnected_msg)
140
141
def test_if_binary_adapter_raise_exc_during_connect_read_and_write_after_close(
    launched_device: NativeSimulatorAdapter
) -> None:
    """
    Check that a closed device rejects ``connect()``, ``write()`` and ``readline()``.

    The device is running after launch and not running after ``close()``. Each
    of the three calls must then raise TwisterHarnessException; the buffer is
    cleared before ``readline()`` is called.
    """
    device = launched_device
    assert device._device_run.is_set() and device.is_device_running()
    device.close()
    assert not device._device_run.is_set() and not device.is_device_running()
    with pytest.raises(TwisterHarnessException, match='Cannot connect to not working device'):
        device.connect()
    with pytest.raises(TwisterHarnessException, match='No connection to the device'):
        device.write(b'')
    device.clear_buffer()
    # ``match`` is a regular expression; escape the text so the trailing '.'
    # is compared literally instead of matching any character
    no_data_msg = 'No connection to the device and no more data to read.'
    with pytest.raises(TwisterHarnessException, match=re.escape(no_data_msg)):
        device.readline()
156
157
def test_if_binary_adapter_raise_exc_during_read_and_write_after_close(
    launched_device: NativeSimulatorAdapter
) -> None:
    """
    Check that a disconnected device rejects ``write()`` and ``readline()``.

    The device is only disconnected here (not closed). Both calls must raise
    TwisterHarnessException; the buffer is cleared before ``readline()`` is called.
    """
    device = launched_device
    device.disconnect()
    with pytest.raises(TwisterHarnessException, match='No connection to the device'):
        device.write(b'')
    device.clear_buffer()
    # ``match`` is a regular expression; escape the text so the trailing '.'
    # is compared literally instead of matching any character
    no_data_msg = 'No connection to the device and no more data to read.'
    with pytest.raises(TwisterHarnessException, match=re.escape(no_data_msg)):
        device.readline()
168
169
def test_if_binary_adapter_is_able_to_read_leftovers_after_disconnect_or_close(
    device: NativeSimulatorAdapter, script_path: str
) -> None:
    """
    Check that already buffered lines can still be read after ``disconnect()`` and after ``close()``.

    The script is started with ``--sleep=0.05``. In both phases the test reads
    up to a known line, waits 0.1 s, cuts the connection and expects
    ``readlines()`` to return a non-empty result.
    """
    device.command = ['python3', script_path, '--sleep=0.05']
    device.launch()

    # phase 1: leftovers after disconnect
    device.readlines_until(regex='Beautiful is better than ugly.')
    time.sleep(0.1)  # presumably lets the script print further lines in the meantime
    device.disconnect()
    leftovers_after_disconnect = device.readlines()
    assert len(leftovers_after_disconnect) > 0

    # phase 2: leftovers after close
    device.connect()
    device.readlines_until(regex='Flat is better than nested.')
    time.sleep(0.1)
    device.close()
    leftovers_after_close = device.readlines()
    assert len(leftovers_after_close) > 0
184
185
def test_if_binary_adapter_properly_send_data_to_subprocess(
    shell_simulator_adapter: NativeSimulatorAdapter
) -> None:
    """
    Run shell_simulator.py program, send "zen" command and verify output.

    The output is read until the 'Namespaces are one honking great idea' line
    and must contain the 'The Zen of Python, by Tim Peters' line.
    """
    time.sleep(0.1)
    shell_simulator_adapter.write(b'zen\n')
    time.sleep(1)
    response = shell_simulator_adapter.readlines_until(regex='Namespaces are one honking great idea')
    assert 'The Zen of Python, by Tim Peters' in response
196
197
def test_if_native_binary_adapter_get_command_returns_proper_string(device: NativeSimulatorAdapter) -> None:
    """Check that the native adapter generates a one-element command pointing at ``zephyr/zephyr.exe`` in the build dir."""
    device.generate_command()
    generated = device.command
    assert isinstance(generated, list)
    expected_binary = device.device_config.build_dir / 'zephyr' / 'zephyr.exe'
    assert generated == [str(expected_binary)]
202
203
@mock.patch('shutil.which', return_value='west')
def test_if_custom_binary_adapter_get_command_returns_proper_string(patched_which, tmp_path: Path) -> None:
    """
    Check the command generated by the custom adapter when ``shutil.which`` is
    patched to return 'west': it must be the ``west build -d <build_dir> -t run`` list.
    """
    config = DeviceConfig(build_dir=tmp_path, type='custom')
    adapter = CustomSimulatorAdapter(config)
    adapter.generate_command()
    generated = adapter.command
    assert isinstance(generated, list)
    expected = ['west', 'build', '-d', str(tmp_path), '-t', 'run']
    assert generated == expected
210
211
@mock.patch('shutil.which', return_value=None)
def test_if_custom_binary_adapter_raise_exception_when_west_not_found(patched_which, tmp_path: Path) -> None:
    """
    Check that the custom adapter raises TwisterHarnessException with
    'west not found' when ``shutil.which`` is patched to return None.
    """
    config = DeviceConfig(build_dir=tmp_path, type='custom')
    adapter = CustomSimulatorAdapter(config)
    expected_error = 'west not found'
    with pytest.raises(TwisterHarnessException, match=expected_error):
        adapter.generate_command()
217
218
def test_if_unit_binary_adapter_get_command_returns_proper_string(tmp_path: Path) -> None:
    """Check that the unit adapter generates a one-element command pointing at ``testbinary`` in the build dir."""
    config = DeviceConfig(build_dir=tmp_path, type='unit')
    adapter = UnitSimulatorAdapter(config)
    adapter.generate_command()
    generated = adapter.command
    assert isinstance(generated, list)
    expected_binary = tmp_path / 'testbinary'
    assert generated == [str(expected_binary)]
224