1#!/usr/bin/env python3 2# Copyright (c) 2023 Intel Corporation 3# 4# SPDX-License-Identifier: Apache-2.0 5""" 6Tests for handlers.py classes' methods 7""" 8 9import itertools 10import mock 11import os 12import pytest 13import signal 14import subprocess 15import sys 16 17from contextlib import nullcontext 18from importlib import reload 19from serial import SerialException 20from subprocess import CalledProcessError, TimeoutExpired 21from types import SimpleNamespace 22 23import twisterlib.harness 24 25ZEPHYR_BASE = os.getenv("ZEPHYR_BASE") 26 27from twisterlib.error import TwisterException 28from twisterlib.handlers import ( 29 Handler, 30 BinaryHandler, 31 DeviceHandler, 32 QEMUHandler, 33 SimulationHandler 34) 35 36 37@pytest.fixture 38def mocked_instance(tmp_path): 39 instance = mock.Mock() 40 41 testsuite = mock.Mock() 42 type(testsuite).source_dir = mock.PropertyMock(return_value='') 43 instance.testsuite = testsuite 44 45 build_dir = tmp_path / 'build_dir' 46 os.makedirs(build_dir) 47 type(instance).build_dir = mock.PropertyMock(return_value=str(build_dir)) 48 49 platform = mock.Mock() 50 type(platform).binaries = mock.PropertyMock(return_value=[]) 51 instance.platform = platform 52 53 type(instance.testsuite).timeout = mock.PropertyMock(return_value=60) 54 type(instance.platform).timeout_multiplier = mock.PropertyMock( 55 return_value=2 56 ) 57 58 instance.status = None 59 instance.reason = 'Unknown' 60 61 return instance 62 63 64@pytest.fixture 65def faux_timer(): 66 class Counter: 67 def __init__(self): 68 self.t = 0 69 70 def time(self): 71 self.t += 1 72 return self.t 73 74 return Counter() 75 76 77TESTDATA_1 = [ 78 (True, False, 'posix', ['Install pyserial python module with pip to use' \ 79 ' --device-testing option.'], None), 80 (False, True, 'nt', [], None), 81 (True, True, 'posix', ['Install pyserial python module with pip to use' \ 82 ' --device-testing option.'], ImportError), 83] 84 85@pytest.mark.parametrize( 86 'fail_serial, fail_pty, os_name, expected_outs, expected_error', 87 TESTDATA_1, 88 ids=['import serial', 'import pty nt', 'import serial+pty posix'] 89) 90def test_imports( 91 capfd, 92 fail_serial, 93 fail_pty, 94 os_name, 95 expected_outs, 96 expected_error 97): 98 class ImportRaiser: 99 def find_spec(self, fullname, path, target=None): 100 if fullname == 'serial' and fail_serial: 101 raise ImportError() 102 if fullname == 'pty' and fail_pty: 103 raise ImportError() 104 105 modules_mock = sys.modules.copy() 106 modules_mock['serial'] = None if fail_serial else modules_mock['serial'] 107 modules_mock['pty'] = None if fail_pty else modules_mock['pty'] 108 109 meta_path_mock = sys.meta_path[:] 110 meta_path_mock.insert(0, ImportRaiser()) 111 112 with mock.patch('os.name', os_name), \ 113 mock.patch.dict('sys.modules', modules_mock, clear=True), \ 114 mock.patch('sys.meta_path', meta_path_mock), \ 115 pytest.raises(expected_error) if expected_error else nullcontext(): 116 reload(twisterlib.handlers) 117 118 out, _ = capfd.readouterr() 119 assert all([expected_out in out for expected_out in expected_outs]) 120 121 122def test_handler_final_handle_actions(mocked_instance): 123 instance = mocked_instance 124 instance.testcases = [mock.Mock()] 125 126 handler = Handler(mocked_instance) 127 handler.suite_name_check = True 128 129 harness = twisterlib.harness.Test() 130 harness.state = mock.Mock() 131 harness.detected_suite_names = mock.Mock() 132 harness.matched_run_id = False 133 harness.run_id_exists = True 134 harness.recording = mock.Mock() 135 136 handler_time = mock.Mock() 137 138 handler._final_handle_actions(harness, handler_time) 139 140 assert handler.instance.status == 'failed' 141 assert handler.instance.execution_time == handler_time 142 assert handler.instance.reason == 'RunID mismatch' 143 assert all(testcase.status == 'failed' for \ 144 testcase in handler.instance.testcases) 145 146 handler.instance.reason = 'This reason shan\'t be changed.' 147 handler._final_handle_actions(harness, handler_time) 148 149 instance.assert_has_calls([mock.call.record(harness.recording)]) 150 151 assert handler.instance.reason == 'This reason shan\'t be changed.' 152 153 154TESTDATA_2 = [ 155 (['dummy_testsuite_name'], False), 156 ([], True), 157 (['another_dummy_name', 'yet_another_dummy_name'], True), 158] 159 160@pytest.mark.parametrize( 161 'detected_suite_names, should_be_called', 162 TESTDATA_2, 163 ids=['detected one expected', 'detected none', 'detected two unexpected'] 164) 165def test_handler_verify_ztest_suite_name( 166 mocked_instance, 167 detected_suite_names, 168 should_be_called 169): 170 instance = mocked_instance 171 type(instance.testsuite).ztest_suite_names = ['dummy_testsuite_name'] 172 173 harness_state = 'passed' 174 175 handler_time = mock.Mock() 176 177 with mock.patch.object(Handler, '_missing_suite_name') as _missing_mocked: 178 handler = Handler(instance) 179 handler._verify_ztest_suite_name( 180 harness_state, 181 detected_suite_names, 182 handler_time 183 ) 184 185 if should_be_called: 186 _missing_mocked.assert_called_once() 187 else: 188 _missing_mocked.assert_not_called() 189 190 191def test_handler_missing_suite_name(mocked_instance): 192 instance = mocked_instance 193 instance.testcases = [mock.Mock()] 194 195 handler = Handler(mocked_instance) 196 handler.suite_name_check = True 197 198 expected_suite_names = ['dummy_testsuite_name'] 199 200 handler_time = mock.Mock() 201 202 handler._missing_suite_name(expected_suite_names, handler_time) 203 204 assert handler.instance.status == 'failed' 205 assert handler.instance.execution_time == handler_time 206 assert handler.instance.reason == 'Testsuite mismatch' 207 assert all( 208 testcase.status == 'failed' for testcase in handler.instance.testcases 209 ) 210 211 212def test_handler_terminate(mocked_instance): 213 def mock_kill_function(pid, sig): 214 if pid < 0: 215 raise ProcessLookupError 216 217 instance = mocked_instance 218 219 handler = Handler(instance) 220 221 mock_process = mock.Mock() 222 mock_child1 = mock.Mock(pid=1) 223 mock_child2 = mock.Mock(pid=2) 224 mock_process.children = mock.Mock(return_value=[mock_child1, mock_child2]) 225 226 mock_proc = mock.Mock(pid=0) 227 mock_proc.terminate = mock.Mock(return_value=None) 228 mock_proc.kill = mock.Mock(return_value=None) 229 230 with mock.patch('psutil.Process', return_value=mock_process), \ 231 mock.patch( 232 'os.kill', 233 mock.Mock(side_effect=mock_kill_function) 234 ) as mock_kill: 235 handler.terminate(mock_proc) 236 237 assert handler.terminated 238 mock_proc.terminate.assert_called_once() 239 mock_proc.kill.assert_called_once() 240 mock_kill.assert_has_calls( 241 [mock.call(1, signal.SIGTERM), mock.call(2, signal.SIGTERM)] 242 ) 243 244 mock_child_neg1 = mock.Mock(pid=-1) 245 mock_process.children = mock.Mock( 246 return_value=[mock_child_neg1, mock_child2] 247 ) 248 handler.terminated = False 249 mock_kill.reset_mock() 250 251 handler.terminate(mock_proc) 252 253 mock_kill.assert_has_calls( 254 [mock.call(-1, signal.SIGTERM), mock.call(2, signal.SIGTERM)] 255 ) 256 257 258def test_binaryhandler_try_kill_process_by_pid(mocked_instance): 259 def mock_kill_function(pid, sig): 260 if pid < 0: 261 raise ProcessLookupError 262 263 instance = mocked_instance 264 265 handler = BinaryHandler(instance, 'build') 266 handler.pid_fn = os.path.join('dummy', 'path', 'to', 'pid.pid') 267 268 with mock.patch( 269 'os.kill', 270 mock.Mock(side_effect=mock_kill_function) 271 ) as mock_kill, \ 272 mock.patch('os.unlink', mock.Mock()) as mock_unlink: 273 with mock.patch('builtins.open', mock.mock_open(read_data='1')): 274 handler.try_kill_process_by_pid() 275 276 mock_unlink.assert_called_once_with( 277 os.path.join('dummy', 'path', 'to', 'pid.pid') 278 ) 279 mock_kill.assert_called_once_with(1, signal.SIGKILL) 280 281 mock_unlink.reset_mock() 282 mock_kill.reset_mock() 283 handler.pid_fn = os.path.join('dummy', 'path', 'to', 'pid.pid') 284 285 with mock.patch('builtins.open', mock.mock_open(read_data='-1')): 286 handler.try_kill_process_by_pid() 287 288 mock_unlink.assert_called_once_with( 289 os.path.join('dummy', 'path', 'to', 'pid.pid') 290 ) 291 mock_kill.assert_called_once_with(-1, signal.SIGKILL) 292 293 294TESTDATA_3 = [ 295 ( 296 [b'This\\r\\n', b'is\r', b'a short', b'file.'], 297 mock.Mock(state=False, capture_coverage=False), 298 [ 299 mock.call('This\\r\\n'), 300 mock.call('is\r'), 301 mock.call('a short'), 302 mock.call('file.') 303 ], 304 [ 305 mock.call('This'), 306 mock.call('is'), 307 mock.call('a short'), 308 mock.call('file.') 309 ], 310 None, 311 False 312 ), 313 ( 314 [b'Too much.'] * 120, # Should be more than the timeout 315 mock.Mock(state=False, capture_coverage=False), 316 None, 317 None, 318 True, 319 False 320 ), 321 ( 322 [b'Too much.'] * 120, # Should be more than the timeout 323 mock.Mock(state=True, capture_coverage=False), 324 None, 325 None, 326 True, 327 False 328 ), 329 ( 330 [b'Too much.'] * 120, # Should be more than the timeout 331 mock.Mock(state=True, capture_coverage=True), 332 None, 333 None, 334 False, 335 True 336 ), 337] 338 339@pytest.mark.parametrize( 340 'proc_stdout, harness, expected_handler_calls,' 341 ' expected_harness_calls, should_be_less, timeout_wait', 342 TESTDATA_3, 343 ids=[ 344 'no timeout', 345 'timeout', 346 'timeout with harness state', 347 'timeout with capture_coverage, wait timeout' 348 ] 349) 350def test_binaryhandler_output_handler( 351 mocked_instance, 352 faux_timer, 353 proc_stdout, 354 harness, 355 expected_handler_calls, 356 expected_harness_calls, 357 should_be_less, 358 timeout_wait 359): 360 class MockStdout(mock.Mock): 361 def __init__(self, text): 362 super().__init__(text) 363 self.text = text 364 self.line_index = 0 365 366 def readline(self): 367 if self.line_index == len(self.text): 368 self.line_index = 0 369 return b'' 370 else: 371 line = self.text[self.line_index] 372 self.line_index += 1 373 return line 374 375 class MockProc(mock.Mock): 376 def __init__(self, pid, stdout): 377 super().__init__(pid, stdout) 378 self.pid = mock.PropertyMock(return_value=pid) 379 self.stdout = MockStdout(stdout) 380 381 def wait(self, *args, **kwargs): 382 if timeout_wait: 383 raise TimeoutExpired('dummy cmd', 'dummyamount') 384 385 handler = BinaryHandler(mocked_instance, 'build') 386 handler.terminate = mock.Mock() 387 handler.options = mock.Mock(timeout_multiplier=1) 388 389 proc = MockProc(1, proc_stdout) 390 391 with mock.patch( 392 'builtins.open', 393 mock.mock_open(read_data='') 394 ) as mock_file, \ 395 mock.patch('time.time', side_effect=faux_timer.time): 396 handler._output_handler(proc, harness) 397 398 mock_file.assert_called_with(handler.log, 'wt') 399 400 if expected_handler_calls: 401 mock_file.return_value.write.assert_has_calls(expected_handler_calls) 402 if expected_harness_calls: 403 harness.handle.assert_has_calls(expected_harness_calls) 404 if should_be_less is not None: 405 if should_be_less: 406 assert mock_file.return_value.write.call_count < len(proc_stdout) 407 else: 408 assert mock_file.return_value.write.call_count == len(proc_stdout) 409 if timeout_wait: 410 handler.terminate.assert_called_once_with(proc) 411 412 413TESTDATA_4 = [ 414 (True, False, True, None, None, 415 ['valgrind', '--error-exitcode=2', '--leak-check=full', 416 f'--suppressions={ZEPHYR_BASE}/scripts/valgrind.supp', 417 '--log-file=build_dir/valgrind.log', '--track-origins=yes', 418 'generator']), 419 (False, True, False, 123, None, ['generator', 'run', '--seed=123']), 420 (False, False, False, None, ['ex1', 'ex2'], ['build_dir/zephyr/zephyr.exe', 'ex1', 'ex2']), 421] 422 423@pytest.mark.parametrize( 424 'robot_test, call_make_run, enable_valgrind, seed,' \ 425 ' extra_args, expected', 426 TESTDATA_4, 427 ids=['robot, valgrind', 'make run, seed', 'binary, extra'] 428) 429def test_binaryhandler_create_command( 430 mocked_instance, 431 robot_test, 432 call_make_run, 433 enable_valgrind, 434 seed, 435 extra_args, 436 expected 437): 438 handler = BinaryHandler(mocked_instance, 'build') 439 handler.generator_cmd = 'generator' 440 handler.binary = 'bin' 441 handler.call_make_run = call_make_run 442 handler.options = SimpleNamespace() 443 handler.options.enable_valgrind = enable_valgrind 444 handler.options.coverage_basedir = "coverage_basedir" 445 handler.seed = seed 446 handler.extra_test_args = extra_args 447 handler.build_dir = 'build_dir' 448 handler.instance.sysbuild = False 449 handler.platform = SimpleNamespace() 450 handler.platform.resc = "file.resc" 451 handler.platform.uart = "uart" 452 453 command = handler._create_command(robot_test) 454 455 assert command == expected 456 457 458TESTDATA_5 = [ 459 (False, False, False), 460 (True, False, False), 461 (True, True, False), 462 (False, False, True), 463] 464 465@pytest.mark.parametrize( 466 'enable_asan, enable_lsan, enable_ubsan', 467 TESTDATA_5, 468 ids=['none', 'asan', 'asan, lsan', 'ubsan'] 469) 470def test_binaryhandler_create_env( 471 mocked_instance, 472 enable_asan, 473 enable_lsan, 474 enable_ubsan 475): 476 handler = BinaryHandler(mocked_instance, 'build') 477 handler.options = mock.Mock( 478 enable_asan=enable_asan, 479 enable_lsan=enable_lsan, 480 enable_ubsan=enable_ubsan 481 ) 482 483 env = { 484 'example_env_var': True, 485 'ASAN_OPTIONS': 'dummy=dummy:', 486 'UBSAN_OPTIONS': 'dummy=dummy:' 487 } 488 489 with mock.patch('os.environ', env): 490 res = handler._create_env() 491 492 assert env['example_env_var'] == res['example_env_var'] 493 494 if enable_ubsan: 495 assert env['UBSAN_OPTIONS'] in res['UBSAN_OPTIONS'] 496 assert 'log_path=stdout:' in res['UBSAN_OPTIONS'] 497 assert 'halt_on_error=1:' in res['UBSAN_OPTIONS'] 498 499 if enable_asan: 500 assert env['ASAN_OPTIONS'] in res['ASAN_OPTIONS'] 501 assert 'log_path=stdout:' in res['ASAN_OPTIONS'] 502 503 if not enable_lsan: 504 assert 'detect_leaks=0' in res['ASAN_OPTIONS'] 505 506 507TESTDATA_6 = [ 508 (None, False, 2, True, 'failed', 'Valgrind error', False), 509 (None, False, 1, False, 'failed', 'Failed', False), 510 ('failed', False, 0, False, 'failed', 'Failed', False), 511 ('success', False, 0, False, 'success', 'Unknown', False), 512 (None, True, 1, True, 'failed', 'Timeout', True), 513] 514 515@pytest.mark.parametrize( 516 'harness_state, terminated, returncode, enable_valgrind,' \ 517 ' expected_status, expected_reason, do_add_missing', 518 TESTDATA_6, 519 ids=['valgrind error', 'failed', 'harness failed', 'success', 'no state'] 520) 521def test_binaryhandler_update_instance_info( 522 mocked_instance, 523 harness_state, 524 terminated, 525 returncode, 526 enable_valgrind, 527 expected_status, 528 expected_reason, 529 do_add_missing 530): 531 handler = BinaryHandler(mocked_instance, 'build') 532 handler_time = 59 533 handler.terminated = terminated 534 handler.returncode = returncode 535 handler.options = mock.Mock(enable_valgrind=enable_valgrind) 536 missing_mock = mock.Mock() 537 handler.instance.add_missing_case_status = missing_mock 538 539 handler._update_instance_info(harness_state, handler_time) 540 541 assert handler.instance.execution_time == handler_time 542 543 assert handler.instance.status == expected_status 544 assert handler.instance.reason == expected_reason 545 546 if do_add_missing: 547 missing_mock.assert_called_once_with('blocked', expected_reason) 548 549 550TESTDATA_7 = [ 551 (True, False, False), 552 (False, True, False), 553 (False, False, True), 554] 555 556@pytest.mark.parametrize( 557 'is_robot_test, coverage, isatty', 558 TESTDATA_7, 559 ids=['robot test', 'coverage', 'isatty'] 560) 561def test_binaryhandler_handle( 562 mocked_instance, 563 caplog, 564 is_robot_test, 565 coverage, 566 isatty 567): 568 thread_mock_obj = mock.Mock() 569 570 def mock_popen(command, *args, **kwargs,): 571 return mock.Mock( 572 __enter__=mock.Mock(return_value=mock.Mock(pid=0, returncode=0)), 573 __exit__=mock.Mock(return_value=None) 574 ) 575 576 def mock_thread(target, *args, **kwargs): 577 return thread_mock_obj 578 579 handler = BinaryHandler(mocked_instance, 'build') 580 handler.sourcedir = 'source_dir' 581 handler.build_dir = 'build_dir' 582 handler.name= 'Dummy Name' 583 handler._create_command = mock.Mock(return_value=['dummy' , 'command']) 584 handler._create_env = mock.Mock(return_value=[]) 585 handler._update_instance_info = mock.Mock() 586 handler._final_handle_actions = mock.Mock() 587 handler.terminate = mock.Mock() 588 handler.try_kill_process_by_pid = mock.Mock() 589 handler.options = mock.Mock(coverage=coverage) 590 591 robot_mock = mock.Mock() 592 harness = mock.Mock(is_robot_test=is_robot_test, run_robot_test=robot_mock) 593 594 popen_mock = mock.Mock(side_effect=mock_popen) 595 thread_mock = mock.Mock(side_effect=mock_thread) 596 call_mock = mock.Mock() 597 598 with mock.patch('subprocess.call', call_mock), \ 599 mock.patch('subprocess.Popen', popen_mock), \ 600 mock.patch('threading.Thread', thread_mock), \ 601 mock.patch('sys.stdout.isatty', return_value=isatty): 602 handler.handle(harness) 603 604 if is_robot_test: 605 robot_mock.assert_called_once_with(['dummy', 'command'], mock.ANY) 606 return 607 608 assert 'Spawning BinaryHandler Thread for Dummy Name' in caplog.text 609 610 thread_mock_obj.join.assert_called() 611 handler._update_instance_info.assert_called_once() 612 handler._final_handle_actions.assert_called_once() 613 614 if isatty: 615 call_mock.assert_any_call(['stty', 'sane'], stdin=mock.ANY) 616 617 618TESTDATA_8 = [ 619 ('renode', True, True, False, False), 620 ('native', False, False, False, True), 621 ('build', False, True, False, False), 622] 623 624@pytest.mark.parametrize( 625 'type_str, is_pid_fn, expected_call_make_run, is_binary, expected_ready', 626 TESTDATA_8, 627 ids=[t[0] for t in TESTDATA_8] 628) 629def test_simulationhandler_init( 630 mocked_instance, 631 type_str, 632 is_pid_fn, 633 expected_call_make_run, 634 is_binary, 635 expected_ready 636): 637 handler = SimulationHandler(mocked_instance, type_str) 638 639 assert handler.call_make_run == expected_call_make_run 640 assert handler.ready == expected_ready 641 642 if is_pid_fn: 643 assert handler.pid_fn == os.path.join(mocked_instance.build_dir, 644 'renode.pid') 645 if is_binary: 646 assert handler.pid_fn == os.path.join(mocked_instance.build_dir, 647 'zephyr', 'zephyr.exe') 648 649 650TESTDATA_9 = [ 651 (3, 2, 0, 0, 3, -1, True, False, False, 1), 652 (4, 1, 0, 0, -1, -1, False, True, False, 0), 653 (5, 0, 1, 2, -1, 4, False, False, True, 3) 654] 655 656@pytest.mark.parametrize( 657 'success_count, in_waiting_count, oserror_count, readline_error_count,' 658 ' haltless_count, stateless_count, end_by_halt, end_by_close,' 659 ' end_by_state, expected_line_count', 660 TESTDATA_9, 661 ids=[ 662 'halt event', 663 'serial closes', 664 'harness state with errors' 665 ] 666) 667def test_devicehandler_monitor_serial( 668 mocked_instance, 669 success_count, 670 in_waiting_count, 671 oserror_count, 672 readline_error_count, 673 haltless_count, 674 stateless_count, 675 end_by_halt, 676 end_by_close, 677 end_by_state, 678 expected_line_count 679): 680 is_open_iter = iter(lambda: True, False) 681 line_iter = [ 682 TypeError('dummy TypeError') if x % 2 else \ 683 SerialException('dummy SerialException') for x in range( 684 readline_error_count 685 ) 686 ] + [ 687 f'line no {idx}'.encode('utf-8') for idx in range(success_count) 688 ] 689 in_waiting_iter = [False] * in_waiting_count + [ 690 TypeError('dummy TypeError') 691 ] if end_by_close else ( 692 [OSError('dummy OSError')] * oserror_count + [False] * in_waiting_count 693 ) + [True] * (success_count + readline_error_count) 694 695 is_set_iter = [False] * haltless_count + [True] \ 696 if end_by_halt else iter(lambda: False, True) 697 698 state_iter = [False] * stateless_count + [True] \ 699 if end_by_state else iter(lambda: False, True) 700 701 halt_event = mock.Mock(is_set=mock.Mock(side_effect=is_set_iter)) 702 ser = mock.Mock( 703 isOpen=mock.Mock(side_effect=is_open_iter), 704 readline=mock.Mock(side_effect=line_iter) 705 ) 706 type(ser).in_waiting = mock.PropertyMock( 707 side_effect=in_waiting_iter, 708 return_value=False 709 ) 710 harness = mock.Mock(capture_coverage=False) 711 type(harness).state=mock.PropertyMock(side_effect=state_iter) 712 713 handler = DeviceHandler(mocked_instance, 'build') 714 handler.options = mock.Mock(enable_coverage=not end_by_state) 715 716 with mock.patch('builtins.open', mock.mock_open(read_data='')): 717 handler.monitor_serial(ser, halt_event, harness) 718 719 if not end_by_close: 720 ser.close.assert_called_once() 721 722 harness.handle.assert_has_calls( 723 [mock.call(f'line no {idx}') for idx in range(expected_line_count)] 724 ) 725 726 727TESTDATA_10 = [ 728 ( 729 'dummy_platform', 730 'dummy fixture', 731 [ 732 mock.Mock( 733 fixtures=[], 734 platform='dummy_platform', 735 available=1, 736 counter=0 737 ), 738 mock.Mock( 739 fixtures=['dummy fixture'], 740 platform='another_platform', 741 available=1, 742 counter=0 743 ), 744 mock.Mock( 745 fixtures=['dummy fixture'], 746 platform='dummy_platform', 747 serial_pty=None, 748 serial=None, 749 available=1, 750 counter=0 751 ), 752 mock.Mock( 753 fixtures=['dummy fixture'], 754 platform='dummy_platform', 755 serial_pty=mock.Mock(), 756 available=1, 757 counter=0 758 ) 759 ], 760 3 761 ), 762 ( 763 'dummy_platform', 764 'dummy fixture', 765 [], 766 TwisterException 767 ), 768 ( 769 'dummy_platform', 770 'dummy fixture', 771 [ 772 mock.Mock( 773 fixtures=['dummy fixture'], 774 platform='dummy_platform', 775 serial_pty=mock.Mock(), 776 available=0 777 ), 778 mock.Mock( 779 fixtures=['another fixture'], 780 platform='dummy_platform', 781 serial_pty=mock.Mock(), 782 available=0 783 ), 784 mock.Mock( 785 fixtures=['dummy fixture'], 786 platform='dummy_platform', 787 serial=mock.Mock(), 788 available=0 789 ), 790 mock.Mock( 791 fixtures=['another fixture'], 792 platform='dummy_platform', 793 serial=mock.Mock(), 794 available=0 795 ) 796 ], 797 None 798 ) 799] 800 801@pytest.mark.parametrize( 802 'platform_name, fixture, duts, expected', 803 TESTDATA_10, 804 ids=['one good dut', 'exception - no duts', 'no available duts'] 805) 806def test_devicehandler_device_is_available( 807 mocked_instance, 808 platform_name, 809 fixture, 810 duts, 811 expected 812): 813 mocked_instance.platform.name = platform_name 814 mocked_instance.testsuite.harness_config = {'fixture': fixture} 815 816 handler = DeviceHandler(mocked_instance, 'build') 817 handler.duts = duts 818 819 if isinstance(expected, int): 820 device = handler.device_is_available(mocked_instance) 821 822 assert device == duts[expected] 823 assert device.available == 0 824 assert device.counter == 1 825 elif expected is None: 826 device = handler.device_is_available(mocked_instance) 827 828 assert device is None 829 elif isinstance(expected, type): 830 with pytest.raises(expected): 831 device = handler.device_is_available(mocked_instance) 832 else: 833 assert False 834 835 836def test_devicehandler_make_device_available(mocked_instance): 837 serial = mock.Mock(name='dummy_serial') 838 duts = [ 839 mock.Mock(available=0, serial=serial, serial_pty=None), 840 mock.Mock(available=0, serial=None, serial_pty=serial), 841 mock.Mock( 842 available=0, 843 serial=mock.Mock('another_serial'), 844 serial_pty=None 845 ) 846 ] 847 848 handler = DeviceHandler(mocked_instance, 'build') 849 handler.duts = duts 850 851 handler.make_device_available(serial) 852 853 assert len([None for d in handler.duts if d.available == 1]) == 2 854 assert handler.duts[2].available == 0 855 856 857TESTDATA_11 = [ 858 (mock.Mock(pid=0, returncode=0), False), 859 (mock.Mock(pid=0, returncode=1), False), 860 (mock.Mock(pid=0, returncode=1), True) 861] 862 863@pytest.mark.parametrize( 864 'mock_process, raise_timeout', 865 TESTDATA_11, 866 ids=['proper script', 'error', 'timeout'] 867) 868def test_devicehandler_run_custom_script(caplog, mock_process, raise_timeout): 869 def raise_timeout_fn(timeout=-1): 870 if raise_timeout and timeout != -1: 871 raise subprocess.TimeoutExpired(None, timeout) 872 else: 873 return mock.Mock(), mock.Mock() 874 875 def assert_popen(command, *args, **kwargs): 876 return mock.Mock( 877 __enter__=mock.Mock(return_value=mock_process), 878 __exit__=mock.Mock(return_value=None) 879 ) 880 881 mock_process.communicate = mock.Mock(side_effect=raise_timeout_fn) 882 883 script = [os.path.join('test','script', 'path'), 'arg'] 884 timeout = 60 885 886 with mock.patch('subprocess.Popen', side_effect=assert_popen): 887 DeviceHandler.run_custom_script(script, timeout) 888 889 if raise_timeout: 890 assert all( 891 t in caplog.text.lower() for t in [str(script), 'timed out'] 892 ) 893 mock_process.assert_has_calls( 894 [ 895 mock.call.communicate(timeout=timeout), 896 mock.call.kill(), 897 mock.call.communicate() 898 ] 899 ) 900 elif mock_process.returncode == 0: 901 assert not any([r.levelname == 'ERROR' for r in caplog.records]) 902 else: 903 assert 'timed out' not in caplog.text.lower() 904 assert 'custom script failure' in caplog.text.lower() 905 906 907TESTDATA_12 = [ 908 (0, False), 909 (4, False), 910 (0, True) 911] 912 913@pytest.mark.parametrize( 914 'num_of_failures, raise_exception', 915 TESTDATA_12, 916 ids=['no failures', 'with failures', 'exception'] 917) 918def test_devicehandler_get_hardware( 919 mocked_instance, 920 caplog, 921 num_of_failures, 922 raise_exception 923): 924 expected_hardware = mock.Mock() 925 926 def mock_availability(handler, instance, no=num_of_failures): 927 if raise_exception: 928 raise TwisterException(f'dummy message') 929 if handler.no: 930 handler.no -= 1 931 return None 932 return expected_hardware 933 934 handler = DeviceHandler(mocked_instance, 'build') 935 handler.no = num_of_failures 936 937 with mock.patch.object( 938 DeviceHandler, 939 'device_is_available', 940 mock_availability 941 ): 942 hardware = handler.get_hardware() 943 944 if raise_exception: 945 assert 'dummy message' in caplog.text.lower() 946 assert mocked_instance.status == 'failed' 947 assert mocked_instance.reason == 'dummy message' 948 else: 949 assert hardware == expected_hardware 950 951 952TESTDATA_13 = [ 953 ( 954 None, 955 None, 956 None, 957 ['generator_cmd', '-C', '$build_dir', 'flash'] 958 ), 959 ( 960 [], 961 None, 962 None, 963 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir'] 964 ), 965 ( 966 '--dummy', 967 None, 968 None, 969 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 970 '--', '--dummy'] 971 ), 972 ( 973 '--dummy1,--dummy2', 974 None, 975 None, 976 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 977 '--', '--dummy1', '--dummy2'] 978 ), 979 980 ( 981 None, 982 'runner', 983 'product', 984 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 985 '--runner', 'runner', 'param1', 'param2'] 986 ), 987 988 ( 989 None, 990 'pyocd', 991 'product', 992 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 993 '--runner', 'pyocd', 'param1', 'param2', '--', '--dev-id', 12345] 994 ), 995 ( 996 None, 997 'nrfjprog', 998 'product', 999 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1000 '--runner', 'nrfjprog', 'param1', 'param2', '--', '--dev-id', 12345] 1001 ), 1002 ( 1003 None, 1004 'openocd', 1005 'STM32 STLink', 1006 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1007 '--runner', 'openocd', 'param1', 'param2', 1008 '--', '--cmd-pre-init', 'hla_serial 12345'] 1009 ), 1010 ( 1011 None, 1012 'openocd', 1013 'STLINK-V3', 1014 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1015 '--runner', 'openocd', 'param1', 'param2', 1016 '--', '--cmd-pre-init', 'hla_serial 12345'] 1017 ), 1018 ( 1019 None, 1020 'openocd', 1021 'EDBG CMSIS-DAP', 1022 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1023 '--runner', 'openocd', 'param1', 'param2', 1024 '--', '--cmd-pre-init', 'cmsis_dap_serial 12345'] 1025 ), 1026 ( 1027 None, 1028 'jlink', 1029 'product', 1030 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1031 '--runner', 'jlink', '--tool-opt=-SelectEmuBySN 12345', # 2x space 1032 'param1', 'param2'] 1033 ), 1034 ( 1035 None, 1036 'stm32cubeprogrammer', 1037 'product', 1038 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1039 '--runner', 'stm32cubeprogrammer', '--tool-opt=sn=12345', 1040 'param1', 'param2'] 1041 ), 1042 1043] 1044 1045TESTDATA_13_2 = [(True), (False)] 1046 1047@pytest.mark.parametrize( 1048 'self_west_flash, runner,' \ 1049 ' hardware_product_name, expected', 1050 TESTDATA_13, 1051 ids=['generator', '--west-flash', 'one west flash value', 1052 'multiple west flash values', 'generic runner', 'pyocd', 1053 'nrfjprog', 'openocd, STM32 STLink', 'openocd, STLINK-v3', 1054 'openocd, EDBG CMSIS-DAP', 'jlink', 'stm32cubeprogrammer'] 1055) 1056@pytest.mark.parametrize('hardware_probe', TESTDATA_13_2, ids=['probe', 'id']) 1057def test_devicehandler_create_command( 1058 mocked_instance, 1059 self_west_flash, 1060 runner, 1061 hardware_probe, 1062 hardware_product_name, 1063 expected 1064): 1065 handler = DeviceHandler(mocked_instance, 'build') 1066 handler.options = mock.Mock(west_flash=self_west_flash) 1067 handler.generator_cmd = 'generator_cmd' 1068 1069 expected = [handler.build_dir if val == '$build_dir' else \ 1070 val for val in expected] 1071 1072 hardware = mock.Mock( 1073 product=hardware_product_name, 1074 probe_id=12345 if hardware_probe else None, 1075 id=12345 if not hardware_probe else None, 1076 runner_params=['param1', 'param2'] 1077 ) 1078 1079 command = handler._create_command(runner, hardware) 1080 1081 assert command == expected 1082 1083 1084TESTDATA_14 = [ 1085 ('success', False, 'success', 'Unknown', False), 1086 ('failed', False, 'failed', 'Failed', True), 1087 ('error', False, 'error', 'Unknown', True), 1088 (None, True, None, 'Unknown', False), 1089 (None, False, 'failed', 'Timeout', True), 1090] 1091 1092@pytest.mark.parametrize( 1093 'harness_state, flash_error,' \ 1094 ' expected_status, expected_reason, do_add_missing', 1095 TESTDATA_14, 1096 ids=['success', 'failed', 'error', 'flash error', 'no status'] 1097) 1098def test_devicehandler_update_instance_info( 1099 mocked_instance, 1100 harness_state, 1101 flash_error, 1102 expected_status, 1103 expected_reason, 1104 do_add_missing 1105 ): 1106 handler = DeviceHandler(mocked_instance, 'build') 1107 handler_time = 59 1108 missing_mock = mock.Mock() 1109 handler.instance.add_missing_case_status = missing_mock 1110 1111 handler._update_instance_info(harness_state, handler_time, flash_error) 1112 1113 assert handler.instance.execution_time == handler_time 1114 1115 assert handler.instance.status == expected_status 1116 assert handler.instance.reason == expected_reason 1117 1118 if do_add_missing: 1119 missing_mock.assert_called_with('blocked', expected_reason) 1120 1121 1122TESTDATA_15 = [ 1123 ('dummy device', 'dummy pty', None, None, True, False, False), 1124 ( 1125 'dummy device', 1126 'dummy pty', 1127 mock.Mock(communicate=mock.Mock(return_value=('', ''))), 1128 SerialException, 1129 False, 1130 True, 1131 'dummy pty' 1132 ), 1133 ( 1134 'dummy device', 1135 None, 1136 None, 1137 SerialException, 1138 False, 1139 False, 1140 'dummy device' 1141 ) 1142] 1143 1144@pytest.mark.parametrize( 1145 'serial_device, serial_pty, ser_pty_process, expected_exception,' \ 1146 ' expected_result, terminate_ser_pty_process, make_available', 1147 TESTDATA_15, 1148 ids=['valid', 'serial pty process', 'no serial pty'] 1149) 1150def test_devicehandler_create_serial_connection( 1151 mocked_instance, 1152 serial_device, 1153 serial_pty, 1154 ser_pty_process, 1155 expected_exception, 1156 expected_result, 1157 terminate_ser_pty_process, 1158 make_available 1159): 1160 def mock_serial(*args, **kwargs): 1161 if expected_exception: 1162 raise expected_exception('') 1163 return expected_result 1164 1165 handler = DeviceHandler(mocked_instance, 'build') 1166 handler.make_device_available = mock.Mock() 1167 missing_mock = mock.Mock() 1168 handler.instance.add_missing_case_status = missing_mock 1169 available_mock = mock.Mock() 1170 handler.make_device_available = available_mock 1171 handler.options = mock.Mock(timeout_multiplier=1) 1172 twisterlib.handlers.terminate_process = mock.Mock() 1173 1174 hardware_baud = 14400 1175 flash_timeout = 60 1176 serial_mock = mock.Mock(side_effect=mock_serial) 1177 1178 with mock.patch('serial.Serial', serial_mock), \ 1179 pytest.raises(expected_exception) if expected_exception else \ 1180 nullcontext(): 1181 result = handler._create_serial_connection(serial_device, hardware_baud, 1182 flash_timeout, serial_pty, 1183 ser_pty_process) 1184 1185 if expected_result: 1186 assert result is not None 1187 1188 if expected_exception: 1189 assert handler.instance.status == 'failed' 1190 assert handler.instance.reason == 'Serial Device Error' 1191 1192 missing_mock.assert_called_once_with('blocked', 'Serial Device Error') 1193 1194 if terminate_ser_pty_process: 1195 twisterlib.handlers.terminate_process.assert_called_once() 1196 ser_pty_process.communicate.assert_called_once() 1197 1198 if make_available: 1199 available_mock.assert_called_once_with(make_available) 1200 1201 1202TESTDATA_16 = [ 1203 ('dummy1 dummy2', None, 'slave name'), 1204 ('dummy1,dummy2', CalledProcessError, None), 1205 (None, None, 'dummy hardware serial'), 1206] 1207 1208@pytest.mark.parametrize( 1209 'serial_pty, popen_exception, expected_device', 1210 TESTDATA_16, 1211 ids=['pty', 'pty process error', 'no pty'] 1212) 1213def test_devicehandler_get_serial_device( 1214 mocked_instance, 1215 serial_pty, 1216 popen_exception, 1217 expected_device 1218): 1219 def mock_popen(command, *args, **kwargs): 1220 assert command == ['dummy1', 'dummy2'] 1221 if popen_exception: 1222 raise popen_exception(command, 'Dummy error') 1223 return mock.Mock() 1224 1225 handler = DeviceHandler(mocked_instance, 'build') 1226 hardware_serial = 'dummy hardware serial' 1227 1228 popen_mock = mock.Mock(side_effect=mock_popen) 1229 openpty_mock = mock.Mock(return_value=('master', 'slave')) 1230 ttyname_mock = mock.Mock(side_effect=lambda x: x + ' name') 1231 1232 with mock.patch('subprocess.Popen', popen_mock), \ 1233 mock.patch('pty.openpty', openpty_mock), \ 1234 mock.patch('os.ttyname', ttyname_mock): 1235 result = handler._get_serial_device(serial_pty, hardware_serial) 1236 1237 if popen_exception: 1238 assert result is None 1239 else: 1240 assert result[0] == expected_device 1241 1242TESTDATA_17 = [ 1243 (False, False, False, False, None, False, False, 1244 None, None, []), 1245 (True, True, False, False, None, False, False, 1246 None, None, []), 1247 (True, False, True, False, None, False, False, 1248 'error', 'Device issue (Flash error)', []), 1249 (True, False, False, True, None, False, False, 1250 'error', 'Device issue (Timeout)', ['Flash operation timed out.']), 1251 (True, False, False, False, 1, False, False, 1252 'error', 'Device issue (Flash error?)', []), 1253 (True, False, False, False, 0, True, False, 1254 None, None, ['Timed out while monitoring serial output on IPName']), 1255 (True, False, False, False, 0, False, True, 1256 None, None, ["Terminating serial-pty:'Serial PTY'", 1257 "Terminated serial-pty:'Serial PTY', stdout:'', stderr:''"]), 1258] 1259 1260@pytest.mark.parametrize( 1261 'has_hardware, raise_create_serial, raise_popen, raise_timeout,' \ 1262 ' returncode, do_timeout_thread, use_pty,' \ 1263 ' expected_status, expected_reason, expected_logs', 1264 TESTDATA_17, 1265 ids=['no hardware', 'create serial failure', 'popen called process error', 1266 'communicate timeout', 'nonzero returncode', 'valid pty', 'valid dev'] 1267) 1268def test_devicehandler_handle( 1269 mocked_instance, 1270 caplog, 1271 has_hardware, 1272 raise_create_serial, 1273 raise_popen, 1274 raise_timeout, 1275 returncode, 1276 do_timeout_thread, 1277 use_pty, 1278 expected_status, 1279 expected_reason, 1280 expected_logs 1281): 1282 def mock_get_serial(serial_pty, hardware_serial): 1283 if serial_pty: 1284 serial_pty_process = mock.Mock( 1285 name='dummy serial PTY process', 1286 communicate=mock.Mock( 1287 return_value=('', '') 1288 ) 1289 ) 1290 return 'dummy serial PTY device', serial_pty_process 1291 return 'dummy serial device', None 1292 1293 def mock_create_serial(*args, **kwargs): 1294 if raise_create_serial: 1295 raise SerialException('dummy cmd', 'dummy msg') 1296 return mock.Mock(name='dummy serial') 1297 1298 def mock_thread(*args, **kwargs): 1299 is_alive_mock = mock.Mock(return_value=bool(do_timeout_thread)) 1300 1301 return mock.Mock(is_alive=is_alive_mock) 1302 1303 def mock_terminate(proc, *args, **kwargs): 1304 proc.communicate = mock.Mock(return_value=(mock.Mock(), mock.Mock())) 1305 1306 def mock_communicate(*args, **kwargs): 1307 if raise_timeout: 1308 raise TimeoutExpired('dummy cmd', 'dummyamount') 1309 return mock.Mock(), mock.Mock() 1310 1311 def mock_popen(command, *args, **kwargs): 1312 if raise_popen: 1313 raise CalledProcessError('dummy proc', 'dummy msg') 1314 1315 mock_process = mock.Mock( 1316 pid=1, 1317 returncode=returncode, 1318 communicate=mock.Mock(side_effect=mock_communicate) 1319 ) 1320 1321 return mock.Mock( 1322 __enter__=mock.Mock(return_value=mock_process), 1323 __exit__=mock.Mock(return_value=None) 1324 ) 1325 1326 hardware = None if not has_hardware else mock.Mock( 1327 baud=14400, 1328 runner='dummy runner', 1329 serial_pty='Serial PTY' if use_pty else None, 1330 serial='dummy serial', 1331 pre_script='dummy pre script', 1332 post_script='dummy post script', 1333 post_flash_script='dummy post flash script', 1334 flash_timeout=60, 1335 flash_with_test=True 1336 ) 1337 1338 handler = DeviceHandler(mocked_instance, 'build') 1339 handler.get_hardware = mock.Mock(return_value=hardware) 1340 handler.options = mock.Mock( 1341 timeout_multiplier=1, 1342 west_flash=None, 1343 west_runner=None 1344 ) 1345 handler._get_serial_device = mock.Mock(side_effect=mock_get_serial) 1346 handler._create_command = mock.Mock(return_value=['dummy', 'command']) 1347 handler.run_custom_script = mock.Mock() 1348 handler._create_serial_connection = mock.Mock( 1349 side_effect=mock_create_serial 1350 ) 1351 handler.monitor_serial = mock.Mock() 1352 handler.terminate = mock.Mock(side_effect=mock_terminate) 1353 handler._update_instance_info = mock.Mock() 1354 handler._final_handle_actions = mock.Mock() 1355 handler.make_device_available = mock.Mock() 1356 twisterlib.handlers.terminate_process = mock.Mock() 1357 handler.instance.platform.name = 'IPName' 1358 1359 harness = mock.Mock() 1360 1361 with mock.patch('builtins.open', mock.mock_open(read_data='')), \ 1362 mock.patch('subprocess.Popen', side_effect=mock_popen), \ 1363 mock.patch('threading.Event', mock.Mock()), \ 1364 mock.patch('threading.Thread', side_effect=mock_thread): 1365 handler.handle(harness) 1366 1367 handler.get_hardware.assert_called_once() 1368 1369 messages = [record.msg for record in caplog.records] 1370 assert all([msg in messages for msg in expected_logs]) 1371 1372 if not has_hardware: 1373 return 1374 1375 handler.run_custom_script.assert_has_calls([ 1376 mock.call('dummy pre script', mock.ANY) 1377 ]) 1378 1379 if raise_create_serial: 1380 return 1381 1382 handler.run_custom_script.assert_has_calls([ 1383 mock.call('dummy pre script', mock.ANY), 1384 mock.call('dummy post flash script', mock.ANY), 1385 mock.call('dummy post script', mock.ANY) 1386 ]) 1387 1388 if expected_reason: 1389 assert handler.instance.reason == expected_reason 1390 if expected_status: 1391 assert handler.instance.status == expected_status 1392 1393 handler.make_device_available.assert_called_once_with( 1394 'Serial PTY' if use_pty else 'dummy serial device' 1395 ) 1396 1397 1398TESTDATA_18 = [ 1399 (True, True, True), 1400 (False, False, False), 1401] 1402 1403@pytest.mark.parametrize( 1404 'ignore_qemu_crash, expected_ignore_crash, expected_ignore_unexpected_eof', 1405 TESTDATA_18, 1406 ids=['ignore crash', 'qemu crash'] 1407) 1408def test_qemuhandler_init( 1409 mocked_instance, 1410 ignore_qemu_crash, 1411 expected_ignore_crash, 1412 expected_ignore_unexpected_eof 1413): 1414 mocked_instance.testsuite.ignore_qemu_crash = ignore_qemu_crash 1415 1416 handler = QEMUHandler(mocked_instance, 'build') 1417 1418 assert handler.ignore_qemu_crash == expected_ignore_crash 1419 assert handler.ignore_unexpected_eof == expected_ignore_unexpected_eof 1420 1421 1422def test_qemuhandler_get_cpu_time(): 1423 def mock_process(pid): 1424 return mock.Mock( 1425 cpu_times=mock.Mock( 1426 return_value=mock.Mock( 1427 user=20.0, 1428 system=64.0 1429 ) 1430 ) 1431 ) 1432 1433 with mock.patch('psutil.Process', mock_process): 1434 res = QEMUHandler._get_cpu_time(0) 1435 1436 assert res == pytest.approx(84.0) 1437 1438 1439TESTDATA_19 = [ 1440 ( 1441 True, 1442 os.path.join('self', 'dummy_dir', '1'), 1443 mock.PropertyMock(return_value=os.path.join('dummy_dir', '1')), 1444 os.path.join('dummy_dir', '1') 1445 ), 1446 ( 1447 False, 1448 os.path.join('self', 'dummy_dir', '2'), 1449 mock.PropertyMock(return_value=os.path.join('dummy_dir', '2')), 1450 os.path.join('self', 'dummy_dir', '2') 1451 ), 1452] 1453 1454@pytest.mark.parametrize( 1455 'self_sysbuild, self_build_dir, build_dir, expected', 1456 TESTDATA_19, 1457 ids=['domains build dir', 'self build dir'] 1458) 1459def test_qemuhandler_get_default_domain_build_dir( 1460 mocked_instance, 1461 self_sysbuild, 1462 self_build_dir, 1463 build_dir, 1464 expected 1465): 1466 get_default_domain_mock = mock.Mock() 1467 type(get_default_domain_mock()).build_dir = build_dir 1468 domains_mock = mock.Mock(get_default_domain=get_default_domain_mock) 1469 from_file_mock = mock.Mock(return_value=domains_mock) 1470 1471 handler = QEMUHandler(mocked_instance, 'build') 1472 handler.instance.sysbuild = self_sysbuild 1473 handler.build_dir = self_build_dir 1474 1475 with mock.patch('domains.Domains.from_file', from_file_mock): 1476 result = handler.get_default_domain_build_dir() 1477 1478 assert result == expected 1479 1480 1481TESTDATA_20 = [ 1482 ( 1483 os.path.join('self', 'dummy_dir', 'log1'), 1484 os.path.join('self', 'dummy_dir', 'pid1'), 1485 os.path.join('sysbuild', 'dummy_dir', 'bd1'), 1486 True 1487 ), 1488 ( 1489 os.path.join('self', 'dummy_dir', 'log2'), 1490 os.path.join('self', 'dummy_dir', 'pid2'), 1491 os.path.join('sysbuild', 'dummy_dir', 'bd2'), 1492 False 1493 ), 1494] 1495 1496@pytest.mark.parametrize( 1497 'self_log, self_pid_fn, sysbuild_build_dir, exists_pid_fn', 1498 TESTDATA_20, 1499 ids=['pid exists', 'pid missing'] 1500) 1501def test_qemuhandler_set_qemu_filenames( 1502 mocked_instance, 1503 self_log, 1504 self_pid_fn, 1505 sysbuild_build_dir, 1506 exists_pid_fn 1507): 1508 unlink_mock = mock.Mock() 1509 exists_mock = mock.Mock(return_value=exists_pid_fn) 1510 1511 handler = QEMUHandler(mocked_instance, 'build') 1512 handler.log = self_log 1513 handler.pid_fn = self_pid_fn 1514 1515 with mock.patch('os.unlink', unlink_mock), \ 1516 mock.patch('os.path.exists', exists_mock): 1517 handler._set_qemu_filenames(sysbuild_build_dir) 1518 1519 assert handler.fifo_fn == mocked_instance.build_dir + \ 1520 os.path.sep + 'qemu-fifo' 1521 1522 assert handler.pid_fn == sysbuild_build_dir + os.path.sep + 'qemu.pid' 1523 1524 assert handler.log_fn == self_log 1525 1526 if exists_pid_fn: 1527 unlink_mock.assert_called_once_with(sysbuild_build_dir + \ 1528 os.path.sep + 'qemu.pid') 1529 1530 1531def test_qemuhandler_create_command(mocked_instance): 1532 sysbuild_build_dir = os.path.join('sysbuild', 'dummy_dir') 1533 1534 handler = QEMUHandler(mocked_instance, 'build') 1535 handler.generator_cmd = 'dummy_cmd' 1536 1537 result = handler._create_command(sysbuild_build_dir) 1538 1539 assert result == ['dummy_cmd', '-C', 'sysbuild' + os.path.sep + 'dummy_dir', 1540 'run'] 1541 1542 1543TESTDATA_21 = [ 1544 ( 1545 0, 1546 False, 1547 None, 1548 'good dummy state', 1549 False, 1550 None, 1551 None, 1552 False 1553 ), 1554 ( 1555 1, 1556 True, 1557 None, 1558 'good dummy state', 1559 False, 1560 None, 1561 None, 1562 False 1563 ), 1564 ( 1565 0, 1566 False, 1567 None, 1568 None, 1569 True, 1570 'failed', 1571 'Timeout', 1572 True 1573 ), 1574 ( 1575 1, 1576 False, 1577 None, 1578 None, 1579 False, 1580 'failed', 1581 'Exited with 1', 1582 True 1583 ), 1584 ( 1585 1, 1586 False, 1587 'preexisting reason', 1588 'good dummy state', 1589 False, 1590 'failed', 1591 'preexisting reason', 1592 True 1593 ), 1594] 1595 1596@pytest.mark.parametrize( 1597 'self_returncode, self_ignore_qemu_crash,' \ 1598 ' self_instance_reason, harness_state, is_timeout,' \ 1599 ' expected_status, expected_reason, expected_called_missing_case', 1600 TESTDATA_21, 1601 ids=['not failed', 'qemu ignore', 'timeout', 'bad returncode', 'other fail'] 1602) 1603def test_qemuhandler_update_instance_info( 1604 mocked_instance, 1605 self_returncode, 1606 self_ignore_qemu_crash, 1607 self_instance_reason, 1608 harness_state, 1609 is_timeout, 1610 expected_status, 1611 expected_reason, 1612 expected_called_missing_case 1613): 1614 mocked_instance.add_missing_case_status = mock.Mock() 1615 mocked_instance.reason = self_instance_reason 1616 1617 handler = QEMUHandler(mocked_instance, 'build') 1618 handler.returncode = self_returncode 1619 handler.ignore_qemu_crash = self_ignore_qemu_crash 1620 1621 handler._update_instance_info(harness_state, is_timeout) 1622 1623 assert handler.instance.status == expected_status 1624 assert handler.instance.reason == expected_reason 1625 1626 if expected_called_missing_case: 1627 mocked_instance.add_missing_case_status.assert_called_once_with( 1628 'blocked' 1629 ) 1630 1631 1632def test_qemuhandler_thread_get_fifo_names(): 1633 fifo_fn = 'dummy' 1634 1635 fifo_in, fifo_out = QEMUHandler._thread_get_fifo_names(fifo_fn) 1636 1637 assert fifo_in == 'dummy.in' 1638 assert fifo_out == 'dummy.out' 1639 1640TESTDATA_22 = [ 1641 (False, False), 1642 (False, True), 1643 (True, False), 1644 (True, True), 1645] 1646 1647@pytest.mark.parametrize( 1648 'fifo_in_exists, fifo_out_exists', 1649 TESTDATA_22, 1650 ids=['both missing', 'out exists', 'in exists', 'both exist'] 1651) 1652def test_qemuhandler_thread_open_files(fifo_in_exists, fifo_out_exists): 1653 def mock_exists(path): 1654 if path == 'fifo.in': 1655 return fifo_in_exists 1656 elif path == 'fifo.out': 1657 return fifo_out_exists 1658 else: 1659 raise ValueError('Unexpected path in mock of os.path.exists') 1660 1661 unlink_mock = mock.Mock() 1662 exists_mock = mock.Mock(side_effect=mock_exists) 1663 mkfifo_mock = mock.Mock() 1664 1665 fifo_in = 'fifo.in' 1666 fifo_out = 'fifo.out' 1667 logfile = 'log.file' 1668 1669 with mock.patch('os.unlink', unlink_mock), \ 1670 mock.patch('os.mkfifo', mkfifo_mock), \ 1671 mock.patch('os.path.exists', exists_mock), \ 1672 mock.patch('builtins.open', mock.mock_open()) as open_mock: 1673 _, _, _ = QEMUHandler._thread_open_files(fifo_in, fifo_out, logfile) 1674 1675 open_mock.assert_has_calls([ 1676 mock.call('fifo.in', 'wb'), 1677 mock.call('fifo.out', 'rb', buffering=0), 1678 mock.call('log.file', 'wt'), 1679 ]) 1680 1681 if fifo_in_exists: 1682 unlink_mock.assert_any_call('fifo.in') 1683 1684 if fifo_out_exists: 1685 unlink_mock.assert_any_call('fifo.out') 1686 1687 1688TESTDATA_23 = [ 1689 (False, False), 1690 (True, True), 1691 (True, False) 1692] 1693 1694@pytest.mark.parametrize( 1695 'is_pid, is_lookup_error', 1696 TESTDATA_23, 1697 ids=['pid missing', 'pid lookup error', 'pid ok'] 1698) 1699def test_qemuhandler_thread_close_files(is_pid, is_lookup_error): 1700 is_process_killed = {} 1701 1702 def mock_kill(pid, sig): 1703 if is_lookup_error: 1704 raise ProcessLookupError(f'Couldn\'t find pid: {pid}.') 1705 elif sig == signal.SIGTERM: 1706 is_process_killed[pid] = True 1707 1708 unlink_mock = mock.Mock() 1709 kill_mock = mock.Mock(side_effect=mock_kill) 1710 1711 fifo_in = 'fifo.in' 1712 fifo_out = 'fifo.out' 1713 pid = 12345 if is_pid else None 1714 out_fp = mock.Mock() 1715 in_fp = mock.Mock() 1716 log_out_fp = mock.Mock() 1717 1718 with mock.patch('os.unlink', unlink_mock), \ 1719 mock.patch('os.kill', kill_mock): 1720 QEMUHandler._thread_close_files(fifo_in, fifo_out, pid, out_fp, 1721 in_fp, log_out_fp) 1722 1723 out_fp.close.assert_called_once() 1724 in_fp.close.assert_called_once() 1725 log_out_fp.close.assert_called_once() 1726 1727 unlink_mock.assert_has_calls([mock.call('fifo.in'), mock.call('fifo.out')]) 1728 1729 if is_pid and not is_lookup_error: 1730 assert is_process_killed[pid] 1731 1732 1733TESTDATA_24 = [ 1734 ('failed', 'timeout', 'failed', 'timeout'), 1735 ('failed', 'Execution error', 'failed', 'Execution error'), 1736 ('failed', 'unexpected eof', 'failed', 'unexpected eof'), 1737 ('failed', 'unexpected byte', 'failed', 'unexpected byte'), 1738 (None, None, None, 'Unknown'), 1739] 1740 1741@pytest.mark.parametrize( 1742 '_status, _reason, expected_status, expected_reason', 1743 TESTDATA_24, 1744 ids=['timeout', 'failed', 'unexpected eof', 'unexpected byte', 'unknown'] 1745) 1746def test_qemuhandler_thread_update_instance_info( 1747 mocked_instance, 1748 _status, 1749 _reason, 1750 expected_status, 1751 expected_reason 1752): 1753 handler = QEMUHandler(mocked_instance, 'build') 1754 handler_time = 59 1755 1756 QEMUHandler._thread_update_instance_info(handler, handler_time, _status, _reason) 1757 1758 assert handler.instance.execution_time == handler_time 1759 1760 assert handler.instance.status == expected_status 1761 assert handler.instance.reason == expected_reason 1762 1763 1764TESTDATA_25 = [ 1765 ( 1766 ('1\n' * 60).encode('utf-8'), 1767 60, 1768 1, 1769 [None] * 60 + ['success'] * 6, 1770 1000, 1771 False, 1772 'failed', 1773 'timeout', 1774 [mock.call('1\n'), mock.call('1\n')] 1775 ), 1776 ( 1777 ('1\n' * 60).encode('utf-8'), 1778 60, 1779 -1, 1780 [None] * 60 + ['success'] * 30, 1781 100, 1782 False, 1783 'failed', 1784 None, 1785 [mock.call('1\n'), mock.call('1\n')] 1786 ), 1787 ( 1788 b'', 1789 60, 1790 1, 1791 ['success'] * 3, 1792 100, 1793 False, 1794 'failed', 1795 'unexpected eof', 1796 [] 1797 ), 1798 ( 1799 b'\x81', 1800 60, 1801 1, 1802 ['success'] * 3, 1803 100, 1804 False, 1805 'failed', 1806 'unexpected byte', 1807 [] 1808 ), 1809 ( 1810 '1\n2\n3\n4\n5\n'.encode('utf-8'), 1811 600, 1812 1, 1813 [None] * 3 + ['success'] * 7, 1814 100, 1815 False, 1816 'success', 1817 None, 1818 [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')] 1819 ), 1820 ( 1821 '1\n2\n3\n4\n5\n'.encode('utf-8'), 1822 600, 1823 0, 1824 [None] * 3 + ['success'] * 7, 1825 100, 1826 False, 1827 'failed', 1828 'timeout', 1829 [mock.call('1\n'), mock.call('2\n')] 1830 ), 1831 ( 1832 '1\n2\n3\n4\n5\n'.encode('utf-8'), 1833 60, 1834 1, 1835 [None] * 3 + ['success'] * 7, 1836 (n for n in [100, 100, 10000]), 1837 True, 1838 'success', 1839 None, 1840 [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')] 1841 ), 1842] 1843 1844@pytest.mark.parametrize( 1845 'content, timeout, pid, harness_states, cputime, capture_coverage,' \ 1846 ' expected_status, expected_reason, expected_log_calls', 1847 TESTDATA_25, 1848 ids=[ 1849 'timeout', 1850 'harness failed', 1851 'unexpected eof', 1852 'unexpected byte', 1853 'harness success', 1854 'timeout by pid=0', 1855 'capture_coverage' 1856 ] 1857) 1858def test_qemuhandler_thread( 1859 mocked_instance, 1860 faux_timer, 1861 content, 1862 timeout, 1863 pid, 1864 harness_states, 1865 cputime, 1866 capture_coverage, 1867 expected_status, 1868 expected_reason, 1869 expected_log_calls 1870): 1871 def mock_cputime(pid): 1872 if pid > 0: 1873 return cputime if isinstance(cputime, int) else next(cputime) 1874 else: 1875 raise ProcessLookupError() 1876 1877 type(mocked_instance.testsuite).timeout = mock.PropertyMock(return_value=timeout) 1878 handler = QEMUHandler(mocked_instance, 'build') 1879 handler.ignore_unexpected_eof = False 1880 handler.pid_fn = 'pid_fn' 1881 handler.fifo_fn = 'fifo_fn' 1882 handler.options = mock.Mock(timeout_multiplier=1) 1883 1884 def mocked_open(filename, *args, **kwargs): 1885 if filename == handler.pid_fn: 1886 contents = str(pid).encode('utf-8') 1887 elif filename == handler.fifo_fn + '.out': 1888 contents = content 1889 else: 1890 contents = b'' 1891 1892 file_object = mock.mock_open(read_data=contents).return_value 1893 file_object.__iter__.return_value = contents.splitlines(True) 1894 return file_object 1895 1896 harness = mock.Mock(capture_coverage=capture_coverage, handle=print) 1897 type(harness).state = mock.PropertyMock(side_effect=harness_states) 1898 1899 p = mock.Mock() 1900 p.poll = mock.Mock( 1901 side_effect=itertools.cycle([True, True, True, True, False]) 1902 ) 1903 1904 mock_thread_get_fifo_names = mock.Mock( 1905 return_value=('fifo_fn.in', 'fifo_fn.out') 1906 ) 1907 log_fp_mock = mock.Mock() 1908 in_fp_mock = mocked_open('fifo_fn.out') 1909 out_fp_mock = mock.Mock() 1910 mock_thread_open_files = mock.Mock( 1911 return_value=(out_fp_mock, in_fp_mock, log_fp_mock) 1912 ) 1913 mock_thread_close_files = mock.Mock() 1914 mock_thread_update_instance_info = mock.Mock() 1915 1916 with mock.patch('time.time', side_effect=faux_timer.time), \ 1917 mock.patch('builtins.open', new=mocked_open), \ 1918 mock.patch('select.poll', return_value=p), \ 1919 mock.patch('os.path.exists', return_value=True), \ 1920 mock.patch('twisterlib.handlers.QEMUHandler._get_cpu_time', 1921 mock_cputime), \ 1922 mock.patch('twisterlib.handlers.QEMUHandler._thread_get_fifo_names', 1923 mock_thread_get_fifo_names), \ 1924 mock.patch('twisterlib.handlers.QEMUHandler._thread_open_files', 1925 mock_thread_open_files), \ 1926 mock.patch('twisterlib.handlers.QEMUHandler._thread_close_files', 1927 mock_thread_close_files), \ 1928 mock.patch('twisterlib.handlers.QEMUHandler.' \ 1929 '_thread_update_instance_info', 1930 mock_thread_update_instance_info): 1931 QEMUHandler._thread( 1932 handler, 1933 handler.get_test_timeout(), 1934 handler.build_dir, 1935 handler.log, 1936 handler.fifo_fn, 1937 handler.pid_fn, 1938 harness, 1939 handler.ignore_unexpected_eof 1940 ) 1941 1942 mock_thread_update_instance_info.assert_called_once_with( 1943 handler, 1944 mock.ANY, 1945 expected_status, 1946 mock.ANY 1947 ) 1948 1949 log_fp_mock.write.assert_has_calls(expected_log_calls) 1950 1951 1952TESTDATA_26 = [ 1953 (True, False, None, True, 1954 ['No timeout, return code from QEMU (1): 1', 1955 'return code from QEMU (1): 1']), 1956 (False, True, 'passed', True, ['return code from QEMU (1): 0']), 1957 (False, True, 'failed', False, ['return code from QEMU (None): 1']), 1958] 1959 1960@pytest.mark.parametrize( 1961 'isatty, do_timeout, harness_state, exists_pid_fn, expected_logs', 1962 TESTDATA_26, 1963 ids=['no timeout, isatty', 'timeout passed', 'timeout, no pid_fn'] 1964) 1965def test_qemuhandler_handle( 1966 mocked_instance, 1967 caplog, 1968 tmp_path, 1969 isatty, 1970 do_timeout, 1971 harness_state, 1972 exists_pid_fn, 1973 expected_logs 1974): 1975 def mock_wait(*args, **kwargs): 1976 if do_timeout: 1977 raise TimeoutExpired('dummy cmd', 'dummyamount') 1978 1979 mock_process = mock.Mock(pid=0, returncode=1) 1980 mock_process.communicate = mock.Mock( 1981 return_value=(mock.Mock(), mock.Mock()) 1982 ) 1983 mock_process.wait = mock.Mock(side_effect=mock_wait) 1984 1985 handler = QEMUHandler(mocked_instance, 'build') 1986 1987 def mock_path_exists(name, *args, **kwargs): 1988 return exists_pid_fn 1989 1990 def mock_popen(command, stdout=None, stdin=None, stderr=None, cwd=None): 1991 return mock.Mock( 1992 __enter__=mock.Mock(return_value=mock_process), 1993 __exit__=mock.Mock(return_value=None), 1994 communicate=mock.Mock(return_value=(mock.Mock(), mock.Mock())) 1995 ) 1996 1997 def mock_thread(name=None, target=None, daemon=None, args=None): 1998 return mock.Mock() 1999 2000 def mock_filenames(sysbuild_build_dir): 2001 handler.fifo_fn = os.path.join('dummy', 'qemu-fifo') 2002 handler.pid_fn = os.path.join(sysbuild_build_dir, 'qemu.pid') 2003 handler.log_fn = os.path.join('dummy', 'log') 2004 2005 harness = mock.Mock(state=harness_state) 2006 handler_options_west_flash = [] 2007 2008 domain_build_dir = os.path.join('sysbuild', 'dummydir') 2009 command = ['generator_cmd', '-C', os.path.join('cmd', 'path'), 'run'] 2010 2011 handler.options = mock.Mock( 2012 timeout_multiplier=1, 2013 west_flash=handler_options_west_flash, 2014 west_runner=None 2015 ) 2016 handler.run_custom_script = mock.Mock(return_value=None) 2017 handler.make_device_available = mock.Mock(return_value=None) 2018 handler._final_handle_actions = mock.Mock(return_value=None) 2019 handler._create_command = mock.Mock(return_value=command) 2020 handler._set_qemu_filenames = mock.Mock(side_effect=mock_filenames) 2021 handler.get_default_domain_build_dir = mock.Mock(return_value=domain_build_dir) 2022 handler.terminate = mock.Mock() 2023 2024 unlink_mock = mock.Mock() 2025 2026 with mock.patch('subprocess.Popen', side_effect=mock_popen), \ 2027 mock.patch('builtins.open', mock.mock_open(read_data='1')), \ 2028 mock.patch('threading.Thread', side_effect=mock_thread), \ 2029 mock.patch('os.path.exists', side_effect=mock_path_exists), \ 2030 mock.patch('os.unlink', unlink_mock), \ 2031 mock.patch('sys.stdout.isatty', return_value=isatty): 2032 handler.handle(harness) 2033 2034 assert all([expected_log in caplog.text for expected_log in expected_logs]) 2035 2036 2037def test_qemuhandler_get_fifo(mocked_instance): 2038 handler = QEMUHandler(mocked_instance, 'build') 2039 handler.fifo_fn = 'fifo_fn' 2040 2041 result = handler.get_fifo() 2042 2043 assert result == 'fifo_fn' 2044