1#!/usr/bin/env python3 2# Copyright (c) 2023 Intel Corporation 3# 4# SPDX-License-Identifier: Apache-2.0 5""" 6Tests for handlers.py classes' methods 7""" 8 9import itertools 10import mock 11import os 12import pytest 13import signal 14import subprocess 15import sys 16 17from contextlib import nullcontext 18from importlib import reload 19from serial import SerialException 20from subprocess import CalledProcessError, TimeoutExpired 21 22import twisterlib.harness 23 24from conftest import ZEPHYR_BASE 25from twisterlib.error import TwisterException 26from twisterlib.handlers import ( 27 Handler, 28 BinaryHandler, 29 DeviceHandler, 30 QEMUHandler, 31 SimulationHandler 32) 33 34 35@pytest.fixture 36def mocked_instance(tmp_path): 37 instance = mock.Mock() 38 39 testsuite = mock.Mock() 40 type(testsuite).source_dir = mock.PropertyMock(return_value='') 41 instance.testsuite = testsuite 42 43 build_dir = tmp_path / 'build_dir' 44 os.makedirs(build_dir) 45 type(instance).build_dir = mock.PropertyMock(return_value=str(build_dir)) 46 47 platform = mock.Mock() 48 type(platform).binaries = mock.PropertyMock(return_value=[]) 49 instance.platform = platform 50 51 type(instance.testsuite).timeout = mock.PropertyMock(return_value=60) 52 type(instance.platform).timeout_multiplier = mock.PropertyMock( 53 return_value=2 54 ) 55 56 instance.status = None 57 instance.reason = 'Unknown' 58 59 return instance 60 61 62@pytest.fixture 63def faux_timer(): 64 class Counter: 65 def __init__(self): 66 self.t = 0 67 68 def time(self): 69 self.t += 1 70 return self.t 71 72 return Counter() 73 74 75TESTDATA_1 = [ 76 (True, False, 'posix', ['Install pyserial python module with pip to use' \ 77 ' --device-testing option.'], None), 78 (False, True, 'nt', [], None), 79 (True, True, 'posix', ['Install pyserial python module with pip to use' \ 80 ' --device-testing option.'], ImportError), 81] 82 83@pytest.mark.parametrize( 84 'fail_serial, fail_pty, os_name, expected_outs, expected_error', 85 
@pytest.mark.parametrize(
    'fail_serial, fail_pty, os_name, expected_outs, expected_error',
    TESTDATA_1,
    ids=['import serial', 'import pty nt', 'import serial+pty posix']
)
def test_imports(
    capfd,
    fail_serial,
    fail_pty,
    os_name,
    expected_outs,
    expected_error
):
    """Reload twisterlib.handlers while 'serial' and/or 'pty' fail to import.

    Args:
        capfd: pytest fixture used to read the output captured during reload.
        fail_serial: when true, ``import serial`` fails during the reload.
        fail_pty: when true, ``import pty`` fails during the reload.
        os_name: value patched into ``os.name`` for the reload.
        expected_outs: substrings that must all appear in captured stdout.
        expected_error: exception type the reload must raise, or None when
            the reload is expected to succeed.
    """
    class ImportRaiser:
        """Meta-path finder raising ImportError for the disabled modules."""

        def find_spec(self, fullname, path, target=None):
            if fullname == 'serial' and fail_serial:
                raise ImportError()
            if fullname == 'pty' and fail_pty:
                raise ImportError()
            # Returning None defers every other module to the real finders.
            return None

    # A None entry in sys.modules makes the import statement raise
    # ImportError. Only the modules selected to fail are overwritten, so a
    # module that is absent from sys.modules on this host no longer causes
    # a KeyError while the replacement mapping is being prepared.
    modules_mock = sys.modules.copy()
    if fail_serial:
        modules_mock['serial'] = None
    if fail_pty:
        modules_mock['pty'] = None

    # The raising finder goes first so it is consulted before real finders.
    meta_path_mock = [ImportRaiser(), *sys.meta_path]

    raises_context = (
        pytest.raises(expected_error) if expected_error else nullcontext()
    )

    with mock.patch('os.name', os_name), \
         mock.patch.dict('sys.modules', modules_mock, clear=True), \
         mock.patch('sys.meta_path', meta_path_mock), \
         raises_context:
        reload(twisterlib.handlers)

    out, _ = capfd.readouterr()
    assert all(expected_out in out for expected_out in expected_outs)
@pytest.mark.parametrize(
    'detected_suite_names, should_be_called',
    TESTDATA_2,
    ids=['detected one expected', 'detected none', 'detected two unexpected']
)
def test_handler_verify_ztest_suite_name(
    mocked_instance,
    detected_suite_names,
    should_be_called
):
    """Check when _verify_ztest_suite_name() reports a missing suite name.

    The testsuite expects exactly ``['dummy_testsuite_name']``; the
    parametrized ``detected_suite_names`` decide whether
    ``Handler._missing_suite_name`` has to be invoked.
    """
    testsuite_type = type(mocked_instance.testsuite)
    testsuite_type.ztest_suite_names = ['dummy_testsuite_name']

    elapsed = mock.Mock()
    expected_calls = 1 if should_be_called else 0

    with mock.patch.object(Handler, '_missing_suite_name') as missing_mock:
        Handler(mocked_instance)._verify_ztest_suite_name(
            'passed',
            detected_suite_names,
            elapsed
        )

    # The patched method must have been called exactly once, or not at all.
    assert missing_mock.call_count == expected_calls
def test_handler_terminate(mocked_instance):
    """Check Handler.terminate() against a process with two children.

    First pass: both children have valid PIDs. Second pass: the first child
    has a negative PID for which the patched ``os.kill`` raises
    ProcessLookupError; the remaining child must still be signalled.
    """
    def fake_kill(pid, sig):
        if pid < 0:
            raise ProcessLookupError

    handler = Handler(mocked_instance)

    child_one = mock.Mock(pid=1)
    child_two = mock.Mock(pid=2)
    parent_process = mock.Mock()
    parent_process.children = mock.Mock(return_value=[child_one, child_two])

    proc = mock.Mock(pid=0)
    proc.terminate = mock.Mock(return_value=None)
    proc.kill = mock.Mock(return_value=None)

    process_patch = mock.patch('psutil.Process', return_value=parent_process)
    kill_patch = mock.patch('os.kill', mock.Mock(side_effect=fake_kill))

    # Both passes run with os.kill patched so no real signal is ever sent.
    with process_patch, kill_patch as mock_kill:
        handler.terminate(proc)

        assert handler.terminated
        proc.terminate.assert_called_once()
        proc.kill.assert_called_once()
        mock_kill.assert_has_calls(
            [mock.call(pid, signal.SIGTERM) for pid in (1, 2)]
        )

        # Second pass: swap in a child whose PID makes fake_kill raise.
        missing_child = mock.Mock(pid=-1)
        parent_process.children = mock.Mock(
            return_value=[missing_child, child_two]
        )
        handler.terminated = False
        mock_kill.reset_mock()

        handler.terminate(proc)

        mock_kill.assert_has_calls(
            [mock.call(pid, signal.SIGTERM) for pid in (-1, 2)]
        )
