#!/usr/bin/env python3 # Copyright (c) 2023 Intel Corporation # # SPDX-License-Identifier: Apache-2.0 """ Tests for handlers.py classes' methods """ import itertools import mock import os import pytest import signal import subprocess import sys from contextlib import nullcontext from importlib import reload from serial import SerialException from subprocess import CalledProcessError, TimeoutExpired import twisterlib.harness from conftest import ZEPHYR_BASE from twisterlib.error import TwisterException from twisterlib.handlers import ( Handler, BinaryHandler, DeviceHandler, QEMUHandler, SimulationHandler ) @pytest.fixture def mocked_instance(tmp_path): instance = mock.Mock() testsuite = mock.Mock() type(testsuite).source_dir = mock.PropertyMock(return_value='') instance.testsuite = testsuite build_dir = tmp_path / 'build_dir' os.makedirs(build_dir) type(instance).build_dir = mock.PropertyMock(return_value=str(build_dir)) platform = mock.Mock() type(platform).binaries = mock.PropertyMock(return_value=[]) instance.platform = platform type(instance.testsuite).timeout = mock.PropertyMock(return_value=60) type(instance.platform).timeout_multiplier = mock.PropertyMock( return_value=2 ) instance.status = None instance.reason = 'Unknown' return instance @pytest.fixture def faux_timer(): class Counter: def __init__(self): self.t = 0 def time(self): self.t += 1 return self.t return Counter() TESTDATA_1 = [ (True, False, 'posix', ['Install pyserial python module with pip to use' \ ' --device-testing option.'], None), (False, True, 'nt', [], None), (True, True, 'posix', ['Install pyserial python module with pip to use' \ ' --device-testing option.'], ImportError), ] @pytest.mark.parametrize( 'fail_serial, fail_pty, os_name, expected_outs, expected_error', TESTDATA_1, ids=['import serial', 'import pty nt', 'import serial+pty posix'] ) def test_imports( capfd, fail_serial, fail_pty, os_name, expected_outs, expected_error ): class ImportRaiser: def find_spec(self, fullname, path, target=None): if fullname == 'serial' and fail_serial: raise ImportError() if fullname == 'pty' and fail_pty: raise ImportError() modules_mock = sys.modules.copy() modules_mock['serial'] = None if fail_serial else modules_mock['serial'] modules_mock['pty'] = None if fail_pty else modules_mock['pty'] meta_path_mock = sys.meta_path[:] meta_path_mock.insert(0, ImportRaiser()) with mock.patch('os.name', os_name), \ mock.patch.dict('sys.modules', modules_mock, clear=True), \ mock.patch('sys.meta_path', meta_path_mock), \ pytest.raises(expected_error) if expected_error else nullcontext(): reload(twisterlib.handlers) out, _ = capfd.readouterr() assert all([expected_out in out for expected_out in expected_outs]) def test_handler_final_handle_actions(mocked_instance): instance = mocked_instance instance.testcases = [mock.Mock()] handler = Handler(mocked_instance) handler.suite_name_check = True harness = twisterlib.harness.Test() harness.state = mock.Mock() harness.detected_suite_names = mock.Mock() harness.matched_run_id = False harness.run_id_exists = True handler_time = mock.Mock() handler._final_handle_actions(harness, handler_time) assert handler.instance.status == 'failed' assert handler.instance.execution_time == handler_time assert handler.instance.reason == 'RunID mismatch' assert all(testcase.status == 'failed' for \ testcase in handler.instance.testcases) handler.instance.reason = 'This reason shan\'t be changed.' handler._final_handle_actions(harness, handler_time) assert handler.instance.reason == 'This reason shan\'t be changed.' TESTDATA_2 = [ (['dummy_testsuite_name'], False), ([], True), (['another_dummy_name', 'yet_another_dummy_name'], True), ] @pytest.mark.parametrize( 'detected_suite_names, should_be_called', TESTDATA_2, ids=['detected one expected', 'detected none', 'detected two unexpected'] ) def test_handler_verify_ztest_suite_name( mocked_instance, detected_suite_names, should_be_called ): instance = mocked_instance type(instance.testsuite).ztest_suite_names = ['dummy_testsuite_name'] harness_state = 'passed' handler_time = mock.Mock() with mock.patch.object(Handler, '_missing_suite_name') as _missing_mocked: handler = Handler(instance) handler._verify_ztest_suite_name( harness_state, detected_suite_names, handler_time ) if should_be_called: _missing_mocked.assert_called_once() else: _missing_mocked.assert_not_called() def test_handler_missing_suite_name(mocked_instance): instance = mocked_instance instance.testcases = [mock.Mock()] handler = Handler(mocked_instance) handler.suite_name_check = True expected_suite_names = ['dummy_testsuite_name'] handler_time = mock.Mock() handler._missing_suite_name(expected_suite_names, handler_time) assert handler.instance.status == 'failed' assert handler.instance.execution_time == handler_time assert handler.instance.reason == 'Testsuite mismatch' assert all( testcase.status == 'failed' for testcase in handler.instance.testcases ) def test_handler_record(mocked_instance): instance = mocked_instance instance.testcases = [mock.Mock()] handler = Handler(instance) harness = twisterlib.harness.Harness() harness.recording = [ {'field_1': 'recording_1_1', 'field_2': 'recording_1_2'}, {'field_1': 'recording_2_1', 'field_2': 'recording_2_2'} ] with mock.patch( 'builtins.open', mock.mock_open(read_data='') ) as mock_file, \ mock.patch( 'csv.DictWriter.writerow', mock.Mock() ) as mock_writeheader, \ mock.patch( 'csv.DictWriter.writerows', mock.Mock() ) as mock_writerows: handler.record(harness) print(mock_file.mock_calls) mock_file.assert_called_with( os.path.join(instance.build_dir, 'recording.csv'), 'at' ) mock_writeheader.assert_has_calls([mock.call({ k:k for k in harness.recording[0].keys()})]) mock_writerows.assert_has_calls([mock.call(harness.recording)]) def test_handler_terminate(mocked_instance): def mock_kill_function(pid, sig): if pid < 0: raise ProcessLookupError instance = mocked_instance handler = Handler(instance) mock_process = mock.Mock() mock_child1 = mock.Mock(pid=1) mock_child2 = mock.Mock(pid=2) mock_process.children = mock.Mock(return_value=[mock_child1, mock_child2]) mock_proc = mock.Mock(pid=0) mock_proc.terminate = mock.Mock(return_value=None) mock_proc.kill = mock.Mock(return_value=None) with mock.patch('psutil.Process', return_value=mock_process), \ mock.patch( 'os.kill', mock.Mock(side_effect=mock_kill_function) ) as mock_kill: handler.terminate(mock_proc) assert handler.terminated mock_proc.terminate.assert_called_once() mock_proc.kill.assert_called_once() mock_kill.assert_has_calls( [mock.call(1, signal.SIGTERM), mock.call(2, signal.SIGTERM)] ) mock_child_neg1 = mock.Mock(pid=-1) mock_process.children = mock.Mock( return_value=[mock_child_neg1, mock_child2] ) handler.terminated = False mock_kill.reset_mock() handler.terminate(mock_proc) mock_kill.assert_has_calls( [mock.call(-1, signal.SIGTERM), mock.call(2, signal.SIGTERM)] ) def test_binaryhandler_try_kill_process_by_pid(mocked_instance): def mock_kill_function(pid, sig): if pid < 0: raise ProcessLookupError instance = mocked_instance handler = BinaryHandler(instance, 'build') handler.pid_fn = os.path.join('dummy', 'path', 'to', 'pid.pid') with mock.patch( 'os.kill', mock.Mock(side_effect=mock_kill_function) ) as mock_kill, \ mock.patch('os.unlink', mock.Mock()) as mock_unlink: with mock.patch('builtins.open', mock.mock_open(read_data='1')): handler.try_kill_process_by_pid() mock_unlink.assert_called_once_with( os.path.join('dummy', 'path', 'to', 'pid.pid') ) mock_kill.assert_called_once_with(1, signal.SIGKILL) mock_unlink.reset_mock() mock_kill.reset_mock() handler.pid_fn = os.path.join('dummy', 'path', 'to', 'pid.pid') with mock.patch('builtins.open', mock.mock_open(read_data='-1')): handler.try_kill_process_by_pid() mock_unlink.assert_called_once_with( os.path.join('dummy', 'path', 'to', 'pid.pid') ) mock_kill.assert_called_once_with(-1, signal.SIGKILL) TESTDATA_3 = [ ( [b'This\\r\\n', b'is\r', b'a short', b'file.'], mock.Mock(state=False, capture_coverage=False), [ mock.call('This\\r\\n'), mock.call('is\r'), mock.call('a short'), mock.call('file.') ], [ mock.call('This'), mock.call('is'), mock.call('a short'), mock.call('file.') ], None, False ), ( [b'Too much.'] * 120, # Should be more than the timeout mock.Mock(state=False, capture_coverage=False), None, None, True, False ), ( [b'Too much.'] * 120, # Should be more than the timeout mock.Mock(state=True, capture_coverage=False), None, None, True, False ), ( [b'Too much.'] * 120, # Should be more than the timeout mock.Mock(state=True, capture_coverage=True), None, None, False, True ), ] @pytest.mark.parametrize( 'proc_stdout, harness, expected_handler_calls,' ' expected_harness_calls, should_be_less, timeout_wait', TESTDATA_3, ids=[ 'no timeout', 'timeout', 'timeout with harness state', 'timeout with capture_coverage, wait timeout' ] ) def test_binaryhandler_output_handler( mocked_instance, faux_timer, proc_stdout, harness, expected_handler_calls, expected_harness_calls, should_be_less, timeout_wait ): class MockStdout(mock.Mock): def __init__(self, text): super().__init__(text) self.text = text self.line_index = 0 def readline(self): if self.line_index == len(self.text): self.line_index = 0 return b'' else: line = self.text[self.line_index] self.line_index += 1 return line class MockProc(mock.Mock): def __init__(self, pid, stdout): super().__init__(pid, stdout) self.pid = mock.PropertyMock(return_value=pid) self.stdout = MockStdout(stdout) def wait(self, *args, **kwargs): if timeout_wait: raise TimeoutExpired('dummy cmd', 'dummyamount') handler = BinaryHandler(mocked_instance, 'build') handler.terminate = mock.Mock() handler.options = mock.Mock(timeout_multiplier=1) proc = MockProc(1, proc_stdout) with mock.patch( 'builtins.open', mock.mock_open(read_data='') ) as mock_file, \ mock.patch('time.time', side_effect=faux_timer.time): handler._output_handler(proc, harness) mock_file.assert_called_with(handler.log, 'wt') if expected_handler_calls: mock_file.return_value.write.assert_has_calls(expected_handler_calls) if expected_harness_calls: harness.handle.assert_has_calls(expected_harness_calls) if should_be_less is not None: if should_be_less: assert mock_file.return_value.write.call_count < len(proc_stdout) else: assert mock_file.return_value.write.call_count == len(proc_stdout) if timeout_wait: handler.terminate.assert_called_once_with(proc) TESTDATA_4 = [ (True, False, False, True, None, None, ['valgrind', '--error-exitcode=2', '--leak-check=full', f'--suppressions={ZEPHYR_BASE}/scripts/valgrind.supp', '--log-file=build_dir/valgrind.log', '--track-origins=yes', 'generator', 'run_renode_test']), (False, True, False, False, 123, None, ['generator', 'run', '--seed=123']), (False, False, True, False, None, None, ['west', 'flash', '--skip-rebuild', '-d', 'build_dir']), (False, False, False, False, None, ['ex1', 'ex2'], ['bin', 'ex1', 'ex2']), ] @pytest.mark.parametrize( 'robot_test, call_make_run, call_west_flash, enable_valgrind, seed,' \ ' extra_args, expected', TESTDATA_4, ids=['robot, valgrind', 'make run, seed', 'west flash', 'binary, extra'] ) def test_binaryhandler_create_command( mocked_instance, robot_test, call_make_run, call_west_flash, enable_valgrind, seed, extra_args, expected ): handler = BinaryHandler(mocked_instance, 'build') handler.generator_cmd = 'generator' handler.binary = 'bin' handler.call_make_run = call_make_run handler.call_west_flash = call_west_flash handler.options = mock.Mock(enable_valgrind=enable_valgrind) handler.seed = seed handler.extra_test_args = extra_args handler.build_dir = 'build_dir' command = handler._create_command(robot_test) assert command == expected TESTDATA_5 = [ (False, False, False), (True, False, False), (True, True, False), (False, False, True), ] @pytest.mark.parametrize( 'enable_asan, enable_lsan, enable_ubsan', TESTDATA_5, ids=['none', 'asan', 'asan, lsan', 'ubsan'] ) def test_binaryhandler_create_env( mocked_instance, enable_asan, enable_lsan, enable_ubsan ): handler = BinaryHandler(mocked_instance, 'build') handler.options = mock.Mock( enable_asan=enable_asan, enable_lsan=enable_lsan, enable_ubsan=enable_ubsan ) env = { 'example_env_var': True, 'ASAN_OPTIONS': 'dummy=dummy:', 'UBSAN_OPTIONS': 'dummy=dummy:' } with mock.patch('os.environ', env): res = handler._create_env() assert env['example_env_var'] == res['example_env_var'] if enable_ubsan: assert env['UBSAN_OPTIONS'] in res['UBSAN_OPTIONS'] assert 'log_path=stdout:' in res['UBSAN_OPTIONS'] assert 'halt_on_error=1:' in res['UBSAN_OPTIONS'] if enable_asan: assert env['ASAN_OPTIONS'] in res['ASAN_OPTIONS'] assert 'log_path=stdout:' in res['ASAN_OPTIONS'] if not enable_lsan: assert 'detect_leaks=0' in res['ASAN_OPTIONS'] TESTDATA_6 = [ (None, False, 2, True, 'failed', 'Valgrind error', False), (None, False, 1, False, 'failed', 'Failed', False), ('failed', False, 0, False, 'failed', 'Failed', False), ('success', False, 0, False, 'success', 'Unknown', False), (None, True, 1, True, 'failed', 'Timeout', True), ] @pytest.mark.parametrize( 'harness_state, terminated, returncode, enable_valgrind,' \ ' expected_status, expected_reason, do_add_missing', TESTDATA_6, ids=['valgrind error', 'failed', 'harness failed', 'success', 'no state'] ) def test_binaryhandler_update_instance_info( mocked_instance, harness_state, terminated, returncode, enable_valgrind, expected_status, expected_reason, do_add_missing ): handler = BinaryHandler(mocked_instance, 'build') handler_time = 59 handler.terminated = terminated handler.returncode = returncode handler.options = mock.Mock(enable_valgrind=enable_valgrind) missing_mock = mock.Mock() handler.instance.add_missing_case_status = missing_mock handler._update_instance_info(harness_state, handler_time) assert handler.instance.execution_time == handler_time assert handler.instance.status == expected_status assert handler.instance.reason == expected_reason if do_add_missing: missing_mock.assert_called_once_with('blocked', expected_reason) TESTDATA_7 = [ (True, False, False), (False, True, False), (False, False, True), ] @pytest.mark.parametrize( 'is_robot_test, coverage, isatty', TESTDATA_7, ids=['robot test', 'coverage', 'isatty'] ) def test_binaryhandler_handle( mocked_instance, caplog, is_robot_test, coverage, isatty ): thread_mock_obj = mock.Mock() def mock_popen(command, *args, **kwargs,): return mock.Mock( __enter__=mock.Mock(return_value=mock.Mock(pid=0, returncode=0)), __exit__=mock.Mock(return_value=None) ) def mock_thread(target, *args, **kwargs): return thread_mock_obj handler = BinaryHandler(mocked_instance, 'build') handler.sourcedir = 'source_dir' handler.build_dir = 'build_dir' handler.name= 'Dummy Name' handler._create_command = mock.Mock(return_value=['dummy' , 'command']) handler._create_env = mock.Mock(return_value=[]) handler._update_instance_info = mock.Mock() handler._final_handle_actions = mock.Mock() handler.terminate = mock.Mock() handler.try_kill_process_by_pid = mock.Mock() handler.options = mock.Mock(coverage=coverage) robot_mock = mock.Mock() harness = mock.Mock(is_robot_test=is_robot_test, run_robot_test=robot_mock) popen_mock = mock.Mock(side_effect=mock_popen) thread_mock = mock.Mock(side_effect=mock_thread) call_mock = mock.Mock() with mock.patch('subprocess.call', call_mock), \ mock.patch('subprocess.Popen', popen_mock), \ mock.patch('threading.Thread', thread_mock), \ mock.patch('sys.stdout.isatty', return_value=isatty): handler.handle(harness) if is_robot_test: robot_mock.assert_called_once_with(['dummy', 'command'], mock.ANY) return assert 'Spawning BinaryHandler Thread for Dummy Name' in caplog.text thread_mock_obj.join.assert_called() handler._update_instance_info.assert_called_once() handler._final_handle_actions.assert_called_once() if isatty: call_mock.assert_any_call(['stty', 'sane'], stdin=mock.ANY) TESTDATA_8 = [ ('renode', True, True, False, False), ('native', False, False, False, True), ('build', False, True, False, False), ] @pytest.mark.parametrize( 'type_str, is_pid_fn, expected_call_make_run, is_binary, expected_ready', TESTDATA_8, ids=[t[0] for t in TESTDATA_8] ) def test_simulationhandler_init( mocked_instance, type_str, is_pid_fn, expected_call_make_run, is_binary, expected_ready ): handler = SimulationHandler(mocked_instance, type_str) assert handler.call_make_run == expected_call_make_run assert handler.ready == expected_ready if is_pid_fn: assert handler.pid_fn == os.path.join(mocked_instance.build_dir, 'renode.pid') if is_binary: assert handler.pid_fn == os.path.join(mocked_instance.build_dir, 'zephyr', 'zephyr.exe') TESTDATA_9 = [ (3, 2, 0, 0, 3, -1, True, False, False, 1), (4, 1, 0, 0, -1, -1, False, True, False, 0), (5, 0, 1, 2, -1, 4, False, False, True, 3) ] @pytest.mark.parametrize( 'success_count, in_waiting_count, oserror_count, readline_error_count,' ' haltless_count, stateless_count, end_by_halt, end_by_close,' ' end_by_state, expected_line_count', TESTDATA_9, ids=[ 'halt event', 'serial closes', 'harness state with errors' ] ) def test_devicehandler_monitor_serial( mocked_instance, success_count, in_waiting_count, oserror_count, readline_error_count, haltless_count, stateless_count, end_by_halt, end_by_close, end_by_state, expected_line_count ): is_open_iter = iter(lambda: True, False) line_iter = [ TypeError('dummy TypeError') if x % 2 else \ SerialException('dummy SerialException') for x in range( readline_error_count ) ] + [ f'line no {idx}'.encode('utf-8') for idx in range(success_count) ] in_waiting_iter = [False] * in_waiting_count + [ TypeError('dummy TypeError') ] if end_by_close else ( [OSError('dummy OSError')] * oserror_count + [False] * in_waiting_count ) + [True] * (success_count + readline_error_count) is_set_iter = [False] * haltless_count + [True] \ if end_by_halt else iter(lambda: False, True) state_iter = [False] * stateless_count + [True] \ if end_by_state else iter(lambda: False, True) halt_event = mock.Mock(is_set=mock.Mock(side_effect=is_set_iter)) ser = mock.Mock( isOpen=mock.Mock(side_effect=is_open_iter), readline=mock.Mock(side_effect=line_iter) ) type(ser).in_waiting = mock.PropertyMock( side_effect=in_waiting_iter, return_value=False ) harness = mock.Mock(capture_coverage=False) type(harness).state=mock.PropertyMock(side_effect=state_iter) handler = DeviceHandler(mocked_instance, 'build') handler.options = mock.Mock(enable_coverage=not end_by_state) with mock.patch('builtins.open', mock.mock_open(read_data='')): handler.monitor_serial(ser, halt_event, harness) if not end_by_close: ser.close.assert_called_once() harness.handle.assert_has_calls( [mock.call(f'line no {idx}') for idx in range(expected_line_count)] ) TESTDATA_10 = [ ( 'dummy_platform', 'dummy fixture', [ mock.Mock( fixtures=[], platform='dummy_platform', available=1, counter=0 ), mock.Mock( fixtures=['dummy fixture'], platform='another_platform', available=1, counter=0 ), mock.Mock( fixtures=['dummy fixture'], platform='dummy_platform', serial_pty=None, serial=None, available=1, counter=0 ), mock.Mock( fixtures=['dummy fixture'], platform='dummy_platform', serial_pty=mock.Mock(), available=1, counter=0 ) ], 3 ), ( 'dummy_platform', 'dummy fixture', [], TwisterException ), ( 'dummy_platform', 'dummy fixture', [ mock.Mock( fixtures=['dummy fixture'], platform='dummy_platform', serial_pty=mock.Mock(), available=0 ), mock.Mock( fixtures=['another fixture'], platform='dummy_platform', serial_pty=mock.Mock(), available=0 ), mock.Mock( fixtures=['dummy fixture'], platform='dummy_platform', serial=mock.Mock(), available=0 ), mock.Mock( fixtures=['another fixture'], platform='dummy_platform', serial=mock.Mock(), available=0 ) ], None ) ] @pytest.mark.parametrize( 'platform_name, fixture, duts, expected', TESTDATA_10, ids=['one good dut', 'exception - no duts', 'no available duts'] ) def test_devicehandler_device_is_available( mocked_instance, platform_name, fixture, duts, expected ): mocked_instance.platform.name = platform_name mocked_instance.testsuite.harness_config = {'fixture': fixture} handler = DeviceHandler(mocked_instance, 'build') handler.duts = duts if isinstance(expected, int): device = handler.device_is_available(mocked_instance) assert device == duts[expected] assert device.available == 0 assert device.counter == 1 elif expected is None: device = handler.device_is_available(mocked_instance) assert device is None elif isinstance(expected, type): with pytest.raises(expected): device = handler.device_is_available(mocked_instance) else: assert False def test_devicehandler_make_device_available(mocked_instance): serial = mock.Mock(name='dummy_serial') duts = [ mock.Mock(available=0, serial=serial, serial_pty=None), mock.Mock(available=0, serial=None, serial_pty=serial), mock.Mock( available=0, serial=mock.Mock('another_serial'), serial_pty=None ) ] handler = DeviceHandler(mocked_instance, 'build') handler.duts = duts handler.make_device_available(serial) assert len([None for d in handler.duts if d.available == 1]) == 2 assert handler.duts[2].available == 0 TESTDATA_11 = [ (mock.Mock(pid=0, returncode=0), False), (mock.Mock(pid=0, returncode=1), False), (mock.Mock(pid=0, returncode=1), True) ] @pytest.mark.parametrize( 'mock_process, raise_timeout', TESTDATA_11, ids=['proper script', 'error', 'timeout'] ) def test_devicehandler_run_custom_script(caplog, mock_process, raise_timeout): def raise_timeout_fn(timeout=-1): if raise_timeout and timeout != -1: raise subprocess.TimeoutExpired(None, timeout) else: return mock.Mock(), mock.Mock() def assert_popen(command, *args, **kwargs): return mock.Mock( __enter__=mock.Mock(return_value=mock_process), __exit__=mock.Mock(return_value=None) ) mock_process.communicate = mock.Mock(side_effect=raise_timeout_fn) script = [os.path.join('test','script', 'path'), 'arg'] timeout = 60 with mock.patch('subprocess.Popen', side_effect=assert_popen): DeviceHandler.run_custom_script(script, timeout) if raise_timeout: assert all( t in caplog.text.lower() for t in [str(script), 'timed out'] ) mock_process.assert_has_calls( [ mock.call.communicate(timeout=timeout), mock.call.kill(), mock.call.communicate() ] ) elif mock_process.returncode == 0: assert not any([r.levelname == 'ERROR' for r in caplog.records]) else: assert 'timed out' not in caplog.text.lower() assert 'custom script failure' in caplog.text.lower() TESTDATA_12 = [ (0, False), (4, False), (0, True) ] @pytest.mark.parametrize( 'num_of_failures, raise_exception', TESTDATA_12, ids=['no failures', 'with failures', 'exception'] ) def test_devicehandler_get_hardware( mocked_instance, caplog, num_of_failures, raise_exception ): expected_hardware = mock.Mock() def mock_availability(handler, instance, no=num_of_failures): if raise_exception: raise TwisterException(f'dummy message') if handler.no: handler.no -= 1 return None return expected_hardware handler = DeviceHandler(mocked_instance, 'build') handler.no = num_of_failures with mock.patch.object( DeviceHandler, 'device_is_available', mock_availability ): hardware = handler.get_hardware() if raise_exception: assert 'dummy message' in caplog.text.lower() assert mocked_instance.status == 'failed' assert mocked_instance.reason == 'dummy message' else: assert hardware == expected_hardware TESTDATA_13 = [ ( None, None, None, ['generator_cmd', '-C', '$build_dir', 'flash'] ), ( [], None, None, ['west', 'flash', '--skip-rebuild', '-d', '$build_dir'] ), ( '--dummy', None, None, ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', '--', '--dummy'] ), ( '--dummy1,--dummy2', None, None, ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', '--', '--dummy1', '--dummy2'] ), ( None, 'runner', 'product', ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', '--runner', 'runner', 'param1', 'param2'] ), ( None, 'pyocd', 'product', ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', '--runner', 'pyocd', 'param1', 'param2', '--', '--dev-id', 12345] ), ( None, 'nrfjprog', 'product', ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', '--runner', 'nrfjprog', 'param1', 'param2', '--', '--dev-id', 12345] ), ( None, 'openocd', 'STM32 STLink', ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', '--runner', 'openocd', 'param1', 'param2', '--', '--cmd-pre-init', 'hla_serial 12345'] ), ( None, 'openocd', 'STLINK-V3', ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', '--runner', 'openocd', 'param1', 'param2', '--', '--cmd-pre-init', 'hla_serial 12345'] ), ( None, 'openocd', 'EDBG CMSIS-DAP', ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', '--runner', 'openocd', 'param1', 'param2', '--', '--cmd-pre-init', 'cmsis_dap_serial 12345'] ), ( None, 'jlink', 'product', ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', '--runner', 'jlink', '--tool-opt=-SelectEmuBySN 12345', # 2x space 'param1', 'param2'] ), ( None, 'stm32cubeprogrammer', 'product', ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', '--runner', 'stm32cubeprogrammer', '--tool-opt=sn=12345', 'param1', 'param2'] ), ] TESTDATA_13_2 = [(True), (False)] @pytest.mark.parametrize( 'self_west_flash, runner,' \ ' hardware_product_name, expected', TESTDATA_13, ids=['generator', '--west-flash', 'one west flash value', 'multiple west flash values', 'generic runner', 'pyocd', 'nrfjprog', 'openocd, STM32 STLink', 'openocd, STLINK-v3', 'openocd, EDBG CMSIS-DAP', 'jlink', 'stm32cubeprogrammer'] ) @pytest.mark.parametrize('hardware_probe', TESTDATA_13_2, ids=['probe', 'id']) def test_devicehandler_create_command( mocked_instance, self_west_flash, runner, hardware_probe, hardware_product_name, expected ): handler = DeviceHandler(mocked_instance, 'build') handler.options = mock.Mock(west_flash=self_west_flash) handler.generator_cmd = 'generator_cmd' expected = [handler.build_dir if val == '$build_dir' else \ val for val in expected] hardware = mock.Mock( product=hardware_product_name, probe_id=12345 if hardware_probe else None, id=12345 if not hardware_probe else None, runner_params=['param1', 'param2'] ) command = handler._create_command(runner, hardware) assert command == expected TESTDATA_14 = [ ('success', False, 'success', 'Unknown', False), ('failed', False, 'failed', 'Failed', True), ('error', False, 'error', 'Unknown', True), (None, True, None, 'Unknown', False), (None, False, 'failed', 'Timeout', True), ] @pytest.mark.parametrize( 'harness_state, flash_error,' \ ' expected_status, expected_reason, do_add_missing', TESTDATA_14, ids=['success', 'failed', 'error', 'flash error', 'no status'] ) def test_devicehandler_update_instance_info( mocked_instance, harness_state, flash_error, expected_status, expected_reason, do_add_missing ): handler = DeviceHandler(mocked_instance, 'build') handler_time = 59 missing_mock = mock.Mock() handler.instance.add_missing_case_status = missing_mock handler._update_instance_info(harness_state, handler_time, flash_error) assert handler.instance.execution_time == handler_time assert handler.instance.status == expected_status assert handler.instance.reason == expected_reason if do_add_missing: missing_mock.assert_called_once_with('blocked', expected_reason) TESTDATA_15 = [ ('dummy device', 'dummy pty', None, None, True, False, False), ( 'dummy device', 'dummy pty', mock.Mock(communicate=mock.Mock(return_value=('', ''))), SerialException, False, True, 'dummy pty' ), ( 'dummy device', None, None, SerialException, False, False, 'dummy device' ) ] @pytest.mark.parametrize( 'serial_device, serial_pty, ser_pty_process, expected_exception,' \ ' expected_result, terminate_ser_pty_process, make_available', TESTDATA_15, ids=['valid', 'serial pty process', 'no serial pty'] ) def test_devicehandler_create_serial_connection( mocked_instance, serial_device, serial_pty, ser_pty_process, expected_exception, expected_result, terminate_ser_pty_process, make_available ): def mock_serial(*args, **kwargs): if expected_exception: raise expected_exception('') return expected_result handler = DeviceHandler(mocked_instance, 'build') handler.make_device_available = mock.Mock() missing_mock = mock.Mock() handler.instance.add_missing_case_status = missing_mock available_mock = mock.Mock() handler.make_device_available = available_mock handler.options = mock.Mock(timeout_multiplier=1) hardware_baud = 14400 flash_timeout = 60 serial_mock = mock.Mock(side_effect=mock_serial) with mock.patch('serial.Serial', serial_mock), \ pytest.raises(expected_exception) if expected_exception else \ nullcontext(): result = handler._create_serial_connection(serial_device, hardware_baud, flash_timeout, serial_pty, ser_pty_process) if expected_result: assert result is not None if expected_exception: assert handler.instance.status == 'failed' assert handler.instance.reason == 'Serial Device Error' missing_mock.assert_called_once_with('blocked', 'Serial Device Error') if terminate_ser_pty_process: ser_pty_process.terminate.assert_called_once() ser_pty_process.communicate.assert_called_once() if make_available: available_mock.assert_called_once_with(make_available) TESTDATA_16 = [ ('dummy1 dummy2', None, 'slave name'), ('dummy1,dummy2', CalledProcessError, None), (None, None, 'dummy hardware serial'), ] @pytest.mark.parametrize( 'serial_pty, popen_exception, expected_device', TESTDATA_16, ids=['pty', 'pty process error', 'no pty'] ) def test_devicehandler_get_serial_device( mocked_instance, serial_pty, popen_exception, expected_device ): def mock_popen(command, *args, **kwargs): assert command == ['dummy1', 'dummy2'] if popen_exception: raise popen_exception(command, 'Dummy error') return mock.Mock() handler = DeviceHandler(mocked_instance, 'build') hardware_serial = 'dummy hardware serial' popen_mock = mock.Mock(side_effect=mock_popen) openpty_mock = mock.Mock(return_value=('master', 'slave')) ttyname_mock = mock.Mock(side_effect=lambda x: x + ' name') with mock.patch('subprocess.Popen', popen_mock), \ mock.patch('pty.openpty', openpty_mock), \ mock.patch('os.ttyname', ttyname_mock): result = handler._get_serial_device(serial_pty, hardware_serial) if popen_exception: assert result is None else: assert result[0] == expected_device TESTDATA_17 = [ (False, False, False, False, None, False, False, None, None, []), (True, True, False, False, None, False, False, None, None, []), (True, False, True, False, None, False, False, 'error', 'Device issue (Flash error)', []), (True, False, False, True, None, False, False, 'error', 'Device issue (Timeout)', ['Flash operation timed out.']), (True, False, False, False, 1, False, False, 'error', 'Device issue (Flash error?)', []), (True, False, False, False, 0, True, False, None, None, ['Timed out while monitoring serial output on IPName']), (True, False, False, False, 0, False, True, None, None, ['Process Serial PTY terminated outs: errs ']), ] @pytest.mark.parametrize( 'has_hardware, raise_create_serial, raise_popen, raise_timeout,' \ ' returncode, do_timeout_thread, use_pty,' \ ' expected_status, expected_reason, expected_logs', TESTDATA_17, ids=['no hardware', 'create serial failure', 'popen called process error', 'communicate timeout', 'nonzero returncode', 'valid pty', 'valid dev'] ) def test_devicehandler_handle( mocked_instance, caplog, has_hardware, raise_create_serial, raise_popen, raise_timeout, returncode, do_timeout_thread, use_pty, expected_status, expected_reason, expected_logs ): def mock_get_serial(serial_pty, hardware_serial): if serial_pty: serial_pty_process = mock.Mock( name='dummy serial PTY process', communicate=mock.Mock( return_value=('', '') ) ) return 'dummy serial PTY device', serial_pty_process return 'dummy serial device', None def mock_create_serial(*args, **kwargs): if raise_create_serial: raise SerialException('dummy cmd', 'dummy msg') return mock.Mock(name='dummy serial') def mock_thread(*args, **kwargs): is_alive_mock = mock.Mock(return_value=bool(do_timeout_thread)) return mock.Mock(is_alive=is_alive_mock) def mock_terminate(proc, *args, **kwargs): proc.communicate = mock.Mock(return_value=(mock.Mock(), mock.Mock())) def mock_communicate(*args, **kwargs): if raise_timeout: raise TimeoutExpired('dummy cmd', 'dummyamount') return mock.Mock(), mock.Mock() def mock_popen(command, *args, **kwargs): if raise_popen: raise CalledProcessError('dummy proc', 'dummy msg') mock_process = mock.Mock( pid=1, returncode=returncode, communicate=mock.Mock(side_effect=mock_communicate) ) return mock.Mock( __enter__=mock.Mock(return_value=mock_process), __exit__=mock.Mock(return_value=None) ) hardware = None if not has_hardware else mock.Mock( baud=14400, runner='dummy runner', serial_pty='Serial PTY' if use_pty else None, serial='dummy serial', pre_script='dummy pre script', post_script='dummy post script', post_flash_script='dummy post flash script', flash_timeout=60, flash_with_test=True ) handler = DeviceHandler(mocked_instance, 'build') handler.get_hardware = mock.Mock(return_value=hardware) handler.options = mock.Mock( timeout_multiplier=1, west_flash=None, west_runner=None ) handler._get_serial_device = mock.Mock(side_effect=mock_get_serial) handler._create_command = mock.Mock(return_value=['dummy', 'command']) handler.run_custom_script = mock.Mock() handler._create_serial_connection = mock.Mock( side_effect=mock_create_serial ) handler.monitor_serial = mock.Mock() handler.terminate = mock.Mock(side_effect=mock_terminate) handler._update_instance_info = mock.Mock() handler._final_handle_actions = mock.Mock() handler.make_device_available = mock.Mock() handler.instance.platform.name = 'IPName' harness = mock.Mock() with mock.patch('builtins.open', mock.mock_open(read_data='')), \ mock.patch('subprocess.Popen', side_effect=mock_popen), \ mock.patch('threading.Event', mock.Mock()), \ mock.patch('threading.Thread', side_effect=mock_thread): handler.handle(harness) handler.get_hardware.assert_called_once() messages = [record.msg for record in caplog.records] assert all([msg in messages for msg in expected_logs]) if not has_hardware: return handler.run_custom_script.assert_has_calls([ mock.call('dummy pre script', mock.ANY) ]) if raise_create_serial: return handler.run_custom_script.assert_has_calls([ mock.call('dummy pre script', mock.ANY), mock.call('dummy post flash script', mock.ANY), mock.call('dummy post script', mock.ANY) ]) if expected_reason: assert handler.instance.reason == expected_reason if expected_status: assert handler.instance.status == expected_status handler.make_device_available.assert_called_once_with( 'Serial PTY' if use_pty else 'dummy serial device' ) TESTDATA_18 = [ (True, True, True), (False, False, False), ] @pytest.mark.parametrize( 'ignore_qemu_crash, expected_ignore_crash, expected_ignore_unexpected_eof', TESTDATA_18, ids=['ignore crash', 'qemu crash'] ) def test_qemuhandler_init( mocked_instance, ignore_qemu_crash, expected_ignore_crash, expected_ignore_unexpected_eof ): mocked_instance.testsuite.ignore_qemu_crash = ignore_qemu_crash handler = QEMUHandler(mocked_instance, 'build') assert handler.ignore_qemu_crash == expected_ignore_crash assert handler.ignore_unexpected_eof == expected_ignore_unexpected_eof def test_qemuhandler_get_cpu_time(): def mock_process(pid): return mock.Mock( cpu_times=mock.Mock( return_value=mock.Mock( user=20.0, system=64.0 ) ) ) with mock.patch('psutil.Process', mock_process): res = QEMUHandler._get_cpu_time(0) assert res == pytest.approx(84.0) TESTDATA_19 = [ ( True, os.path.join('self', 'dummy_dir', '1'), mock.PropertyMock(return_value=os.path.join('dummy_dir', '1')), os.path.join('dummy_dir', '1') ), ( False, os.path.join('self', 'dummy_dir', '2'), mock.PropertyMock(return_value=os.path.join('dummy_dir', '2')), os.path.join('self', 'dummy_dir', '2') ), ] @pytest.mark.parametrize( 'self_sysbuild, self_build_dir, build_dir, expected', TESTDATA_19, ids=['domains build dir', 'self build dir'] ) def test_qemuhandler_get_sysbuild_build_dir( mocked_instance, self_sysbuild, self_build_dir, build_dir, expected ): get_default_domain_mock = mock.Mock() type(get_default_domain_mock()).build_dir = build_dir domains_mock = mock.Mock(get_default_domain=get_default_domain_mock) from_file_mock = mock.Mock(return_value=domains_mock) handler = QEMUHandler(mocked_instance, 'build') handler.instance.testsuite.sysbuild = self_sysbuild handler.build_dir = self_build_dir with mock.patch('domains.Domains.from_file', from_file_mock): result = handler._get_sysbuild_build_dir() assert result == expected TESTDATA_20 = [ ( os.path.join('self', 'dummy_dir', 'log1'), os.path.join('self', 'dummy_dir', 'pid1'), os.path.join('sysbuild', 'dummy_dir', 'bd1'), True ), ( os.path.join('self', 'dummy_dir', 'log2'), os.path.join('self', 'dummy_dir', 'pid2'), os.path.join('sysbuild', 'dummy_dir', 'bd2'), False ), ] @pytest.mark.parametrize( 'self_log, self_pid_fn, sysbuild_build_dir, exists_pid_fn', TESTDATA_20, ids=['pid exists', 'pid missing'] ) def test_qemuhandler_set_qemu_filenames( mocked_instance, self_log, self_pid_fn, sysbuild_build_dir, exists_pid_fn ): unlink_mock = mock.Mock() exists_mock = mock.Mock(return_value=exists_pid_fn) handler = QEMUHandler(mocked_instance, 'build') handler.log = self_log handler.pid_fn = self_pid_fn with mock.patch('os.unlink', unlink_mock), \ mock.patch('os.path.exists', exists_mock): handler._set_qemu_filenames(sysbuild_build_dir) assert handler.fifo_fn == mocked_instance.build_dir + \ os.path.sep + 'qemu-fifo' assert handler.pid_fn == sysbuild_build_dir + os.path.sep + 'qemu.pid' assert handler.log_fn == self_log if exists_pid_fn: unlink_mock.assert_called_once_with(sysbuild_build_dir + \ os.path.sep + 'qemu.pid') def test_qemuhandler_create_command(mocked_instance): sysbuild_build_dir = os.path.join('sysbuild', 'dummy_dir') handler = QEMUHandler(mocked_instance, 'build') handler.generator_cmd = 'dummy_cmd' result = handler._create_command(sysbuild_build_dir) assert result == ['dummy_cmd', '-C', 'sysbuild' + os.path.sep + 'dummy_dir', 'run'] TESTDATA_21 = [ ( 0, False, None, 'good dummy state', False, None, None, False ), ( 1, True, None, 'good dummy state', False, None, None, False ), ( 0, False, None, None, True, 'failed', 'Timeout', True ), ( 1, False, None, None, False, 'failed', 'Exited with 1', True ), ( 1, False, 'preexisting reason', 'good dummy state', False, 'failed', 'preexisting reason', True ), ] @pytest.mark.parametrize( 'self_returncode, self_ignore_qemu_crash,' \ ' self_instance_reason, harness_state, is_timeout,' \ ' expected_status, expected_reason, expected_called_missing_case', TESTDATA_21, ids=['not failed', 'qemu ignore', 'timeout', 'bad returncode', 'other fail'] ) def test_qemuhandler_update_instance_info( mocked_instance, self_returncode, self_ignore_qemu_crash, self_instance_reason, harness_state, is_timeout, expected_status, expected_reason, expected_called_missing_case ): mocked_instance.add_missing_case_status = mock.Mock() mocked_instance.reason = self_instance_reason handler = QEMUHandler(mocked_instance, 'build') handler.returncode = self_returncode handler.ignore_qemu_crash = self_ignore_qemu_crash handler._update_instance_info(harness_state, is_timeout) assert handler.instance.status == expected_status assert handler.instance.reason == expected_reason if expected_called_missing_case: mocked_instance.add_missing_case_status.assert_called_once_with( 'blocked' ) def test_qemuhandler_thread_get_fifo_names(): fifo_fn = 'dummy' fifo_in, fifo_out = QEMUHandler._thread_get_fifo_names(fifo_fn) assert fifo_in == 'dummy.in' assert fifo_out == 'dummy.out' TESTDATA_22 = [ (False, False), (False, True), (True, False), (True, True), ] @pytest.mark.parametrize( 'fifo_in_exists, fifo_out_exists', TESTDATA_22, ids=['both missing', 'out exists', 'in exists', 'both exist'] ) def test_qemuhandler_thread_open_files(fifo_in_exists, fifo_out_exists): def mock_exists(path): if path == 'fifo.in': return fifo_in_exists elif path == 'fifo.out': return fifo_out_exists else: raise ValueError('Unexpected path in mock of os.path.exists') unlink_mock = mock.Mock() exists_mock = mock.Mock(side_effect=mock_exists) mkfifo_mock = mock.Mock() fifo_in = 'fifo.in' fifo_out = 'fifo.out' logfile = 'log.file' with mock.patch('os.unlink', unlink_mock), \ mock.patch('os.mkfifo', mkfifo_mock), \ mock.patch('os.path.exists', exists_mock), \ mock.patch('builtins.open', mock.mock_open()) as open_mock: _, _, _ = QEMUHandler._thread_open_files(fifo_in, fifo_out, logfile) open_mock.assert_has_calls([ mock.call('fifo.in', 'wb'), mock.call('fifo.out', 'rb', buffering=0), mock.call('log.file', 'wt'), ]) if fifo_in_exists: unlink_mock.assert_any_call('fifo.in') if fifo_out_exists: unlink_mock.assert_any_call('fifo.out') TESTDATA_23 = [ (False, False), (True, True), (True, False) ] @pytest.mark.parametrize( 'is_pid, is_lookup_error', TESTDATA_23, ids=['pid missing', 'pid lookup error', 'pid ok'] ) def test_qemuhandler_thread_close_files(is_pid, is_lookup_error): is_process_killed = {} def mock_kill(pid, sig): if is_lookup_error: raise ProcessLookupError(f'Couldn\'t find pid: {pid}.') elif sig == signal.SIGTERM: is_process_killed[pid] = True unlink_mock = mock.Mock() kill_mock = mock.Mock(side_effect=mock_kill) fifo_in = 'fifo.in' fifo_out = 'fifo.out' pid = 12345 if is_pid else None out_fp = mock.Mock() in_fp = mock.Mock() log_out_fp = mock.Mock() with mock.patch('os.unlink', unlink_mock), \ mock.patch('os.kill', kill_mock): QEMUHandler._thread_close_files(fifo_in, fifo_out, pid, out_fp, in_fp, log_out_fp) out_fp.close.assert_called_once() in_fp.close.assert_called_once() log_out_fp.close.assert_called_once() unlink_mock.assert_has_calls([mock.call('fifo.in'), mock.call('fifo.out')]) if is_pid and not is_lookup_error: assert is_process_killed[pid] TESTDATA_24 = [ ('timeout', 'failed', 'Timeout'), ('failed', 'failed', 'Failed'), ('unexpected eof', 'failed', 'unexpected eof'), ('unexpected byte', 'failed', 'unexpected byte'), (None, None, 'Unknown'), ] @pytest.mark.parametrize( 'out_state, expected_status, expected_reason', TESTDATA_24, ids=['timeout', 'failed', 'unexpected eof', 'unexpected byte', 'unknown'] ) def test_qemuhandler_thread_update_instance_info( mocked_instance, out_state, expected_status, expected_reason ): handler = QEMUHandler(mocked_instance, 'build') handler_time = 59 QEMUHandler._thread_update_instance_info(handler, handler_time, out_state) assert handler.instance.execution_time == handler_time assert handler.instance.status == expected_status assert handler.instance.reason == expected_reason TESTDATA_25 = [ ( ('1\n' * 60).encode('utf-8'), 60, 1, [None] * 60 + ['success'] * 6, 1000, False, 'timeout', [mock.call('1\n'), mock.call('1\n')] ), ( ('1\n' * 60).encode('utf-8'), 60, -1, [None] * 60 + ['success'] * 30, 100, False, 'failed', [mock.call('1\n'), mock.call('1\n')] ), ( b'', 60, 1, ['success'] * 3, 100, False, 'unexpected eof', [] ), ( b'\x81', 60, 1, ['success'] * 3, 100, False, 'unexpected byte', [] ), ( '1\n2\n3\n4\n5\n'.encode('utf-8'), 600, 1, [None] * 3 + ['success'] * 7, 100, False, 'success', [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')] ), ( '1\n2\n3\n4\n5\n'.encode('utf-8'), 600, 0, [None] * 3 + ['success'] * 7, 100, False, 'timeout', [mock.call('1\n'), mock.call('2\n')] ), ( '1\n2\n3\n4\n5\n'.encode('utf-8'), 60, 1, [None] * 3 + ['success'] * 7, (n for n in [100, 100, 10000]), True, 'success', [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')] ), ] @pytest.mark.parametrize( 'content, timeout, pid, harness_states, cputime, capture_coverage,' \ ' expected_out_state, expected_log_calls', TESTDATA_25, ids=[ 'timeout', 'harness failed', 'unexpected eof', 'unexpected byte', 'harness success', 'timeout by pid=0', 'capture_coverage' ] ) def test_qemuhandler_thread( mocked_instance, faux_timer, content, timeout, pid, harness_states, cputime, capture_coverage, expected_out_state, expected_log_calls ): def mock_cputime(pid): if pid > 0: return cputime if isinstance(cputime, int) else next(cputime) else: raise ProcessLookupError() type(mocked_instance.testsuite).timeout = mock.PropertyMock(return_value=timeout) handler = QEMUHandler(mocked_instance, 'build') handler.results = {} handler.ignore_unexpected_eof = False handler.pid_fn = 'pid_fn' handler.fifo_fn = 'fifo_fn' handler.options = mock.Mock(timeout_multiplier=1) def mocked_open(filename, *args, **kwargs): if filename == handler.pid_fn: contents = str(pid).encode('utf-8') elif filename == handler.fifo_fn + '.out': contents = content else: contents = b'' file_object = mock.mock_open(read_data=contents).return_value file_object.__iter__.return_value = contents.splitlines(True) return file_object harness = mock.Mock(capture_coverage=capture_coverage, handle=print) type(harness).state = mock.PropertyMock(side_effect=harness_states) p = mock.Mock() p.poll = mock.Mock( side_effect=itertools.cycle([True, True, True, True, False]) ) mock_thread_get_fifo_names = mock.Mock( return_value=('fifo_fn.in', 'fifo_fn.out') ) log_fp_mock = mock.Mock() in_fp_mock = mocked_open('fifo_fn.out') out_fp_mock = mock.Mock() mock_thread_open_files = mock.Mock( return_value=(out_fp_mock, in_fp_mock, log_fp_mock) ) mock_thread_close_files = mock.Mock() mock_thread_update_instance_info = mock.Mock() with mock.patch('time.time', side_effect=faux_timer.time), \ mock.patch('builtins.open', new=mocked_open), \ mock.patch('select.poll', return_value=p), \ mock.patch('os.path.exists', return_value=True), \ mock.patch('twisterlib.handlers.QEMUHandler._get_cpu_time', mock_cputime), \ mock.patch('twisterlib.handlers.QEMUHandler._thread_get_fifo_names', mock_thread_get_fifo_names), \ mock.patch('twisterlib.handlers.QEMUHandler._thread_open_files', mock_thread_open_files), \ mock.patch('twisterlib.handlers.QEMUHandler._thread_close_files', mock_thread_close_files), \ mock.patch('twisterlib.handlers.QEMUHandler.' \ '_thread_update_instance_info', mock_thread_update_instance_info): QEMUHandler._thread( handler, handler.get_test_timeout(), handler.build_dir, handler.log, handler.fifo_fn, handler.pid_fn, handler.results, harness, handler.ignore_unexpected_eof ) mock_thread_update_instance_info.assert_called_once_with( handler, mock.ANY, expected_out_state ) log_fp_mock.write.assert_has_calls(expected_log_calls) TESTDATA_26 = [ (True, False, None, True, ['No timeout, return code from QEMU (1): 1', 'return code from QEMU (1): 1']), (False, True, 'passed', True, ['return code from QEMU (1): 0']), (False, True, 'failed', False, ['return code from QEMU (None): 1']), ] @pytest.mark.parametrize( 'isatty, do_timeout, harness_state, exists_pid_fn, expected_logs', TESTDATA_26, ids=['no timeout, isatty', 'timeout passed', 'timeout, no pid_fn'] ) def test_qemuhandler_handle( mocked_instance, caplog, tmp_path, isatty, do_timeout, harness_state, exists_pid_fn, expected_logs ): def mock_wait(*args, **kwargs): if do_timeout: raise TimeoutExpired('dummy cmd', 'dummyamount') mock_process = mock.Mock(pid=0, returncode=1) mock_process.communicate = mock.Mock( return_value=(mock.Mock(), mock.Mock()) ) mock_process.wait = mock.Mock(side_effect=mock_wait) handler = QEMUHandler(mocked_instance, 'build') def mock_path_exists(name, *args, **kwargs): return exists_pid_fn def mock_popen(command, stdout=None, stdin=None, stderr=None, cwd=None): return mock.Mock( __enter__=mock.Mock(return_value=mock_process), __exit__=mock.Mock(return_value=None), communicate=mock.Mock(return_value=(mock.Mock(), mock.Mock())) ) def mock_thread(name=None, target=None, daemon=None, args=None): return mock.Mock() def mock_filenames(sysbuild_build_dir): handler.fifo_fn = os.path.join('dummy', 'qemu-fifo') handler.pid_fn = os.path.join(sysbuild_build_dir, 'qemu.pid') handler.log_fn = os.path.join('dummy', 'log') harness = mock.Mock(state=harness_state) handler_options_west_flash = [] sysbuild_build_dir = os.path.join('sysbuild', 'dummydir') command = ['generator_cmd', '-C', os.path.join('cmd', 'path'), 'run'] handler.options = mock.Mock( timeout_multiplier=1, west_flash=handler_options_west_flash, west_runner=None ) handler.run_custom_script = mock.Mock(return_value=None) handler.make_device_available = mock.Mock(return_value=None) handler._final_handle_actions = mock.Mock(return_value=None) handler._create_command = mock.Mock(return_value=command) handler._set_qemu_filenames = mock.Mock(side_effect=mock_filenames) handler._get_sysbuild_build_dir = mock.Mock(return_value=sysbuild_build_dir) handler.terminate = mock.Mock() unlink_mock = mock.Mock() with mock.patch('subprocess.Popen', side_effect=mock_popen), \ mock.patch('builtins.open', mock.mock_open(read_data='1')), \ mock.patch('threading.Thread', side_effect=mock_thread), \ mock.patch('os.path.exists', side_effect=mock_path_exists), \ mock.patch('os.unlink', unlink_mock), \ mock.patch('sys.stdout.isatty', return_value=isatty): handler.handle(harness) assert all([expected_log in caplog.text for expected_log in expected_logs]) def test_qemuhandler_get_fifo(mocked_instance): handler = QEMUHandler(mocked_instance, 'build') handler.fifo_fn = 'fifo_fn' result = handler.get_fifo() assert result == 'fifo_fn'