more than the timeout 367 mock.Mock(state=True, capture_coverage=True), 368 None, 369 None, 370 False, 371 True 372 ), 373] 374 375@pytest.mark.parametrize( 376 'proc_stdout, harness, expected_handler_calls,' 377 ' expected_harness_calls, should_be_less, timeout_wait', 378 TESTDATA_3, 379 ids=[ 380 'no timeout', 381 'timeout', 382 'timeout with harness state', 383 'timeout with capture_coverage, wait timeout' 384 ] 385) 386def test_binaryhandler_output_handler( 387 mocked_instance, 388 faux_timer, 389 proc_stdout, 390 harness, 391 expected_handler_calls, 392 expected_harness_calls, 393 should_be_less, 394 timeout_wait 395): 396 class MockStdout(mock.Mock): 397 def __init__(self, text): 398 super().__init__(text) 399 self.text = text 400 self.line_index = 0 401 402 def readline(self): 403 if self.line_index == len(self.text): 404 self.line_index = 0 405 return b'' 406 else: 407 line = self.text[self.line_index] 408 self.line_index += 1 409 return line 410 411 class MockProc(mock.Mock): 412 def __init__(self, pid, stdout): 413 super().__init__(pid, stdout) 414 self.pid = mock.PropertyMock(return_value=pid) 415 self.stdout = MockStdout(stdout) 416 417 def wait(self, *args, **kwargs): 418 if timeout_wait: 419 raise TimeoutExpired('dummy cmd', 'dummyamount') 420 421 handler = BinaryHandler(mocked_instance, 'build') 422 handler.terminate = mock.Mock() 423 handler.options = mock.Mock(timeout_multiplier=1) 424 425 proc = MockProc(1, proc_stdout) 426 427 with mock.patch( 428 'builtins.open', 429 mock.mock_open(read_data='') 430 ) as mock_file, \ 431 mock.patch('time.time', side_effect=faux_timer.time): 432 handler._output_handler(proc, harness) 433 434 mock_file.assert_called_with(handler.log, 'wt') 435 436 if expected_handler_calls: 437 mock_file.return_value.write.assert_has_calls(expected_handler_calls) 438 if expected_harness_calls: 439 harness.handle.assert_has_calls(expected_harness_calls) 440 if should_be_less is not None: 441 if should_be_less: 442 assert 
@pytest.mark.parametrize(
    'robot_test, call_make_run, call_west_flash, enable_valgrind, seed,'
    ' extra_args, expected',
    TESTDATA_4,
    ids=['robot, valgrind', 'make run, seed', 'west flash', 'binary, extra']
)
def test_binaryhandler_create_command(
    mocked_instance,
    robot_test,
    call_make_run,
    call_west_flash,
    enable_valgrind,
    seed,
    extra_args,
    expected
):
    """Compare BinaryHandler._create_command() output with ``expected``.

    The handler is configured from the parametrized flags, then the command
    list produced for ``robot_test`` is checked for equality.
    """
    handler = BinaryHandler(mocked_instance, 'build')

    # Applied in insertion order, matching a plain sequence of assignments.
    configuration = {
        'generator_cmd': 'generator',
        'binary': 'bin',
        'call_make_run': call_make_run,
        'call_west_flash': call_west_flash,
        'options': mock.Mock(enable_valgrind=enable_valgrind),
        'seed': seed,
        'extra_test_args': extra_args,
        'build_dir': 'build_dir',
    }
    for attribute, value in configuration.items():
        setattr(handler, attribute, value)

    assert handler._create_command(robot_test) == expected
@pytest.mark.parametrize(
    'harness_state, terminated, returncode, enable_valgrind,'
    ' expected_status, expected_reason, do_add_missing',
    TESTDATA_6,
    ids=['valgrind error', 'failed', 'harness failed', 'success', 'no state']
)
def test_binaryhandler_update_instance_info(
    mocked_instance,
    harness_state,
    terminated,
    returncode,
    enable_valgrind,
    expected_status,
    expected_reason,
    do_add_missing
):
    """Check the status/reason BinaryHandler._update_instance_info() records.

    ``do_add_missing`` marks the cases in which the instance's
    ``add_missing_case_status`` must be called with ``'blocked'`` and the
    expected reason.
    """
    execution_time = 59
    add_missing = mock.Mock()

    handler = BinaryHandler(mocked_instance, 'build')
    handler.terminated = terminated
    handler.returncode = returncode
    handler.options = mock.Mock(enable_valgrind=enable_valgrind)
    handler.instance.add_missing_case_status = add_missing

    handler._update_instance_info(harness_state, execution_time)

    instance = handler.instance
    assert instance.execution_time == execution_time
    assert instance.status == expected_status
    assert instance.reason == expected_reason

    if not do_add_missing:
        return
    add_missing.assert_called_once_with('blocked', expected_reason)
@pytest.mark.parametrize(
    'type_str, is_pid_fn, expected_call_make_run, is_binary, expected_ready',
    TESTDATA_8,
    ids=[t[0] for t in TESTDATA_8]
)
def test_simulationhandler_init(
    mocked_instance,
    type_str,
    is_pid_fn,
    expected_call_make_run,
    is_binary,
    expected_ready
):
    """Check the attributes SimulationHandler sets up for each type string."""
    build_dir = mocked_instance.build_dir

    handler = SimulationHandler(mocked_instance, type_str)

    assert handler.call_make_run == expected_call_make_run
    assert handler.ready == expected_ready

    expected_pid_files = []
    if is_pid_fn:
        expected_pid_files.append(os.path.join(build_dir, 'renode.pid'))
    if is_binary:
        # NOTE(review): no TESTDATA_8 row sets is_binary, so this branch is
        # never exercised; it also compares pid_fn (not a binary path) with
        # zephyr.exe -- confirm which attribute was meant.
        expected_pid_files.append(
            os.path.join(build_dir, 'zephyr', 'zephyr.exe')
        )

    for expected_pid_fn in expected_pid_files:
        assert handler.pid_fn == expected_pid_fn
test_devicehandler_monitor_serial( 709 mocked_instance, 710 success_count, 711 in_waiting_count, 712 oserror_count, 713 readline_error_count, 714 haltless_count, 715 stateless_count, 716 end_by_halt, 717 end_by_close, 718 end_by_state, 719 expected_line_count 720): 721 is_open_iter = iter(lambda: True, False) 722 line_iter = [ 723 TypeError('dummy TypeError') if x % 2 else \ 724 SerialException('dummy SerialException') for x in range( 725 readline_error_count 726 ) 727 ] + [ 728 f'line no {idx}'.encode('utf-8') for idx in range(success_count) 729 ] 730 in_waiting_iter = [False] * in_waiting_count + [ 731 TypeError('dummy TypeError') 732 ] if end_by_close else ( 733 [OSError('dummy OSError')] * oserror_count + [False] * in_waiting_count 734 ) + [True] * (success_count + readline_error_count) 735 736 is_set_iter = [False] * haltless_count + [True] \ 737 if end_by_halt else iter(lambda: False, True) 738 739 state_iter = [False] * stateless_count + [True] \ 740 if end_by_state else iter(lambda: False, True) 741 742 halt_event = mock.Mock(is_set=mock.Mock(side_effect=is_set_iter)) 743 ser = mock.Mock( 744 isOpen=mock.Mock(side_effect=is_open_iter), 745 readline=mock.Mock(side_effect=line_iter) 746 ) 747 type(ser).in_waiting = mock.PropertyMock( 748 side_effect=in_waiting_iter, 749 return_value=False 750 ) 751 harness = mock.Mock(capture_coverage=False) 752 type(harness).state=mock.PropertyMock(side_effect=state_iter) 753 754 handler = DeviceHandler(mocked_instance, 'build') 755 handler.options = mock.Mock(coverage=not end_by_state) 756 757 with mock.patch('builtins.open', mock.mock_open(read_data='')): 758 handler.monitor_serial(ser, halt_event, harness) 759 760 if not end_by_close: 761 ser.close.assert_called_once() 762 763 harness.handle.assert_has_calls( 764 [mock.call(f'line no {idx}') for idx in range(expected_line_count)] 765 ) 766 767 768TESTDATA_10 = [ 769 ( 770 'dummy_platform', 771 'dummy fixture', 772 [ 773 mock.Mock( 774 fixtures=[], 775 
platform='dummy_platform', 776 available=1, 777 counter=0 778 ), 779 mock.Mock( 780 fixtures=['dummy fixture'], 781 platform='another_platform', 782 available=1, 783 counter=0 784 ), 785 mock.Mock( 786 fixtures=['dummy fixture'], 787 platform='dummy_platform', 788 serial_pty=None, 789 serial=None, 790 available=1, 791 counter=0 792 ), 793 mock.Mock( 794 fixtures=['dummy fixture'], 795 platform='dummy_platform', 796 serial_pty=mock.Mock(), 797 available=1, 798 counter=0 799 ) 800 ], 801 3 802 ), 803 ( 804 'dummy_platform', 805 'dummy fixture', 806 [], 807 TwisterException 808 ), 809 ( 810 'dummy_platform', 811 'dummy fixture', 812 [ 813 mock.Mock( 814 fixtures=['dummy fixture'], 815 platform='dummy_platform', 816 serial_pty=mock.Mock(), 817 available=0 818 ), 819 mock.Mock( 820 fixtures=['another fixture'], 821 platform='dummy_platform', 822 serial_pty=mock.Mock(), 823 available=0 824 ), 825 mock.Mock( 826 fixtures=['dummy fixture'], 827 platform='dummy_platform', 828 serial=mock.Mock(), 829 available=0 830 ), 831 mock.Mock( 832 fixtures=['another fixture'], 833 platform='dummy_platform', 834 serial=mock.Mock(), 835 available=0 836 ) 837 ], 838 None 839 ) 840] 841 842@pytest.mark.parametrize( 843 'platform_name, fixture, duts, expected', 844 TESTDATA_10, 845 ids=['one good dut', 'exception - no duts', 'no available duts'] 846) 847def test_devicehandler_device_is_available( 848 mocked_instance, 849 platform_name, 850 fixture, 851 duts, 852 expected 853): 854 mocked_instance.platform.name = platform_name 855 mocked_instance.testsuite.harness_config = {'fixture': fixture} 856 857 handler = DeviceHandler(mocked_instance, 'build') 858 handler.duts = duts 859 860 if isinstance(expected, int): 861 device = handler.device_is_available(mocked_instance) 862 863 assert device == duts[expected] 864 assert device.available == 0 865 assert device.counter == 1 866 elif expected is None: 867 device = handler.device_is_available(mocked_instance) 868 869 assert device is None 870 
def test_devicehandler_make_device_available(mocked_instance):
    """Check make_device_available() frees only DUTs matching the serial.

    Three DUTs start unavailable: one matches through ``serial``, one through
    ``serial_pty`` and one uses an unrelated serial. After the call the first
    two must be available and the third must stay untouched.
    """
    serial = mock.Mock(name='dummy_serial')
    # The first positional argument of Mock is ``spec``; the label has to be
    # passed with the ``name`` keyword to be a name and not a spec.
    other_serial = mock.Mock(name='another_serial')

    duts = [
        mock.Mock(available=0, serial=serial, serial_pty=None),
        mock.Mock(available=0, serial=None, serial_pty=serial),
        mock.Mock(available=0, serial=other_serial, serial_pty=None),
    ]

    handler = DeviceHandler(mocked_instance, 'build')
    handler.duts = duts

    handler.make_device_available(serial)

    # Spelling out the per-DUT expectation pinpoints a failure better than
    # counting the available devices.
    assert [dut.available for dut in handler.duts] == [1, 1, 0]
@pytest.mark.parametrize(
    'num_of_failures, raise_exception',
    TESTDATA_12,
    ids=['no failures', 'with failures', 'exception']
)
def test_devicehandler_get_hardware(
    mocked_instance,
    caplog,
    num_of_failures,
    raise_exception
):
    """Check get_hardware() against a stubbed device_is_available().

    Args:
        mocked_instance: fixture providing the mocked test instance.
        caplog: pytest fixture capturing log records.
        num_of_failures: how many times the stub returns None before it
            returns the expected hardware.
        raise_exception: when true, the stub raises TwisterException.
    """
    expected_hardware = mock.Mock()
    # The countdown lives in the closure instead of being stored as an
    # extra attribute on the handler under test.
    remaining_failures = num_of_failures

    def mock_availability(handler, instance):
        nonlocal remaining_failures
        if raise_exception:
            raise TwisterException('dummy message')
        if remaining_failures:
            remaining_failures -= 1
            return None
        return expected_hardware

    handler = DeviceHandler(mocked_instance, 'build')

    with mock.patch.object(
        DeviceHandler,
        'device_is_available',
        mock_availability
    ):
        hardware = handler.get_hardware()

    if raise_exception:
        assert 'dummy message' in caplog.text.lower()
        assert mocked_instance.status == 'failed'
        assert mocked_instance.reason == 'dummy message'
    else:
        assert hardware is expected_hardware
@pytest.mark.parametrize(
    'self_west_flash, runner,'
    ' hardware_product_name, expected',
    TESTDATA_13,
    ids=['generator', '--west-flash', 'one west flash value',
         'multiple west flash values', 'generic runner', 'pyocd',
         'nrfjprog', 'openocd, STM32 STLink', 'openocd, STLINK-v3',
         'openocd, EDBG CMSIS-DAP', 'jlink', 'stm32cubeprogrammer']
)
@pytest.mark.parametrize('hardware_probe', TESTDATA_13_2, ids=['probe', 'id'])
def test_devicehandler_create_command(
    mocked_instance,
    self_west_flash,
    runner,
    hardware_probe,
    hardware_product_name,
    expected
):
    """Compare DeviceHandler._create_command() output with ``expected``.

    ``hardware_probe`` selects whether the serial number 12345 is exposed
    through ``probe_id`` or through ``id`` on the mocked hardware.
    """
    handler = DeviceHandler(mocked_instance, 'build')
    handler.options = mock.Mock(west_flash=self_west_flash)
    handler.generator_cmd = 'generator_cmd'

    # '$build_dir' is a placeholder for the handler's real build directory.
    resolved_expected = []
    for token in expected:
        if token == '$build_dir':
            resolved_expected.append(handler.build_dir)
        else:
            resolved_expected.append(token)

    if hardware_probe:
        probe_id, board_id = 12345, None
    else:
        probe_id, board_id = None, 12345

    hardware = mock.Mock(
        product=hardware_product_name,
        probe_id=probe_id,
        id=board_id,
        runner_params=['param1', 'param2']
    )

    assert handler._create_command(runner, hardware) == resolved_expected
missing_mock.assert_called_once_with('blocked', expected_reason) 1161 1162 1163TESTDATA_15 = [ 1164 ('dummy device', 'dummy pty', None, None, True, False, False), 1165 ( 1166 'dummy device', 1167 'dummy pty', 1168 mock.Mock(communicate=mock.Mock(return_value=('', ''))), 1169 SerialException, 1170 False, 1171 True, 1172 'dummy pty' 1173 ), 1174 ( 1175 'dummy device', 1176 None, 1177 None, 1178 SerialException, 1179 False, 1180 False, 1181 'dummy device' 1182 ) 1183] 1184 1185@pytest.mark.parametrize( 1186 'serial_device, serial_pty, ser_pty_process, expected_exception,' \ 1187 ' expected_result, terminate_ser_pty_process, make_available', 1188 TESTDATA_15, 1189 ids=['valid', 'serial pty process', 'no serial pty'] 1190) 1191def test_devicehandler_create_serial_connection( 1192 mocked_instance, 1193 serial_device, 1194 serial_pty, 1195 ser_pty_process, 1196 expected_exception, 1197 expected_result, 1198 terminate_ser_pty_process, 1199 make_available 1200): 1201 def mock_serial(*args, **kwargs): 1202 if expected_exception: 1203 raise expected_exception('') 1204 return expected_result 1205 1206 handler = DeviceHandler(mocked_instance, 'build') 1207 handler.make_device_available = mock.Mock() 1208 missing_mock = mock.Mock() 1209 handler.instance.add_missing_case_status = missing_mock 1210 available_mock = mock.Mock() 1211 handler.make_device_available = available_mock 1212 handler.options = mock.Mock(timeout_multiplier=1) 1213 1214 hardware_baud = 14400 1215 flash_timeout = 60 1216 serial_mock = mock.Mock(side_effect=mock_serial) 1217 1218 with mock.patch('serial.Serial', serial_mock), \ 1219 pytest.raises(expected_exception) if expected_exception else \ 1220 nullcontext(): 1221 result = handler._create_serial_connection(serial_device, hardware_baud, 1222 flash_timeout, serial_pty, 1223 ser_pty_process) 1224 1225 if expected_result: 1226 assert result is not None 1227 1228 if expected_exception: 1229 assert handler.instance.status == 'failed' 1230 assert 
handler.instance.reason == 'Serial Device Error' 1231 1232 missing_mock.assert_called_once_with('blocked', 'Serial Device Error') 1233 1234 if terminate_ser_pty_process: 1235 ser_pty_process.terminate.assert_called_once() 1236 ser_pty_process.communicate.assert_called_once() 1237 1238 if make_available: 1239 available_mock.assert_called_once_with(make_available) 1240 1241 1242TESTDATA_16 = [ 1243 ('dummy1 dummy2', None, 'slave name'), 1244 ('dummy1,dummy2', CalledProcessError, None), 1245 (None, None, 'dummy hardware serial'), 1246] 1247 1248@pytest.mark.parametrize( 1249 'serial_pty, popen_exception, expected_device', 1250 TESTDATA_16, 1251 ids=['pty', 'pty process error', 'no pty'] 1252) 1253def test_devicehandler_get_serial_device( 1254 mocked_instance, 1255 serial_pty, 1256 popen_exception, 1257 expected_device 1258): 1259 def mock_popen(command, *args, **kwargs): 1260 assert command == ['dummy1', 'dummy2'] 1261 if popen_exception: 1262 raise popen_exception(command, 'Dummy error') 1263 return mock.Mock() 1264 1265 handler = DeviceHandler(mocked_instance, 'build') 1266 hardware_serial = 'dummy hardware serial' 1267 1268 popen_mock = mock.Mock(side_effect=mock_popen) 1269 openpty_mock = mock.Mock(return_value=('master', 'slave')) 1270 ttyname_mock = mock.Mock(side_effect=lambda x: x + ' name') 1271 1272 with mock.patch('subprocess.Popen', popen_mock), \ 1273 mock.patch('pty.openpty', openpty_mock), \ 1274 mock.patch('os.ttyname', ttyname_mock): 1275 result = handler._get_serial_device(serial_pty, hardware_serial) 1276 1277 if popen_exception: 1278 assert result is None 1279 else: 1280 assert result[0] == expected_device 1281 1282TESTDATA_17 = [ 1283 (False, False, False, False, None, False, False, 1284 None, None, []), 1285 (True, True, False, False, None, False, False, 1286 None, None, []), 1287 (True, False, True, False, None, False, False, 1288 'error', 'Device issue (Flash error)', []), 1289 (True, False, False, True, None, False, False, 1290 'error', 
'Device issue (Timeout)', ['Flash operation timed out.']), 1291 (True, False, False, False, 1, False, False, 1292 'error', 'Device issue (Flash error?)', []), 1293 (True, False, False, False, 0, True, False, 1294 None, None, ['Timed out while monitoring serial output on IPName']), 1295 (True, False, False, False, 0, False, True, 1296 None, None, ['Process Serial PTY terminated outs: errs ']), 1297] 1298 1299@pytest.mark.parametrize( 1300 'has_hardware, raise_create_serial, raise_popen, raise_timeout,' \ 1301 ' returncode, do_timeout_thread, use_pty,' \ 1302 ' expected_status, expected_reason, expected_logs', 1303 TESTDATA_17, 1304 ids=['no hardware', 'create serial failure', 'popen called process error', 1305 'communicate timeout', 'nonzero returncode', 'valid pty', 'valid dev'] 1306) 1307def test_devicehandler_handle( 1308 mocked_instance, 1309 caplog, 1310 has_hardware, 1311 raise_create_serial, 1312 raise_popen, 1313 raise_timeout, 1314 returncode, 1315 do_timeout_thread, 1316 use_pty, 1317 expected_status, 1318 expected_reason, 1319 expected_logs 1320): 1321 def mock_get_serial(serial_pty, hardware_serial): 1322 if serial_pty: 1323 serial_pty_process = mock.Mock( 1324 name='dummy serial PTY process', 1325 communicate=mock.Mock( 1326 return_value=('', '') 1327 ) 1328 ) 1329 return 'dummy serial PTY device', serial_pty_process 1330 return 'dummy serial device', None 1331 1332 def mock_create_serial(*args, **kwargs): 1333 if raise_create_serial: 1334 raise SerialException('dummy cmd', 'dummy msg') 1335 return mock.Mock(name='dummy serial') 1336 1337 def mock_thread(*args, **kwargs): 1338 is_alive_mock = mock.Mock(return_value=bool(do_timeout_thread)) 1339 1340 return mock.Mock(is_alive=is_alive_mock) 1341 1342 def mock_terminate(proc, *args, **kwargs): 1343 proc.communicate = mock.Mock(return_value=(mock.Mock(), mock.Mock())) 1344 1345 def mock_communicate(*args, **kwargs): 1346 if raise_timeout: 1347 raise TimeoutExpired('dummy cmd', 'dummyamount') 1348 return 
mock.Mock(), mock.Mock() 1349 1350 def mock_popen(command, *args, **kwargs): 1351 if raise_popen: 1352 raise CalledProcessError('dummy proc', 'dummy msg') 1353 1354 mock_process = mock.Mock( 1355 pid=1, 1356 returncode=returncode, 1357 communicate=mock.Mock(side_effect=mock_communicate) 1358 ) 1359 1360 return mock.Mock( 1361 __enter__=mock.Mock(return_value=mock_process), 1362 __exit__=mock.Mock(return_value=None) 1363 ) 1364 1365 hardware = None if not has_hardware else mock.Mock( 1366 baud=14400, 1367 runner='dummy runner', 1368 serial_pty='Serial PTY' if use_pty else None, 1369 serial='dummy serial', 1370 pre_script='dummy pre script', 1371 post_script='dummy post script', 1372 post_flash_script='dummy post flash script', 1373 flash_timeout=60, 1374 flash_with_test=True 1375 ) 1376 1377 handler = DeviceHandler(mocked_instance, 'build') 1378 handler.get_hardware = mock.Mock(return_value=hardware) 1379 handler.options = mock.Mock( 1380 timeout_multiplier=1, 1381 west_flash=None, 1382 west_runner=None 1383 ) 1384 handler._get_serial_device = mock.Mock(side_effect=mock_get_serial) 1385 handler._create_command = mock.Mock(return_value=['dummy', 'command']) 1386 handler.run_custom_script = mock.Mock() 1387 handler._create_serial_connection = mock.Mock( 1388 side_effect=mock_create_serial 1389 ) 1390 handler.monitor_serial = mock.Mock() 1391 handler.terminate = mock.Mock(side_effect=mock_terminate) 1392 handler._update_instance_info = mock.Mock() 1393 handler._final_handle_actions = mock.Mock() 1394 handler.make_device_available = mock.Mock() 1395 handler.instance.platform.name = 'IPName' 1396 1397 harness = mock.Mock() 1398 1399 with mock.patch('builtins.open', mock.mock_open(read_data='')), \ 1400 mock.patch('subprocess.Popen', side_effect=mock_popen), \ 1401 mock.patch('threading.Event', mock.Mock()), \ 1402 mock.patch('threading.Thread', side_effect=mock_thread): 1403 handler.handle(harness) 1404 1405 handler.get_hardware.assert_called_once() 1406 1407 messages = 
[record.msg for record in caplog.records] 1408 assert all([msg in messages for msg in expected_logs]) 1409 1410 if not has_hardware: 1411 return 1412 1413 handler.run_custom_script.assert_has_calls([ 1414 mock.call('dummy pre script', mock.ANY) 1415 ]) 1416 1417 if raise_create_serial: 1418 return 1419 1420 handler.run_custom_script.assert_has_calls([ 1421 mock.call('dummy pre script', mock.ANY), 1422 mock.call('dummy post flash script', mock.ANY), 1423 mock.call('dummy post script', mock.ANY) 1424 ]) 1425 1426 if expected_reason: 1427 assert handler.instance.reason == expected_reason 1428 if expected_status: 1429 assert handler.instance.status == expected_status 1430 1431 handler.make_device_available.assert_called_once_with( 1432 'Serial PTY' if use_pty else 'dummy serial device' 1433 ) 1434 1435 1436TESTDATA_18 = [ 1437 (True, True, True), 1438 (False, False, False), 1439] 1440 1441@pytest.mark.parametrize( 1442 'ignore_qemu_crash, expected_ignore_crash, expected_ignore_unexpected_eof', 1443 TESTDATA_18, 1444 ids=['ignore crash', 'qemu crash'] 1445) 1446def test_qemuhandler_init( 1447 mocked_instance, 1448 ignore_qemu_crash, 1449 expected_ignore_crash, 1450 expected_ignore_unexpected_eof 1451): 1452 mocked_instance.testsuite.ignore_qemu_crash = ignore_qemu_crash 1453 1454 handler = QEMUHandler(mocked_instance, 'build') 1455 1456 assert handler.ignore_qemu_crash == expected_ignore_crash 1457 assert handler.ignore_unexpected_eof == expected_ignore_unexpected_eof 1458 1459 1460def test_qemuhandler_get_cpu_time(): 1461 def mock_process(pid): 1462 return mock.Mock( 1463 cpu_times=mock.Mock( 1464 return_value=mock.Mock( 1465 user=20.0, 1466 system=64.0 1467 ) 1468 ) 1469 ) 1470 1471 with mock.patch('psutil.Process', mock_process): 1472 res = QEMUHandler._get_cpu_time(0) 1473 1474 assert res == pytest.approx(84.0) 1475 1476 1477TESTDATA_19 = [ 1478 ( 1479 True, 1480 os.path.join('self', 'dummy_dir', '1'), 1481 mock.PropertyMock(return_value=os.path.join('dummy_dir', 
'1')), 1482 os.path.join('dummy_dir', '1') 1483 ), 1484 ( 1485 False, 1486 os.path.join('self', 'dummy_dir', '2'), 1487 mock.PropertyMock(return_value=os.path.join('dummy_dir', '2')), 1488 os.path.join('self', 'dummy_dir', '2') 1489 ), 1490] 1491 1492@pytest.mark.parametrize( 1493 'self_sysbuild, self_build_dir, build_dir, expected', 1494 TESTDATA_19, 1495 ids=['domains build dir', 'self build dir'] 1496) 1497def test_qemuhandler_get_sysbuild_build_dir( 1498 mocked_instance, 1499 self_sysbuild, 1500 self_build_dir, 1501 build_dir, 1502 expected 1503): 1504 get_default_domain_mock = mock.Mock() 1505 type(get_default_domain_mock()).build_dir = build_dir 1506 domains_mock = mock.Mock(get_default_domain=get_default_domain_mock) 1507 from_file_mock = mock.Mock(return_value=domains_mock) 1508 1509 handler = QEMUHandler(mocked_instance, 'build') 1510 handler.instance.testsuite.sysbuild = self_sysbuild 1511 handler.build_dir = self_build_dir 1512 1513 with mock.patch('domains.Domains.from_file', from_file_mock): 1514 result = handler._get_sysbuild_build_dir() 1515 1516 assert result == expected 1517 1518 1519TESTDATA_20 = [ 1520 ( 1521 os.path.join('self', 'dummy_dir', 'log1'), 1522 os.path.join('self', 'dummy_dir', 'pid1'), 1523 os.path.join('sysbuild', 'dummy_dir', 'bd1'), 1524 True 1525 ), 1526 ( 1527 os.path.join('self', 'dummy_dir', 'log2'), 1528 os.path.join('self', 'dummy_dir', 'pid2'), 1529 os.path.join('sysbuild', 'dummy_dir', 'bd2'), 1530 False 1531 ), 1532] 1533 1534@pytest.mark.parametrize( 1535 'self_log, self_pid_fn, sysbuild_build_dir, exists_pid_fn', 1536 TESTDATA_20, 1537 ids=['pid exists', 'pid missing'] 1538) 1539def test_qemuhandler_set_qemu_filenames( 1540 mocked_instance, 1541 self_log, 1542 self_pid_fn, 1543 sysbuild_build_dir, 1544 exists_pid_fn 1545): 1546 unlink_mock = mock.Mock() 1547 exists_mock = mock.Mock(return_value=exists_pid_fn) 1548 1549 handler = QEMUHandler(mocked_instance, 'build') 1550 handler.log = self_log 1551 handler.pid_fn = 
self_pid_fn 1552 1553 with mock.patch('os.unlink', unlink_mock), \ 1554 mock.patch('os.path.exists', exists_mock): 1555 handler._set_qemu_filenames(sysbuild_build_dir) 1556 1557 assert handler.fifo_fn == mocked_instance.build_dir + \ 1558 os.path.sep + 'qemu-fifo' 1559 1560 assert handler.pid_fn == sysbuild_build_dir + os.path.sep + 'qemu.pid' 1561 1562 assert handler.log_fn == self_log 1563 1564 if exists_pid_fn: 1565 unlink_mock.assert_called_once_with(sysbuild_build_dir + \ 1566 os.path.sep + 'qemu.pid') 1567 1568 1569def test_qemuhandler_create_command(mocked_instance): 1570 sysbuild_build_dir = os.path.join('sysbuild', 'dummy_dir') 1571 1572 handler = QEMUHandler(mocked_instance, 'build') 1573 handler.generator_cmd = 'dummy_cmd' 1574 1575 result = handler._create_command(sysbuild_build_dir) 1576 1577 assert result == ['dummy_cmd', '-C', 'sysbuild' + os.path.sep + 'dummy_dir', 1578 'run'] 1579 1580 1581TESTDATA_21 = [ 1582 ( 1583 0, 1584 False, 1585 None, 1586 'good dummy state', 1587 False, 1588 None, 1589 None, 1590 False 1591 ), 1592 ( 1593 1, 1594 True, 1595 None, 1596 'good dummy state', 1597 False, 1598 None, 1599 None, 1600 False 1601 ), 1602 ( 1603 0, 1604 False, 1605 None, 1606 None, 1607 True, 1608 'failed', 1609 'Timeout', 1610 True 1611 ), 1612 ( 1613 1, 1614 False, 1615 None, 1616 None, 1617 False, 1618 'failed', 1619 'Exited with 1', 1620 True 1621 ), 1622 ( 1623 1, 1624 False, 1625 'preexisting reason', 1626 'good dummy state', 1627 False, 1628 'failed', 1629 'preexisting reason', 1630 True 1631 ), 1632] 1633 1634@pytest.mark.parametrize( 1635 'self_returncode, self_ignore_qemu_crash,' \ 1636 ' self_instance_reason, harness_state, is_timeout,' \ 1637 ' expected_status, expected_reason, expected_called_missing_case', 1638 TESTDATA_21, 1639 ids=['not failed', 'qemu ignore', 'timeout', 'bad returncode', 'other fail'] 1640) 1641def test_qemuhandler_update_instance_info( 1642 mocked_instance, 1643 self_returncode, 1644 self_ignore_qemu_crash, 1645 
self_instance_reason, 1646 harness_state, 1647 is_timeout, 1648 expected_status, 1649 expected_reason, 1650 expected_called_missing_case 1651): 1652 mocked_instance.add_missing_case_status = mock.Mock() 1653 mocked_instance.reason = self_instance_reason 1654 1655 handler = QEMUHandler(mocked_instance, 'build') 1656 handler.returncode = self_returncode 1657 handler.ignore_qemu_crash = self_ignore_qemu_crash 1658 1659 handler._update_instance_info(harness_state, is_timeout) 1660 1661 assert handler.instance.status == expected_status 1662 assert handler.instance.reason == expected_reason 1663 1664 if expected_called_missing_case: 1665 mocked_instance.add_missing_case_status.assert_called_once_with( 1666 'blocked' 1667 ) 1668 1669 1670def test_qemuhandler_thread_get_fifo_names(): 1671 fifo_fn = 'dummy' 1672 1673 fifo_in, fifo_out = QEMUHandler._thread_get_fifo_names(fifo_fn) 1674 1675 assert fifo_in == 'dummy.in' 1676 assert fifo_out == 'dummy.out' 1677 1678TESTDATA_22 = [ 1679 (False, False), 1680 (False, True), 1681 (True, False), 1682 (True, True), 1683] 1684 1685@pytest.mark.parametrize( 1686 'fifo_in_exists, fifo_out_exists', 1687 TESTDATA_22, 1688 ids=['both missing', 'out exists', 'in exists', 'both exist'] 1689) 1690def test_qemuhandler_thread_open_files(fifo_in_exists, fifo_out_exists): 1691 def mock_exists(path): 1692 if path == 'fifo.in': 1693 return fifo_in_exists 1694 elif path == 'fifo.out': 1695 return fifo_out_exists 1696 else: 1697 raise ValueError('Unexpected path in mock of os.path.exists') 1698 1699 unlink_mock = mock.Mock() 1700 exists_mock = mock.Mock(side_effect=mock_exists) 1701 mkfifo_mock = mock.Mock() 1702 1703 fifo_in = 'fifo.in' 1704 fifo_out = 'fifo.out' 1705 logfile = 'log.file' 1706 1707 with mock.patch('os.unlink', unlink_mock), \ 1708 mock.patch('os.mkfifo', mkfifo_mock), \ 1709 mock.patch('os.path.exists', exists_mock), \ 1710 mock.patch('builtins.open', mock.mock_open()) as open_mock: 1711 _, _, _ = 
QEMUHandler._thread_open_files(fifo_in, fifo_out, logfile) 1712 1713 open_mock.assert_has_calls([ 1714 mock.call('fifo.in', 'wb'), 1715 mock.call('fifo.out', 'rb', buffering=0), 1716 mock.call('log.file', 'wt'), 1717 ]) 1718 1719 if fifo_in_exists: 1720 unlink_mock.assert_any_call('fifo.in') 1721 1722 if fifo_out_exists: 1723 unlink_mock.assert_any_call('fifo.out') 1724 1725 1726TESTDATA_23 = [ 1727 (False, False), 1728 (True, True), 1729 (True, False) 1730] 1731 1732@pytest.mark.parametrize( 1733 'is_pid, is_lookup_error', 1734 TESTDATA_23, 1735 ids=['pid missing', 'pid lookup error', 'pid ok'] 1736) 1737def test_qemuhandler_thread_close_files(is_pid, is_lookup_error): 1738 is_process_killed = {} 1739 1740 def mock_kill(pid, sig): 1741 if is_lookup_error: 1742 raise ProcessLookupError(f'Couldn\'t find pid: {pid}.') 1743 elif sig == signal.SIGTERM: 1744 is_process_killed[pid] = True 1745 1746 unlink_mock = mock.Mock() 1747 kill_mock = mock.Mock(side_effect=mock_kill) 1748 1749 fifo_in = 'fifo.in' 1750 fifo_out = 'fifo.out' 1751 pid = 12345 if is_pid else None 1752 out_fp = mock.Mock() 1753 in_fp = mock.Mock() 1754 log_out_fp = mock.Mock() 1755 1756 with mock.patch('os.unlink', unlink_mock), \ 1757 mock.patch('os.kill', kill_mock): 1758 QEMUHandler._thread_close_files(fifo_in, fifo_out, pid, out_fp, 1759 in_fp, log_out_fp) 1760 1761 out_fp.close.assert_called_once() 1762 in_fp.close.assert_called_once() 1763 log_out_fp.close.assert_called_once() 1764 1765 unlink_mock.assert_has_calls([mock.call('fifo.in'), mock.call('fifo.out')]) 1766 1767 if is_pid and not is_lookup_error: 1768 assert is_process_killed[pid] 1769 1770 1771TESTDATA_24 = [ 1772 ('timeout', 'failed', 'Timeout'), 1773 ('failed', 'failed', 'Failed'), 1774 ('unexpected eof', 'failed', 'unexpected eof'), 1775 ('unexpected byte', 'failed', 'unexpected byte'), 1776 (None, None, 'Unknown'), 1777] 1778 1779@pytest.mark.parametrize( 1780 'out_state, expected_status, expected_reason', 1781 TESTDATA_24, 1782 
ids=['timeout', 'failed', 'unexpected eof', 'unexpected byte', 'unknown'] 1783) 1784def test_qemuhandler_thread_update_instance_info( 1785 mocked_instance, 1786 out_state, 1787 expected_status, 1788 expected_reason 1789): 1790 handler = QEMUHandler(mocked_instance, 'build') 1791 handler_time = 59 1792 1793 QEMUHandler._thread_update_instance_info(handler, handler_time, out_state) 1794 1795 assert handler.instance.execution_time == handler_time 1796 1797 assert handler.instance.status == expected_status 1798 assert handler.instance.reason == expected_reason 1799 1800 1801TESTDATA_25 = [ 1802 ( 1803 ('1\n' * 60).encode('utf-8'), 1804 60, 1805 1, 1806 [None] * 60 + ['success'] * 6, 1807 1000, 1808 False, 1809 'timeout', 1810 [mock.call('1\n'), mock.call('1\n')] 1811 ), 1812 ( 1813 ('1\n' * 60).encode('utf-8'), 1814 60, 1815 -1, 1816 [None] * 60 + ['success'] * 30, 1817 100, 1818 False, 1819 'failed', 1820 [mock.call('1\n'), mock.call('1\n')] 1821 ), 1822 ( 1823 b'', 1824 60, 1825 1, 1826 ['success'] * 3, 1827 100, 1828 False, 1829 'unexpected eof', 1830 [] 1831 ), 1832 ( 1833 b'\x81', 1834 60, 1835 1, 1836 ['success'] * 3, 1837 100, 1838 False, 1839 'unexpected byte', 1840 [] 1841 ), 1842 ( 1843 '1\n2\n3\n4\n5\n'.encode('utf-8'), 1844 600, 1845 1, 1846 [None] * 3 + ['success'] * 7, 1847 100, 1848 False, 1849 'success', 1850 [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')] 1851 ), 1852 ( 1853 '1\n2\n3\n4\n5\n'.encode('utf-8'), 1854 600, 1855 0, 1856 [None] * 3 + ['success'] * 7, 1857 100, 1858 False, 1859 'timeout', 1860 [mock.call('1\n'), mock.call('2\n')] 1861 ), 1862 ( 1863 '1\n2\n3\n4\n5\n'.encode('utf-8'), 1864 60, 1865 1, 1866 [None] * 3 + ['success'] * 7, 1867 (n for n in [100, 100, 10000]), 1868 True, 1869 'success', 1870 [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')] 1871 ), 1872] 1873 1874@pytest.mark.parametrize( 1875 'content, timeout, pid, harness_states, cputime, capture_coverage,' \ 1876 ' 
expected_out_state, expected_log_calls', 1877 TESTDATA_25, 1878 ids=[ 1879 'timeout', 1880 'harness failed', 1881 'unexpected eof', 1882 'unexpected byte', 1883 'harness success', 1884 'timeout by pid=0', 1885 'capture_coverage' 1886 ] 1887) 1888def test_qemuhandler_thread( 1889 mocked_instance, 1890 faux_timer, 1891 content, 1892 timeout, 1893 pid, 1894 harness_states, 1895 cputime, 1896 capture_coverage, 1897 expected_out_state, 1898 expected_log_calls 1899): 1900 def mock_cputime(pid): 1901 if pid > 0: 1902 return cputime if isinstance(cputime, int) else next(cputime) 1903 else: 1904 raise ProcessLookupError() 1905 1906 type(mocked_instance.testsuite).timeout = mock.PropertyMock(return_value=timeout) 1907 handler = QEMUHandler(mocked_instance, 'build') 1908 handler.results = {} 1909 handler.ignore_unexpected_eof = False 1910 handler.pid_fn = 'pid_fn' 1911 handler.fifo_fn = 'fifo_fn' 1912 handler.options = mock.Mock(timeout_multiplier=1) 1913 1914 def mocked_open(filename, *args, **kwargs): 1915 if filename == handler.pid_fn: 1916 contents = str(pid).encode('utf-8') 1917 elif filename == handler.fifo_fn + '.out': 1918 contents = content 1919 else: 1920 contents = b'' 1921 1922 file_object = mock.mock_open(read_data=contents).return_value 1923 file_object.__iter__.return_value = contents.splitlines(True) 1924 return file_object 1925 1926 harness = mock.Mock(capture_coverage=capture_coverage, handle=print) 1927 type(harness).state = mock.PropertyMock(side_effect=harness_states) 1928 1929 p = mock.Mock() 1930 p.poll = mock.Mock( 1931 side_effect=itertools.cycle([True, True, True, True, False]) 1932 ) 1933 1934 mock_thread_get_fifo_names = mock.Mock( 1935 return_value=('fifo_fn.in', 'fifo_fn.out') 1936 ) 1937 log_fp_mock = mock.Mock() 1938 in_fp_mock = mocked_open('fifo_fn.out') 1939 out_fp_mock = mock.Mock() 1940 mock_thread_open_files = mock.Mock( 1941 return_value=(out_fp_mock, in_fp_mock, log_fp_mock) 1942 ) 1943 mock_thread_close_files = mock.Mock() 1944 
mock_thread_update_instance_info = mock.Mock() 1945 1946 with mock.patch('time.time', side_effect=faux_timer.time), \ 1947 mock.patch('builtins.open', new=mocked_open), \ 1948 mock.patch('select.poll', return_value=p), \ 1949 mock.patch('os.path.exists', return_value=True), \ 1950 mock.patch('twisterlib.handlers.QEMUHandler._get_cpu_time', 1951 mock_cputime), \ 1952 mock.patch('twisterlib.handlers.QEMUHandler._thread_get_fifo_names', 1953 mock_thread_get_fifo_names), \ 1954 mock.patch('twisterlib.handlers.QEMUHandler._thread_open_files', 1955 mock_thread_open_files), \ 1956 mock.patch('twisterlib.handlers.QEMUHandler._thread_close_files', 1957 mock_thread_close_files), \ 1958 mock.patch('twisterlib.handlers.QEMUHandler.' \ 1959 '_thread_update_instance_info', 1960 mock_thread_update_instance_info): 1961 QEMUHandler._thread( 1962 handler, 1963 handler.get_test_timeout(), 1964 handler.build_dir, 1965 handler.log, 1966 handler.fifo_fn, 1967 handler.pid_fn, 1968 handler.results, 1969 harness, 1970 handler.ignore_unexpected_eof 1971 ) 1972 1973 mock_thread_update_instance_info.assert_called_once_with( 1974 handler, 1975 mock.ANY, 1976 expected_out_state 1977 ) 1978 1979 log_fp_mock.write.assert_has_calls(expected_log_calls) 1980 1981 1982TESTDATA_26 = [ 1983 (True, False, None, True, 1984 ['No timeout, return code from QEMU (1): 1', 1985 'return code from QEMU (1): 1']), 1986 (False, True, 'passed', True, ['return code from QEMU (1): 0']), 1987 (False, True, 'failed', False, ['return code from QEMU (None): 1']), 1988] 1989 1990@pytest.mark.parametrize( 1991 'isatty, do_timeout, harness_state, exists_pid_fn, expected_logs', 1992 TESTDATA_26, 1993 ids=['no timeout, isatty', 'timeout passed', 'timeout, no pid_fn'] 1994) 1995def test_qemuhandler_handle( 1996 mocked_instance, 1997 caplog, 1998 tmp_path, 1999 isatty, 2000 do_timeout, 2001 harness_state, 2002 exists_pid_fn, 2003 expected_logs 2004): 2005 def mock_wait(*args, **kwargs): 2006 if do_timeout: 2007 raise 
TimeoutExpired('dummy cmd', 'dummyamount') 2008 2009 mock_process = mock.Mock(pid=0, returncode=1) 2010 mock_process.communicate = mock.Mock( 2011 return_value=(mock.Mock(), mock.Mock()) 2012 ) 2013 mock_process.wait = mock.Mock(side_effect=mock_wait) 2014 2015 handler = QEMUHandler(mocked_instance, 'build') 2016 2017 def mock_path_exists(name, *args, **kwargs): 2018 return exists_pid_fn 2019 2020 def mock_popen(command, stdout=None, stdin=None, stderr=None, cwd=None): 2021 return mock.Mock( 2022 __enter__=mock.Mock(return_value=mock_process), 2023 __exit__=mock.Mock(return_value=None), 2024 communicate=mock.Mock(return_value=(mock.Mock(), mock.Mock())) 2025 ) 2026 2027 def mock_thread(name=None, target=None, daemon=None, args=None): 2028 return mock.Mock() 2029 2030 def mock_filenames(sysbuild_build_dir): 2031 handler.fifo_fn = os.path.join('dummy', 'qemu-fifo') 2032 handler.pid_fn = os.path.join(sysbuild_build_dir, 'qemu.pid') 2033 handler.log_fn = os.path.join('dummy', 'log') 2034 2035 harness = mock.Mock(state=harness_state) 2036 handler_options_west_flash = [] 2037 2038 sysbuild_build_dir = os.path.join('sysbuild', 'dummydir') 2039 command = ['generator_cmd', '-C', os.path.join('cmd', 'path'), 'run'] 2040 2041 handler.options = mock.Mock( 2042 timeout_multiplier=1, 2043 west_flash=handler_options_west_flash, 2044 west_runner=None 2045 ) 2046 handler.run_custom_script = mock.Mock(return_value=None) 2047 handler.make_device_available = mock.Mock(return_value=None) 2048 handler._final_handle_actions = mock.Mock(return_value=None) 2049 handler._create_command = mock.Mock(return_value=command) 2050 handler._set_qemu_filenames = mock.Mock(side_effect=mock_filenames) 2051 handler._get_sysbuild_build_dir = mock.Mock(return_value=sysbuild_build_dir) 2052 handler.terminate = mock.Mock() 2053 2054 unlink_mock = mock.Mock() 2055 2056 with mock.patch('subprocess.Popen', side_effect=mock_popen), \ 2057 mock.patch('builtins.open', mock.mock_open(read_data='1')), \ 2058 
mock.patch('threading.Thread', side_effect=mock_thread), \ 2059 mock.patch('os.path.exists', side_effect=mock_path_exists), \ 2060 mock.patch('os.unlink', unlink_mock), \ 2061 mock.patch('sys.stdout.isatty', return_value=isatty): 2062 handler.handle(harness) 2063 2064 assert all([expected_log in caplog.text for expected_log in expected_logs]) 2065 2066 2067def test_qemuhandler_get_fifo(mocked_instance): 2068 handler = QEMUHandler(mocked_instance, 'build') 2069 handler.fifo_fn = 'fifo_fn' 2070 2071 result = handler.get_fifo() 2072 2073 assert result == 'fifo_fn' 2074