1#!/usr/bin/env python3 2# Copyright (c) 2023 Intel Corporation 3# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved. 4# 5# SPDX-License-Identifier: Apache-2.0 6""" 7Tests for handlers.py classes' methods 8""" 9 10import itertools 11import mock 12import os 13import pytest 14import signal 15import subprocess 16import sys 17 18from contextlib import nullcontext 19from importlib import reload 20from serial import SerialException 21from subprocess import CalledProcessError, TimeoutExpired 22from types import SimpleNamespace 23 24import twisterlib.harness 25 26ZEPHYR_BASE = os.getenv("ZEPHYR_BASE") 27 28from twisterlib.error import TwisterException 29from twisterlib.statuses import TwisterStatus 30from twisterlib.handlers import ( 31 Handler, 32 BinaryHandler, 33 DeviceHandler, 34 QEMUHandler, 35 SimulationHandler 36) 37from twisterlib.hardwaremap import ( 38 DUT 39) 40 41@pytest.fixture 42def mocked_instance(tmp_path): 43 instance = mock.Mock() 44 45 testsuite = mock.Mock() 46 type(testsuite).source_dir = mock.PropertyMock(return_value='') 47 instance.testsuite = testsuite 48 49 build_dir = tmp_path / 'build_dir' 50 os.makedirs(build_dir) 51 type(instance).build_dir = mock.PropertyMock(return_value=str(build_dir)) 52 53 platform = mock.Mock() 54 type(platform).binaries = mock.PropertyMock(return_value=[]) 55 instance.platform = platform 56 57 type(instance.testsuite).timeout = mock.PropertyMock(return_value=60) 58 type(instance.platform).timeout_multiplier = mock.PropertyMock( 59 return_value=2 60 ) 61 62 instance.status = TwisterStatus.NONE 63 instance.reason = 'Unknown' 64 65 return instance 66 67 68@pytest.fixture 69def faux_timer(): 70 class Counter: 71 def __init__(self): 72 self.t = 0 73 74 def time(self): 75 self.t += 1 76 return self.t 77 78 return Counter() 79 80 81TESTDATA_1 = [ 82 (True, False, 'posix', ['Install pyserial python module with pip to use' \ 83 ' --device-testing option.'], None), 84 (False, True, 'nt', [], None), 85 (True, True, 'posix', ['Install pyserial python module with pip to use' \ 86 ' --device-testing option.'], ImportError), 87] 88 89@pytest.mark.parametrize( 90 'fail_serial, fail_pty, os_name, expected_outs, expected_error', 91 TESTDATA_1, 92 ids=['import serial', 'import pty nt', 'import serial+pty posix'] 93) 94def test_imports( 95 capfd, 96 fail_serial, 97 fail_pty, 98 os_name, 99 expected_outs, 100 expected_error 101): 102 class ImportRaiser: 103 def find_spec(self, fullname, path, target=None): 104 if fullname == 'serial' and fail_serial: 105 raise ImportError() 106 if fullname == 'pty' and fail_pty: 107 raise ImportError() 108 109 modules_mock = sys.modules.copy() 110 modules_mock['serial'] = None if fail_serial else modules_mock['serial'] 111 modules_mock['pty'] = None if fail_pty else modules_mock['pty'] 112 113 meta_path_mock = sys.meta_path[:] 114 meta_path_mock.insert(0, ImportRaiser()) 115 116 with mock.patch('os.name', os_name), \ 117 mock.patch.dict('sys.modules', modules_mock, clear=True), \ 118 mock.patch('sys.meta_path', meta_path_mock), \ 119 pytest.raises(expected_error) if expected_error else nullcontext(): 120 reload(twisterlib.handlers) 121 122 out, _ = capfd.readouterr() 123 assert all([expected_out in out for expected_out in expected_outs]) 124 125 126def test_handler_final_handle_actions(mocked_instance): 127 instance = mocked_instance 128 instance.testcases = [mock.Mock()] 129 130 handler = Handler(mocked_instance, 'build', mock.Mock()) 131 handler.suite_name_check = True 132 133 harness = twisterlib.harness.Test() 134 harness.status = TwisterStatus.NONE 135 harness.detected_suite_names = mock.Mock() 136 harness.matched_run_id = False 137 harness.run_id_exists = True 138 harness.recording = mock.Mock() 139 140 handler_time = mock.Mock() 141 142 handler._final_handle_actions(harness, handler_time) 143 144 assert handler.instance.status == TwisterStatus.FAIL 145 assert handler.instance.execution_time == handler_time 146 assert handler.instance.reason == 'RunID mismatch' 147 assert all(testcase.status == TwisterStatus.FAIL for \ 148 testcase in handler.instance.testcases) 149 150 handler.instance.reason = 'This reason shan\'t be changed.' 151 handler._final_handle_actions(harness, handler_time) 152 153 instance.assert_has_calls([mock.call.record(harness.recording)]) 154 155 assert handler.instance.reason == 'This reason shan\'t be changed.' 156 157 158TESTDATA_2 = [ 159 (['dummy_testsuite_name'], False), 160 ([], True), 161 (['another_dummy_name', 'yet_another_dummy_name'], True), 162] 163 164@pytest.mark.parametrize( 165 'detected_suite_names, should_be_called', 166 TESTDATA_2, 167 ids=['detected one expected', 'detected none', 'detected two unexpected'] 168) 169def test_handler_verify_ztest_suite_name( 170 mocked_instance, 171 detected_suite_names, 172 should_be_called 173): 174 instance = mocked_instance 175 type(instance.testsuite).ztest_suite_names = ['dummy_testsuite_name'] 176 177 harness_status = TwisterStatus.PASS 178 179 handler_time = mock.Mock() 180 181 with mock.patch.object(Handler, '_missing_suite_name') as _missing_mocked: 182 handler = Handler(instance, 'build', mock.Mock()) 183 handler._verify_ztest_suite_name( 184 harness_status, 185 detected_suite_names, 186 handler_time 187 ) 188 189 if should_be_called: 190 _missing_mocked.assert_called_once() 191 else: 192 _missing_mocked.assert_not_called() 193 194 195def test_handler_missing_suite_name(mocked_instance): 196 instance = mocked_instance 197 instance.testcases = [mock.Mock()] 198 199 handler = Handler(mocked_instance, 'build', mock.Mock()) 200 handler.suite_name_check = True 201 202 expected_suite_names = ['dummy_testsuite_name'] 203 204 handler_time = mock.Mock() 205 206 handler._missing_suite_name(expected_suite_names, handler_time) 207 208 assert handler.instance.status == TwisterStatus.FAIL 209 assert handler.instance.execution_time == handler_time 210 assert handler.instance.reason == 'Testsuite mismatch' 211 assert all( 212 testcase.status == TwisterStatus.FAIL for testcase in handler.instance.testcases 213 ) 214 215 216def test_handler_terminate(mocked_instance): 217 def mock_kill_function(pid, sig): 218 if pid < 0: 219 raise ProcessLookupError 220 221 instance = mocked_instance 222 223 handler = Handler(instance, 'build', mock.Mock()) 224 225 mock_process = mock.Mock() 226 mock_child1 = mock.Mock(pid=1) 227 mock_child2 = mock.Mock(pid=2) 228 mock_process.children = mock.Mock(return_value=[mock_child1, mock_child2]) 229 230 mock_proc = mock.Mock(pid=0) 231 mock_proc.terminate = mock.Mock(return_value=None) 232 mock_proc.kill = mock.Mock(return_value=None) 233 234 with mock.patch('psutil.Process', return_value=mock_process), \ 235 mock.patch( 236 'os.kill', 237 mock.Mock(side_effect=mock_kill_function) 238 ) as mock_kill: 239 handler.terminate(mock_proc) 240 241 assert handler.terminated 242 mock_proc.terminate.assert_called_once() 243 mock_proc.kill.assert_called_once() 244 mock_kill.assert_has_calls( 245 [mock.call(1, signal.SIGTERM), mock.call(2, signal.SIGTERM)] 246 ) 247 248 mock_child_neg1 = mock.Mock(pid=-1) 249 mock_process.children = mock.Mock( 250 return_value=[mock_child_neg1, mock_child2] 251 ) 252 handler.terminated = False 253 mock_kill.reset_mock() 254 255 handler.terminate(mock_proc) 256 257 mock_kill.assert_has_calls( 258 [mock.call(-1, signal.SIGTERM), mock.call(2, signal.SIGTERM)] 259 ) 260 261 262def test_binaryhandler_try_kill_process_by_pid(mocked_instance): 263 def mock_kill_function(pid, sig): 264 if pid < 0: 265 raise ProcessLookupError 266 267 instance = mocked_instance 268 269 handler = BinaryHandler(instance, 'build', mock.Mock()) 270 handler.pid_fn = os.path.join('dummy', 'path', 'to', 'pid.pid') 271 272 with mock.patch( 273 'os.kill', 274 mock.Mock(side_effect=mock_kill_function) 275 ) as mock_kill, \ 276 mock.patch('os.unlink', mock.Mock()) as mock_unlink: 277 with mock.patch('builtins.open', mock.mock_open(read_data='1')): 278 handler.try_kill_process_by_pid() 279 280 mock_unlink.assert_called_once_with( 281 os.path.join('dummy', 'path', 'to', 'pid.pid') 282 ) 283 mock_kill.assert_called_once_with(1, signal.SIGKILL) 284 285 mock_unlink.reset_mock() 286 mock_kill.reset_mock() 287 handler.pid_fn = os.path.join('dummy', 'path', 'to', 'pid.pid') 288 289 with mock.patch('builtins.open', mock.mock_open(read_data='-1')): 290 handler.try_kill_process_by_pid() 291 292 mock_unlink.assert_called_once_with( 293 os.path.join('dummy', 'path', 'to', 'pid.pid') 294 ) 295 mock_kill.assert_called_once_with(-1, signal.SIGKILL) 296 297 298TESTDATA_3 = [ 299 ( 300 [b'This\\r\\n\n', b'is\r', b'some \x1B[31mANSI\x1B[39m in\n', b'a short\n', b'file.'], 301 mock.Mock(status=TwisterStatus.NONE, capture_coverage=False), 302 [ 303 mock.call('This\\r\\n\n'), 304 mock.call('is\r'), 305 mock.call('some ANSI in\n'), 306 mock.call('a short\n'), 307 mock.call('file.') 308 ], 309 [ 310 mock.call('This'), 311 mock.call('is'), 312 mock.call('some \x1B[31mANSI\x1B[39m in'), 313 mock.call('a short'), 314 mock.call('file.') 315 ], 316 None, 317 False 318 ), 319 ( 320 [b'Too much.'] * 120, # Should be more than the timeout 321 mock.Mock(status=TwisterStatus.PASS, capture_coverage=False), 322 None, 323 None, 324 True, 325 False 326 ), 327 ( 328 [b'Too much.'] * 120, # Should be more than the timeout 329 mock.Mock(status=TwisterStatus.PASS, capture_coverage=False), 330 None, 331 None, 332 True, 333 False 334 ), 335 ( 336 [b'Too much.'] * 120, # Should be more than the timeout 337 mock.Mock(status=TwisterStatus.PASS, capture_coverage=True), 338 None, 339 None, 340 False, 341 True 342 ), 343] 344 345@pytest.mark.parametrize( 346 'proc_stdout, harness, expected_handler_calls,' 347 ' expected_harness_calls, should_be_less, timeout_wait', 348 TESTDATA_3, 349 ids=[ 350 'no timeout', 351 'timeout', 352 'timeout with harness status', 353 'timeout with capture_coverage, wait timeout' 354 ] 355) 356def test_binaryhandler_output_handler( 357 mocked_instance, 358 faux_timer, 359 proc_stdout, 360 harness, 361 expected_handler_calls, 362 expected_harness_calls, 363 should_be_less, 364 timeout_wait 365): 366 class MockStdout(mock.Mock): 367 def __init__(self, text): 368 super().__init__(text) 369 self.text = text 370 self.line_index = 0 371 372 def readline(self): 373 if self.line_index == len(self.text): 374 self.line_index = 0 375 return b'' 376 else: 377 line = self.text[self.line_index] 378 self.line_index += 1 379 return line 380 381 class MockProc(mock.Mock): 382 def __init__(self, pid, stdout): 383 super().__init__(pid, stdout) 384 self.pid = mock.PropertyMock(return_value=pid) 385 self.stdout = MockStdout(stdout) 386 387 def wait(self, *args, **kwargs): 388 if timeout_wait: 389 raise TimeoutExpired('dummy cmd', 'dummyamount') 390 391 handler = BinaryHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1)) 392 handler.terminate = mock.Mock() 393 394 proc = MockProc(1, proc_stdout) 395 396 with mock.patch( 397 'builtins.open', 398 mock.mock_open(read_data='') 399 ) as mock_file, \ 400 mock.patch('time.time', side_effect=faux_timer.time): 401 handler._output_handler(proc, harness) 402 403 mock_file.assert_called_with(handler.log, 'w') 404 405 if expected_handler_calls: 406 mock_file.return_value.write.assert_has_calls(expected_handler_calls) 407 if expected_harness_calls: 408 harness.handle.assert_has_calls(expected_harness_calls) 409 if should_be_less is not None: 410 if should_be_less: 411 assert mock_file.return_value.write.call_count < len(proc_stdout) 412 else: 413 assert mock_file.return_value.write.call_count == len(proc_stdout) 414 if timeout_wait: 415 handler.terminate.assert_called_once_with(proc) 416 417 418TESTDATA_4 = [ 419 (True, False, True, None, None, 420 ['valgrind', '--error-exitcode=2', '--leak-check=full', 421 f'--suppressions={ZEPHYR_BASE}/scripts/valgrind.supp', 422 '--log-file=build_dir/valgrind.log', '--track-origins=yes', 423 'generator']), 424 (False, True, False, 123, None, ['generator', '-C', 'build_dir', 'run', '--seed=123']), 425 (False, False, False, None, ['ex1', 'ex2'], ['build_dir/zephyr/zephyr.exe', 'ex1', 'ex2']), 426] 427 428@pytest.mark.parametrize( 429 'robot_test, call_make_run, enable_valgrind, seed,' \ 430 ' extra_args, expected', 431 TESTDATA_4, 432 ids=['robot, valgrind', 'make run, seed', 'binary, extra'] 433) 434def test_binaryhandler_create_command( 435 mocked_instance, 436 robot_test, 437 call_make_run, 438 enable_valgrind, 439 seed, 440 extra_args, 441 expected 442): 443 options = SimpleNamespace() 444 options.enable_valgrind = enable_valgrind 445 options.coverage_basedir = "coverage_basedir" 446 options.sim_name = None 447 handler = BinaryHandler(mocked_instance, 'build', options, 'generator', False) 448 handler.binary = 'bin' 449 handler.call_make_run = call_make_run 450 handler.seed = seed 451 handler.extra_test_args = extra_args 452 handler.build_dir = 'build_dir' 453 handler.instance.sysbuild = False 454 handler.platform = SimpleNamespace() 455 handler.platform.resc = "file.resc" 456 handler.platform.uart = "uart" 457 458 command = handler._create_command(robot_test) 459 460 assert command == expected 461 462 463TESTDATA_5 = [ 464 (False, False, False), 465 (True, False, False), 466 (True, True, False), 467 (False, False, True), 468] 469 470@pytest.mark.parametrize( 471 'enable_asan, enable_lsan, enable_ubsan', 472 TESTDATA_5, 473 ids=['none', 'asan', 'asan, lsan', 'ubsan'] 474) 475def test_binaryhandler_create_env( 476 mocked_instance, 477 enable_asan, 478 enable_lsan, 479 enable_ubsan 480): 481 handler = BinaryHandler(mocked_instance, 'build', mock.Mock( 482 enable_asan=enable_asan, 483 enable_lsan=enable_lsan, 484 enable_ubsan=enable_ubsan 485 )) 486 487 env = { 488 'example_env_var': True, 489 'ASAN_OPTIONS': 'dummy=dummy:', 490 'UBSAN_OPTIONS': 'dummy=dummy:' 491 } 492 493 with mock.patch('os.environ', env): 494 res = handler._create_env() 495 496 assert env['example_env_var'] == res['example_env_var'] 497 498 if enable_ubsan: 499 assert env['UBSAN_OPTIONS'] in res['UBSAN_OPTIONS'] 500 assert 'log_path=stdout:' in res['UBSAN_OPTIONS'] 501 assert 'halt_on_error=1:' in res['UBSAN_OPTIONS'] 502 503 if enable_asan: 504 assert env['ASAN_OPTIONS'] in res['ASAN_OPTIONS'] 505 assert 'log_path=stdout:' in res['ASAN_OPTIONS'] 506 507 if not enable_lsan: 508 assert 'detect_leaks=0' in res['ASAN_OPTIONS'] 509 510 511TESTDATA_6 = [ 512 (TwisterStatus.NONE, False, 2, True, TwisterStatus.FAIL, 'Valgrind error', False), 513 (TwisterStatus.NONE, False, 1, False, TwisterStatus.FAIL, 'Failed (rc=1)', False), 514 (TwisterStatus.FAIL, False, 0, False, TwisterStatus.FAIL, "Failed harness:'foobar'", False), 515 ('success', False, 0, False, 'success', 'Unknown', False), 516 (TwisterStatus.NONE, True, 1, True, TwisterStatus.FAIL, 'Timeout', True), 517] 518 519@pytest.mark.parametrize( 520 'harness_status, terminated, returncode, enable_valgrind,' \ 521 ' expected_status, expected_reason, do_add_missing', 522 TESTDATA_6, 523 ids=['valgrind error', 'failed', 'harness failed', 'custom success', 'no status'] 524) 525def test_binaryhandler_update_instance_info( 526 mocked_instance, 527 harness_status, 528 terminated, 529 returncode, 530 enable_valgrind, 531 expected_status, 532 expected_reason, 533 do_add_missing 534): 535 handler = BinaryHandler(mocked_instance, 'build', mock.Mock( 536 enable_valgrind=enable_valgrind 537 )) 538 handler_time = 59 539 handler.terminated = terminated 540 handler.returncode = returncode 541 missing_mock = mock.Mock() 542 handler.instance.add_missing_case_status = missing_mock 543 mocked_harness = mock.Mock(status=harness_status, reason="foobar") 544 545 handler._update_instance_info(mocked_harness, handler_time) 546 547 assert handler.instance.execution_time == handler_time 548 549 assert handler.instance.status == expected_status 550 assert handler.instance.reason == expected_reason 551 552 if do_add_missing: 553 missing_mock.assert_called_once_with(TwisterStatus.BLOCK, expected_reason) 554 555 556TESTDATA_7 = [ 557 (True, False, False), 558 (False, True, False), 559 (False, False, True), 560] 561 562@pytest.mark.parametrize( 563 'is_robot_test, coverage, isatty', 564 TESTDATA_7, 565 ids=['robot test', 'coverage', 'isatty'] 566) 567def test_binaryhandler_handle( 568 mocked_instance, 569 caplog, 570 is_robot_test, 571 coverage, 572 isatty 573): 574 thread_mock_obj = mock.Mock() 575 576 def mock_popen(command, *args, **kwargs,): 577 return mock.Mock( 578 __enter__=mock.Mock(return_value=mock.Mock(pid=0, returncode=0)), 579 __exit__=mock.Mock(return_value=None) 580 ) 581 582 def mock_thread(target, *args, **kwargs): 583 return thread_mock_obj 584 585 handler = BinaryHandler(mocked_instance, 'build', mock.Mock(coverage=coverage)) 586 handler.sourcedir = 'source_dir' 587 handler.build_dir = 'build_dir' 588 handler.name= 'Dummy Name' 589 handler._create_command = mock.Mock(return_value=['dummy' , 'command']) 590 handler._create_env = mock.Mock(return_value=[]) 591 handler._update_instance_info = mock.Mock() 592 handler._final_handle_actions = mock.Mock() 593 handler.terminate = mock.Mock() 594 handler.try_kill_process_by_pid = mock.Mock() 595 596 robot_mock = mock.Mock() 597 harness = mock.Mock(is_robot_test=is_robot_test, run_robot_test=robot_mock) 598 599 popen_mock = mock.Mock(side_effect=mock_popen) 600 thread_mock = mock.Mock(side_effect=mock_thread) 601 call_mock = mock.Mock() 602 603 with mock.patch('subprocess.call', call_mock), \ 604 mock.patch('subprocess.Popen', popen_mock), \ 605 mock.patch('threading.Thread', thread_mock), \ 606 mock.patch('sys.stdout.isatty', return_value=isatty): 607 handler.handle(harness) 608 609 if is_robot_test: 610 robot_mock.assert_called_once_with(['dummy', 'command'], mock.ANY) 611 return 612 613 assert 'Spawning BinaryHandler Thread for Dummy Name' in caplog.text 614 615 thread_mock_obj.join.assert_called() 616 handler._update_instance_info.assert_called_once() 617 handler._final_handle_actions.assert_called_once() 618 619 if isatty: 620 call_mock.assert_any_call(['stty', 'sane'], stdin=mock.ANY) 621 622 623TESTDATA_8 = [ 624 ('renode', True, True, False, False), 625 ('native', False, False, False, True), 626 ('build', False, True, False, False), 627] 628 629@pytest.mark.parametrize( 630 'type_str, is_pid_fn, expected_call_make_run, is_binary, expected_ready', 631 TESTDATA_8, 632 ids=[t[0] for t in TESTDATA_8] 633) 634def test_simulationhandler_init( 635 mocked_instance, 636 type_str, 637 is_pid_fn, 638 expected_call_make_run, 639 is_binary, 640 expected_ready 641): 642 handler = SimulationHandler(mocked_instance, type_str, mock.Mock()) 643 644 assert handler.call_make_run == expected_call_make_run 645 assert handler.ready == expected_ready 646 647 if is_pid_fn: 648 assert handler.pid_fn == os.path.join(mocked_instance.build_dir, 649 'renode.pid') 650 if is_binary: 651 assert handler.pid_fn == os.path.join(mocked_instance.build_dir, 652 'zephyr', 'zephyr.exe') 653 654 655TESTDATA_9 = [ 656 (3, 2, 0, 0, 3, -1, True, False, False, 1), 657 (4, 1, 0, 0, -1, -1, False, True, False, 0), 658 (5, 0, 1, 2, -1, 4, False, False, True, 3) 659] 660 661@pytest.mark.parametrize( 662 'success_count, in_waiting_count, oserror_count, readline_error_count,' 663 ' haltless_count, statusless_count, end_by_halt, end_by_close,' 664 ' end_by_status, expected_line_count', 665 TESTDATA_9, 666 ids=[ 667 'halt event', 668 'serial closes', 669 'harness status with errors' 670 ] 671) 672def test_devicehandler_monitor_serial( 673 mocked_instance, 674 success_count, 675 in_waiting_count, 676 oserror_count, 677 readline_error_count, 678 haltless_count, 679 statusless_count, 680 end_by_halt, 681 end_by_close, 682 end_by_status, 683 expected_line_count 684): 685 is_open_iter = iter(lambda: True, False) 686 line_iter = [ 687 TypeError('dummy TypeError') if x % 2 else \ 688 SerialException('dummy SerialException') for x in range( 689 readline_error_count 690 ) 691 ] + [ 692 f'line no {idx}'.encode('utf-8') for idx in range(success_count) 693 ] 694 in_waiting_iter = [False] * in_waiting_count + [ 695 TypeError('dummy TypeError') 696 ] if end_by_close else ( 697 [OSError('dummy OSError')] * oserror_count + [False] * in_waiting_count 698 ) + [True] * (success_count + readline_error_count) 699 700 is_set_iter = [False] * haltless_count + [True] \ 701 if end_by_halt else iter(lambda: False, True) 702 703 status_iter = [TwisterStatus.NONE] * statusless_count + [TwisterStatus.PASS] \ 704 if end_by_status else iter(lambda: TwisterStatus.NONE, TwisterStatus.PASS) 705 706 halt_event = mock.Mock(is_set=mock.Mock(side_effect=is_set_iter)) 707 ser = mock.Mock( 708 isOpen=mock.Mock(side_effect=is_open_iter), 709 readline=mock.Mock(side_effect=line_iter) 710 ) 711 type(ser).in_waiting = mock.PropertyMock( 712 side_effect=in_waiting_iter, 713 return_value=False 714 ) 715 harness = mock.Mock(capture_coverage=False) 716 type(harness).status=mock.PropertyMock(side_effect=status_iter) 717 718 handler = DeviceHandler(mocked_instance, 'build', mock.Mock(enable_coverage=not end_by_status)) 719 720 with mock.patch('builtins.open', mock.mock_open(read_data='')): 721 handler.monitor_serial(ser, halt_event, harness) 722 723 if not end_by_close: 724 ser.close.assert_called_once() 725 726 print(harness.call_args_list) 727 728 harness.handle.assert_has_calls( 729 [mock.call(f'line no {idx}') for idx in range(expected_line_count)] 730 ) 731 732 733def test_devicehandler_monitor_serial_splitlines(mocked_instance): 734 halt_event = mock.Mock(is_set=mock.Mock(return_value=False)) 735 ser = mock.Mock( 736 isOpen=mock.Mock(side_effect=[True, True, False]), 737 in_waiting=mock.Mock(return_value=False), 738 readline=mock.Mock(return_value='\nline1\nline2\n'.encode('utf-8')) 739 ) 740 harness = mock.Mock(status=TwisterStatus.PASS) 741 742 handler = DeviceHandler(mocked_instance, 'build', mock.Mock(enable_coverage=False)) 743 744 with mock.patch('builtins.open', mock.mock_open(read_data='')): 745 handler.monitor_serial(ser, halt_event, harness) 746 747 assert harness.handle.call_count == 2 748 749 750TESTDATA_10 = [ 751 ( 752 'dummy_platform', 753 'dummy fixture', 754 [ 755 mock.Mock( 756 fixtures=[], 757 platform='dummy_platform', 758 available=1, 759 failures=0, 760 counter_increment=mock.Mock(), 761 counter=0 762 ), 763 mock.Mock( 764 fixtures=['dummy fixture'], 765 platform='another_platform', 766 available=1, 767 failures=0, 768 counter_increment=mock.Mock(), 769 counter=0 770 ), 771 mock.Mock( 772 fixtures=['dummy fixture'], 773 platform='dummy_platform', 774 serial_pty=None, 775 serial=None, 776 available=1, 777 failures=0, 778 counter_increment=mock.Mock(), 779 counter=0 780 ), 781 mock.Mock( 782 fixtures=['dummy fixture'], 783 platform='dummy_platform', 784 serial_pty=mock.Mock(), 785 available=1, 786 failures=0, 787 counter_increment=mock.Mock(), 788 counter=0 789 ), 790 mock.Mock( 791 fixtures=['dummy fixture'], 792 platform='dummy_platform', 793 serial_pty=mock.Mock(), 794 available=1, 795 failures=0, 796 counter_increment=mock.Mock(), 797 counter=0 798 ) 799 ], 800 3 801 ), 802 ( 803 'dummy_platform', 804 'dummy fixture', 805 [ 806 mock.Mock( 807 fixtures=[], 808 platform='dummy_platform', 809 available=1, 810 failures=0, 811 counter_increment=mock.Mock(), 812 counter=0 813 ), 814 mock.Mock( 815 fixtures=['dummy fixture'], 816 platform='another_platform', 817 available=1, 818 failures=0, 819 counter_increment=mock.Mock(), 820 counter=0 821 ), 822 mock.Mock( 823 fixtures=['dummy fixture'], 824 platform='dummy_platform', 825 serial_pty=None, 826 serial=None, 827 available=1, 828 failures=0, 829 counter_increment=mock.Mock(), 830 counter=0 831 ), 832 mock.Mock( 833 fixtures=['dummy fixture'], 834 platform='dummy_platform', 835 serial_pty=mock.Mock(), 836 available=1, 837 failures=1, 838 counter_increment=mock.Mock(), 839 counter=0 840 ), 841 mock.Mock( 842 fixtures=['dummy fixture'], 843 platform='dummy_platform', 844 serial_pty=mock.Mock(), 845 available=1, 846 failures=0, 847 counter_increment=mock.Mock(), 848 counter=0 849 ) 850 ], 851 4 852 ), 853 ( 854 'dummy_platform', 855 'dummy fixture', 856 [], 857 TwisterException 858 ), 859 ( 860 'dummy_platform', 861 'dummy fixture', 862 [ 863 mock.Mock( 864 fixtures=['dummy fixture'], 865 platform='dummy_platform', 866 serial_pty=mock.Mock(), 867 counter_increment=mock.Mock(), 868 failures=0, 869 available=0 870 ), 871 mock.Mock( 872 fixtures=['another fixture'], 873 platform='dummy_platform', 874 serial_pty=mock.Mock(), 875 counter_increment=mock.Mock(), 876 failures=0, 877 available=0 878 ), 879 mock.Mock( 880 fixtures=['dummy fixture'], 881 platform='dummy_platform', 882 serial=mock.Mock(), 883 counter_increment=mock.Mock(), 884 failures=0, 885 available=0 886 ), 887 mock.Mock( 888 fixtures=['another fixture'], 889 platform='dummy_platform', 890 serial=mock.Mock(), 891 counter_increment=mock.Mock(), 892 failures=0, 893 available=0 894 ) 895 ], 896 None 897 ) 898] 899 900@pytest.mark.parametrize( 901 'platform_name, fixture, duts, expected', 902 TESTDATA_10, 903 ids=['two good duts, select the first one', 904 'two duts, the first was failed once, select the second not failed', 905 'exception - no duts', 'no available duts'] 906) 907def test_devicehandler_device_is_available( 908 mocked_instance, 909 platform_name, 910 fixture, 911 duts, 912 expected 913): 914 mocked_instance.platform.name = platform_name 915 mocked_instance.testsuite.harness_config = {'fixture': fixture} 916 917 handler = DeviceHandler(mocked_instance, 'build', mock.Mock()) 918 handler.duts = duts 919 920 if isinstance(expected, int): 921 device = handler.device_is_available(mocked_instance) 922 923 assert device == duts[expected] 924 assert device.available == 0 925 device.counter_increment.assert_called_once() 926 elif expected is None: 927 device = handler.device_is_available(mocked_instance) 928 929 assert device is None 930 elif isinstance(expected, type): 931 with pytest.raises(expected): 932 device = handler.device_is_available(mocked_instance) 933 else: 934 assert False 935 936 937def test_devicehandler_make_dut_available(mocked_instance): 938 serial = mock.Mock(name='dummy_serial') 939 duts = [ 940 mock.Mock(available=0, serial=serial, serial_pty=None), 941 mock.Mock(available=0, serial=None, serial_pty=serial), 942 mock.Mock( 943 available=0, 944 serial=mock.Mock('another_serial'), 945 serial_pty=None 946 ) 947 ] 948 949 handler = DeviceHandler(mocked_instance, 'build', mock.Mock()) 950 handler.duts = duts 951 952 handler.make_dut_available(duts[1]) 953 954 assert len([None for d in handler.duts if d.available == 1]) == 1 955 assert handler.duts[0].available == 0 956 assert handler.duts[2].available == 0 957 958 handler.make_dut_available(duts[0]) 959 960 assert len([None for d in handler.duts if d.available == 1]) == 2 961 assert handler.duts[2].available == 0 962 963 964TESTDATA_11 = [ 965 (mock.Mock(pid=0, returncode=0), False), 966 (mock.Mock(pid=0, returncode=1), False), 967 (mock.Mock(pid=0, returncode=1), True) 968] 969 970@pytest.mark.parametrize( 971 'mock_process, raise_timeout', 972 TESTDATA_11, 973 ids=['proper script', 'error', 'timeout'] 974) 975def test_devicehandler_run_custom_script(caplog, mock_process, raise_timeout): 976 def raise_timeout_fn(timeout=-1): 977 if raise_timeout and timeout != -1: 978 raise subprocess.TimeoutExpired(None, timeout) 979 else: 980 return mock.Mock(), mock.Mock() 981 982 def assert_popen(command, *args, **kwargs): 983 return mock.Mock( 984 __enter__=mock.Mock(return_value=mock_process), 985 __exit__=mock.Mock(return_value=None) 986 ) 987 988 mock_process.communicate = mock.Mock(side_effect=raise_timeout_fn) 989 990 script = [os.path.join('test','script', 'path'), 'arg'] 991 timeout = 60 992 993 with mock.patch('subprocess.Popen', side_effect=assert_popen): 994 DeviceHandler.run_custom_script(script, timeout) 995 996 if raise_timeout: 997 assert all( 998 t in caplog.text.lower() for t in [str(script), 'timed out'] 999 ) 1000 mock_process.assert_has_calls( 1001 [ 1002 mock.call.communicate(timeout=timeout), 1003 mock.call.kill(), 1004 mock.call.communicate() 1005 ] 1006 ) 1007 elif mock_process.returncode == 0: 1008 assert not any([r.levelname == 'ERROR' for r in caplog.records]) 1009 else: 1010 assert 'timed out' not in caplog.text.lower() 1011 assert 'custom script failure' in caplog.text.lower() 1012 1013 1014TESTDATA_12 = [ 1015 (0, False), 1016 (4, False), 1017 (0, True) 1018] 1019 1020@pytest.mark.parametrize( 1021 'num_of_failures, raise_exception', 1022 TESTDATA_12, 1023 ids=['no failures', 'with failures', 'exception'] 1024) 1025def test_devicehandler_get_hardware( 1026 mocked_instance, 1027 caplog, 1028 num_of_failures, 1029 raise_exception 1030): 1031 expected_hardware = mock.Mock() 1032 1033 def mock_availability(handler, instance, no=num_of_failures): 1034 if raise_exception: 1035 raise TwisterException(f'dummy message') 1036 if handler.no: 1037 handler.no -= 1 1038 return None 1039 return expected_hardware 1040 1041 handler = DeviceHandler(mocked_instance, 'build', mock.Mock()) 1042 handler.no = num_of_failures 1043 1044 with mock.patch.object( 1045 DeviceHandler, 1046 'device_is_available', 1047 mock_availability 1048 ): 1049 hardware = handler.get_hardware() 1050 1051 if raise_exception: 1052 assert 'dummy message' in caplog.text.lower() 1053 assert mocked_instance.status == TwisterStatus.FAIL 1054 assert mocked_instance.reason == 'dummy message' 1055 else: 1056 assert hardware == expected_hardware 1057 1058 1059TESTDATA_13 = [ 1060 ( 1061 None, 1062 None, 1063 None, 1064 ['generator_cmd', '-C', '$build_dir', 'flash'] 1065 ), 1066 ( 1067 [], 1068 None, 1069 None, 1070 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir'] 1071 ), 1072 ( 1073 '--dummy', 1074 None, 1075 None, 1076 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1077 '--', '--dummy'] 1078 ), 1079 ( 1080 '--dummy1,--dummy2', 1081 None, 1082 None, 1083 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1084 '--', '--dummy1', '--dummy2'] 1085 ), 1086 1087 ( 1088 None, 1089 'runner', 1090 'product', 1091 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1092 '--runner', 'runner', 'param1', 'param2'] 1093 ), 1094 1095 ( 1096 None, 1097 'pyocd', 1098 'product', 1099 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1100 '--runner', 'pyocd', 'param1', 'param2', '--', '--dev-id', 12345] 1101 ), 1102 ( 1103 None, 1104 'nrfjprog', 1105 'product', 1106 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1107 '--runner', 'nrfjprog', 'param1', 'param2', '--', '--dev-id', 12345] 1108 ), 1109 ( 1110 None, 1111 'openocd', 1112 'STM32 STLink', 1113 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1114 '--runner', 'openocd', 'param1', 'param2', 1115 '--', '--cmd-pre-init', 'hla_serial 12345'] 1116 ), 1117 ( 1118 None, 1119 'openocd', 1120 'STLINK-V3', 1121 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1122 '--runner', 'openocd', 'param1', 'param2', 1123 '--', '--cmd-pre-init', 'hla_serial 12345'] 1124 ), 1125 ( 1126 None, 1127 'openocd', 1128 'EDBG CMSIS-DAP', 1129 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1130 '--runner', 'openocd', 'param1', 'param2', 1131 '--', '--cmd-pre-init', 'cmsis_dap_serial 12345'] 1132 ), 1133 ( 1134 None, 1135 'jlink', 1136 'product', 1137 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1138 '--runner', 'jlink', '--dev-id', 12345, 1139 'param1', 'param2'] 1140 ), 1141 ( 1142 None, 1143 'stm32cubeprogrammer', 1144 'product', 1145 ['west', 'flash', '--skip-rebuild', '-d', '$build_dir', 1146 '--runner', 'stm32cubeprogrammer', '--tool-opt=sn=12345', 1147 'param1', 'param2'] 1148 ), 1149 1150] 1151 1152TESTDATA_13_2 = [(True), (False)] 1153 1154@pytest.mark.parametrize( 1155 'self_west_flash, runner,' \ 1156 ' hardware_product_name, expected', 1157 TESTDATA_13, 1158 ids=['generator', '--west-flash', 'one west flash value', 1159 'multiple west flash values', 'generic runner', 'pyocd', 1160 'nrfjprog', 'openocd, STM32 STLink', 'openocd, STLINK-v3', 1161 'openocd, EDBG CMSIS-DAP', 'jlink', 'stm32cubeprogrammer'] 1162) 1163@pytest.mark.parametrize('hardware_probe', TESTDATA_13_2, ids=['probe', 'id']) 1164def test_devicehandler_create_command( 1165 mocked_instance, 1166 self_west_flash, 1167 runner, 1168 hardware_probe, 1169 hardware_product_name, 1170 expected 1171): 1172 handler = DeviceHandler(mocked_instance, 'build', mock.Mock()) 1173 handler.options = mock.Mock(west_flash=self_west_flash) 1174 handler.generator_cmd = 'generator_cmd' 1175 1176 expected = [handler.build_dir if val == '$build_dir' else \ 1177 val for val in expected] 1178 1179 hardware = mock.Mock( 1180 product=hardware_product_name, 1181 probe_id=12345 if hardware_probe else None, 1182 id=12345 if not hardware_probe else None, 1183 runner_params=['param1', 'param2'] 1184 ) 1185 1186 command = handler._create_command(runner, hardware) 1187 1188 assert command == expected 1189 1190 1191TESTDATA_14 = [ 1192 ('success', False, 'success', 'Unknown', False), 1193 (TwisterStatus.FAIL, False, TwisterStatus.FAIL, "Failed harness:'foobar'", True), 1194 (TwisterStatus.ERROR, False, TwisterStatus.ERROR, 'Unknown', True), 1195 (TwisterStatus.NONE, True, TwisterStatus.NONE, 'Unknown', False), 1196 (TwisterStatus.NONE, False, TwisterStatus.FAIL, 'Timeout', True), 1197] 1198 1199@pytest.mark.parametrize( 1200 'harness_status, flash_error,' \ 1201 ' expected_status, expected_reason, do_add_missing', 1202 TESTDATA_14, 1203 ids=['custom success', 'failed', 'error', 'flash error', 'no status'] 1204) 1205def test_devicehandler_update_instance_info( 1206 mocked_instance, 1207 harness_status, 1208 flash_error, 1209 expected_status, 1210 expected_reason, 1211 do_add_missing 1212 ): 1213 handler = DeviceHandler(mocked_instance, 'build', mock.Mock()) 1214 handler_time = 59 1215 missing_mock = mock.Mock() 1216 handler.instance.add_missing_case_status = missing_mock 1217 mocked_harness = mock.Mock(status=harness_status, reason="foobar") 1218 1219 handler._update_instance_info(mocked_harness, handler_time, flash_error) 1220 1221 assert handler.instance.execution_time == handler_time 1222 1223 assert handler.instance.status == expected_status 1224 assert handler.instance.reason == expected_reason 1225 1226 if do_add_missing: 1227 missing_mock.assert_called_with('blocked', expected_reason) 1228 1229 1230TESTDATA_15 = [ 1231 ('dummy device', 'dummy pty', None, None, True, False, False), 1232 ( 1233 'dummy device', 1234 'dummy pty', 1235 mock.Mock(communicate=mock.Mock(return_value=('', ''))), 1236 SerialException, 1237 False, 1238 True, 1239 'dummy pty' 1240 ), 1241 ( 1242 'dummy device', 1243 None, 1244 None, 1245 SerialException, 1246 False, 1247 False, 1248 'dummy device' 1249 ) 1250] 1251 1252@pytest.mark.parametrize( 1253 'serial_device, serial_pty, ser_pty_process, expected_exception,' \ 1254 ' expected_result, terminate_ser_pty_process, make_available', 1255 TESTDATA_15, 1256 ids=['valid', 'serial pty process', 'no serial pty'] 1257) 1258def test_devicehandler_create_serial_connection( 1259 mocked_instance, 1260 serial_device, 1261 serial_pty, 1262 ser_pty_process, 1263 expected_exception, 1264 expected_result, 1265 terminate_ser_pty_process, 1266 make_available 1267): 1268 def mock_serial(*args, **kwargs): 1269 if expected_exception: 1270 raise expected_exception('') 1271 return expected_result 1272 1273 handler = DeviceHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1)) 1274 missing_mock = mock.Mock() 1275 handler.instance.add_missing_case_status = missing_mock 1276 twisterlib.handlers.terminate_process = mock.Mock() 1277 1278 dut = DUT() 1279 dut.available = 0 1280 dut.failures = 0 1281 handler.duts = [dut] 1282 1283 hardware_baud = 14400 1284 flash_timeout = 60 1285 serial_mock = mock.Mock(side_effect=mock_serial) 1286 1287 with mock.patch('serial.Serial', serial_mock), \ 1288 pytest.raises(expected_exception) if expected_exception else \ 1289 nullcontext(): 1290 result = handler._create_serial_connection(dut, serial_device, hardware_baud, 1291 flash_timeout, serial_pty, 1292 ser_pty_process) 1293 1294 if expected_result: 1295 assert result is not None 1296 assert dut.failures == 0 1297 1298 if expected_exception: 1299 assert handler.instance.status == TwisterStatus.FAIL 1300 assert handler.instance.reason == 'Serial Device Error' 1301 assert dut.available == 1 1302 assert dut.failures == 1 1303 missing_mock.assert_called_once_with('blocked', 'Serial Device Error') 1304 1305 if terminate_ser_pty_process: 1306 twisterlib.handlers.terminate_process.assert_called_once() 1307 ser_pty_process.communicate.assert_called_once() 1308 1309 1310TESTDATA_16 = [ 1311 ('dummy1 dummy2', None, 'slave name'), 1312 ('dummy1,dummy2', CalledProcessError, None), 1313 (None, None, 'dummy hardware serial'), 1314] 1315 1316@pytest.mark.parametrize( 1317 'serial_pty, popen_exception, expected_device', 1318 TESTDATA_16, 1319 ids=['pty', 'pty process error', 'no pty'] 1320) 1321def test_devicehandler_get_serial_device( 1322 mocked_instance, 1323 serial_pty, 1324 popen_exception, 1325 expected_device 1326): 1327 def mock_popen(command, *args, **kwargs): 1328 assert command == ['dummy1', 'dummy2'] 1329 if popen_exception: 1330 raise popen_exception(command, 'Dummy error') 1331 return mock.Mock() 1332 1333 handler = DeviceHandler(mocked_instance, 'build', mock.Mock()) 1334 hardware_serial = 'dummy hardware serial' 1335 1336 popen_mock = mock.Mock(side_effect=mock_popen) 1337 openpty_mock = mock.Mock(return_value=('master', 'slave')) 1338 ttyname_mock = mock.Mock(side_effect=lambda x: x + ' name') 1339 1340 with mock.patch('subprocess.Popen', popen_mock), \ 1341 mock.patch('pty.openpty', openpty_mock), \ 1342 mock.patch('os.ttyname', ttyname_mock): 1343 result = handler._get_serial_device(serial_pty, hardware_serial) 1344 1345 if popen_exception: 1346 assert result is None 1347 else: 1348 assert result[0] == expected_device 1349 1350TESTDATA_17 = [ 1351 (False, False, False, False, None, False, False, 1352 TwisterStatus.NONE, None, []), 1353 (True, True, False, False, None, False, False, 1354 TwisterStatus.NONE, None, []), 1355 (True, False, True, False, None, False, False, 1356 TwisterStatus.ERROR, 'Device issue (Flash error)', []), 1357 (True, False, False, True, None, False, False, 1358 TwisterStatus.ERROR, 'Device issue (Timeout)', ['Flash operation timed out.']), 1359 (True, False, False, False, 1, False, False, 1360 TwisterStatus.ERROR, 'Device issue (Flash error?)', []), 1361 (True, False, False, False, 0, True, False, 1362 TwisterStatus.NONE, None, ['Timed out while monitoring serial output on IPName']), 1363 (True, False, False, False, 0, False, True, 1364 TwisterStatus.NONE, None, ["Terminating serial-pty:'Serial PTY'", 1365 "Terminated serial-pty:'Serial PTY', stdout:'', stderr:''"]), 1366] 1367 1368@pytest.mark.parametrize( 1369 'has_hardware, raise_create_serial, raise_popen, raise_timeout,' \ 1370 ' returncode, do_timeout_thread, use_pty,' \ 1371 ' expected_status, expected_reason, expected_logs', 1372 TESTDATA_17, 1373 ids=['no hardware', 'create serial failure', 'popen called process error', 1374 'communicate timeout', 'nonzero returncode', 'valid pty', 'valid dev'] 1375) 1376def test_devicehandler_handle( 1377 mocked_instance, 1378 caplog, 1379 has_hardware, 1380 raise_create_serial, 1381 raise_popen, 1382 raise_timeout, 1383 returncode, 1384 do_timeout_thread, 1385 use_pty, 1386 expected_status, 1387 expected_reason, 1388 expected_logs 1389): 1390 def mock_get_serial(serial_pty, hardware_serial): 1391 if serial_pty: 1392 serial_pty_process = mock.Mock( 1393 name='dummy serial PTY process', 1394 communicate=mock.Mock( 1395 return_value=('', '') 1396 ) 1397 ) 1398 return 'dummy serial PTY device', serial_pty_process 1399 return 'dummy serial device', None 1400 1401 def mock_create_serial(*args, **kwargs): 1402 if raise_create_serial: 1403 raise SerialException('dummy cmd', 'dummy msg') 1404 return mock.Mock(name='dummy serial') 1405 1406 def mock_thread(*args, **kwargs): 1407 is_alive_mock = mock.Mock(return_value=bool(do_timeout_thread)) 1408 1409 return mock.Mock(is_alive=is_alive_mock) 1410 1411 def mock_terminate(proc, *args, **kwargs): 1412 proc.communicate = mock.Mock(return_value=(mock.Mock(), mock.Mock())) 1413 1414 def mock_communicate(*args, **kwargs): 1415 if raise_timeout: 1416 raise TimeoutExpired('dummy cmd', 'dummyamount') 1417 return mock.Mock(), mock.Mock() 1418 1419 def mock_popen(command, *args, **kwargs): 1420 if raise_popen: 1421 raise CalledProcessError('dummy proc', 'dummy msg') 1422 1423 mock_process = mock.Mock( 1424 pid=1, 1425 returncode=returncode, 1426 communicate=mock.Mock(side_effect=mock_communicate) 1427 ) 1428 1429 return mock.Mock( 1430 __enter__=mock.Mock(return_value=mock_process), 1431 __exit__=mock.Mock(return_value=None) 1432 ) 1433 1434 hardware = None if not has_hardware else mock.Mock( 1435 baud=14400, 1436 runner='dummy runner', 1437 serial_pty='Serial PTY' if use_pty else None, 1438 serial='dummy serial', 1439 pre_script='dummy pre script', 1440 post_script='dummy post script', 1441 post_flash_script='dummy post flash script', 1442 flash_timeout=60, 1443 flash_with_test=True 1444 ) 1445 1446 handler = DeviceHandler(mocked_instance, 'build', mock.Mock()) 1447 handler.get_hardware = mock.Mock(return_value=hardware) 1448 handler.options = mock.Mock( 1449 timeout_multiplier=1, 1450 west_flash=None, 1451 west_runner=None 1452 ) 1453 handler._get_serial_device = mock.Mock(side_effect=mock_get_serial) 1454 handler._create_command = mock.Mock(return_value=['dummy', 'command']) 1455 handler.run_custom_script = mock.Mock() 1456 handler._create_serial_connection = mock.Mock( 1457 side_effect=mock_create_serial 1458 ) 1459 handler.monitor_serial = mock.Mock() 1460 handler.terminate = mock.Mock(side_effect=mock_terminate) 1461 handler._update_instance_info = mock.Mock() 1462 handler._final_handle_actions = mock.Mock() 1463 handler.make_dut_available = mock.Mock() 1464 twisterlib.handlers.terminate_process = mock.Mock() 1465 handler.instance.platform.name = 'IPName' 1466 1467 harness = mock.Mock() 1468 1469 with mock.patch('builtins.open', mock.mock_open(read_data='')), \ 1470 mock.patch('subprocess.Popen', side_effect=mock_popen), \ 1471 mock.patch('threading.Event', mock.Mock()), \ 1472 mock.patch('threading.Thread', side_effect=mock_thread): 1473 handler.handle(harness) 1474 1475 handler.get_hardware.assert_called_once() 1476 1477 messages = [record.msg for record in caplog.records] 1478 assert all([msg in messages for msg in expected_logs]) 1479 1480 if not has_hardware: 1481 return 1482 1483 handler.run_custom_script.assert_has_calls([ 1484 mock.call('dummy pre script', mock.ANY) 1485 ]) 1486 1487 if raise_create_serial: 1488 return 1489 1490 handler.run_custom_script.assert_has_calls([ 1491 mock.call('dummy pre script', mock.ANY), 1492 mock.call('dummy post flash script', mock.ANY), 1493 mock.call('dummy post script', mock.ANY) 1494 ]) 1495 1496 if expected_reason: 1497 assert handler.instance.reason == expected_reason 1498 if expected_status: 1499 assert handler.instance.status == expected_status 1500 1501 handler.make_dut_available.assert_called_once_with(hardware) 1502 1503 1504TESTDATA_18 = [ 1505 (True, True, True), 1506 (False, False, False), 1507] 1508 1509@pytest.mark.parametrize( 1510 'ignore_qemu_crash, expected_ignore_crash, expected_ignore_unexpected_eof', 1511 TESTDATA_18, 1512 ids=['ignore crash', 'qemu crash'] 1513) 1514def test_qemuhandler_init( 1515 mocked_instance, 1516 ignore_qemu_crash, 1517 expected_ignore_crash, 1518 expected_ignore_unexpected_eof 1519): 1520 mocked_instance.testsuite.ignore_qemu_crash = ignore_qemu_crash 1521 1522 handler = QEMUHandler(mocked_instance, 'build', mock.Mock()) 1523 1524 assert handler.ignore_qemu_crash == expected_ignore_crash 1525 assert handler.ignore_unexpected_eof == expected_ignore_unexpected_eof 1526 1527 1528def test_qemuhandler_get_cpu_time(): 1529 def mock_process(pid): 1530 return mock.Mock( 1531 cpu_times=mock.Mock( 1532 return_value=mock.Mock( 1533 user=20.0, 1534 system=64.0 1535 ) 1536 ) 1537 ) 1538 1539 with mock.patch('psutil.Process', mock_process): 1540 res = QEMUHandler._get_cpu_time(0) 1541 1542 assert res == pytest.approx(84.0) 1543 1544 1545TESTDATA_19 = [ 1546 ( 1547 True, 1548 os.path.join('self', 'dummy_dir', '1'), 1549 mock.PropertyMock(return_value=os.path.join('dummy_dir', '1')), 1550 os.path.join('dummy_dir', '1') 1551 ), 1552 ( 1553 False, 1554 os.path.join('self', 'dummy_dir', '2'), 1555 mock.PropertyMock(return_value=os.path.join('dummy_dir', '2')), 1556 os.path.join('self', 'dummy_dir', '2') 1557 ), 1558] 1559 1560@pytest.mark.parametrize( 1561 'self_sysbuild, self_build_dir, build_dir, expected', 1562 TESTDATA_19, 1563 ids=['domains build dir', 'self build dir'] 1564) 1565def test_qemuhandler_get_default_domain_build_dir( 1566 mocked_instance, 1567 self_sysbuild, 1568 self_build_dir, 1569 build_dir, 1570 expected 1571): 1572 get_default_domain_mock = mock.Mock() 1573 type(get_default_domain_mock()).build_dir = build_dir 1574 domains_mock = mock.Mock(get_default_domain=get_default_domain_mock) 1575 from_file_mock = mock.Mock(return_value=domains_mock) 1576 1577 handler = QEMUHandler(mocked_instance, 'build', mock.Mock()) 1578 handler.instance.sysbuild = self_sysbuild 1579 handler.build_dir = self_build_dir 1580 1581 with mock.patch('domains.Domains.from_file', from_file_mock): 1582 result = handler.get_default_domain_build_dir() 1583 1584 assert result == expected 1585 1586 1587TESTDATA_20 = [ 1588 ( 1589 os.path.join('self', 'dummy_dir', 'log1'), 1590 os.path.join('self', 'dummy_dir', 'pid1'), 1591 os.path.join('sysbuild', 'dummy_dir', 'bd1'), 1592 True 1593 ), 1594 ( 1595 os.path.join('self', 'dummy_dir', 'log2'), 1596 os.path.join('self', 'dummy_dir', 'pid2'), 1597 os.path.join('sysbuild', 'dummy_dir', 'bd2'), 1598 False 1599 ), 1600] 1601 1602@pytest.mark.parametrize( 1603 'self_log, self_pid_fn, sysbuild_build_dir, exists_pid_fn', 1604 TESTDATA_20, 1605 ids=['pid exists', 'pid missing'] 1606) 1607def test_qemuhandler_set_qemu_filenames( 1608 mocked_instance, 1609 self_log, 1610 self_pid_fn, 1611 sysbuild_build_dir, 1612 exists_pid_fn 1613): 1614 unlink_mock = mock.Mock() 1615 exists_mock = mock.Mock(return_value=exists_pid_fn) 1616 1617 handler = QEMUHandler(mocked_instance, 'build', mock.Mock()) 1618 handler.log = self_log 1619 handler.pid_fn = self_pid_fn 1620 1621 with mock.patch('os.unlink', unlink_mock), \ 1622 mock.patch('os.path.exists', exists_mock): 1623 handler._set_qemu_filenames(sysbuild_build_dir) 1624 1625 assert handler.fifo_fn == mocked_instance.build_dir + \ 1626 os.path.sep + 'qemu-fifo' 1627 1628 assert handler.pid_fn == sysbuild_build_dir + os.path.sep + 'qemu.pid' 1629 1630 assert handler.log_fn == self_log 1631 1632 if exists_pid_fn: 1633 unlink_mock.assert_called_once_with(sysbuild_build_dir + \ 1634 os.path.sep + 'qemu.pid') 1635 1636 1637def test_qemuhandler_create_command(mocked_instance): 1638 sysbuild_build_dir = os.path.join('sysbuild', 'dummy_dir') 1639 1640 handler = QEMUHandler(mocked_instance, 'build', mock.Mock(), 'dummy_cmd', False) 1641 1642 result = handler._create_command(sysbuild_build_dir) 1643 1644 assert result == ['dummy_cmd', '-C', 'sysbuild' + os.path.sep + 'dummy_dir', 1645 'run'] 1646 1647 1648TESTDATA_21 = [ 1649 ( 1650 0, 1651 False, 1652 None, 1653 'good dummy status', 1654 False, 1655 TwisterStatus.NONE, 1656 None, 1657 False 1658 ), 1659 ( 1660 1, 1661 True, 1662 None, 1663 'good dummy status', 1664 False, 1665 TwisterStatus.NONE, 1666 None, 1667 False 1668 ), 1669 ( 1670 0, 1671 False, 1672 None, 1673 TwisterStatus.NONE, 1674 True, 1675 TwisterStatus.FAIL, 1676 'Timeout', 1677 True 1678 ), 1679 ( 1680 1, 1681 False, 1682 None, 1683 TwisterStatus.NONE, 1684 False, 1685 TwisterStatus.FAIL, 1686 'Exited with 1', 1687 True 1688 ), 1689 ( 1690 1, 1691 False, 1692 'preexisting reason', 1693 'good dummy status', 1694 False, 1695 TwisterStatus.FAIL, 1696 'preexisting reason', 1697 True 1698 ), 1699] 1700 1701@pytest.mark.parametrize( 1702 'self_returncode, self_ignore_qemu_crash,' \ 1703 ' self_instance_reason, harness_status, is_timeout,' \ 1704 ' expected_status, expected_reason, expected_called_missing_case', 1705 TESTDATA_21, 1706 ids=['not failed', 'qemu ignore', 'timeout', 'bad returncode', 'other fail'] 1707) 1708def test_qemuhandler_update_instance_info( 1709 mocked_instance, 1710 self_returncode, 1711 self_ignore_qemu_crash, 1712 self_instance_reason, 1713 harness_status, 1714 is_timeout, 1715 expected_status, 1716 expected_reason, 1717 expected_called_missing_case 1718): 1719 mocked_instance.add_missing_case_status = mock.Mock() 1720 mocked_instance.reason = self_instance_reason 1721 mocked_harness = mock.Mock(status=harness_status, reason="foobar") 1722 1723 handler = QEMUHandler(mocked_instance, 'build', mock.Mock()) 1724 handler.returncode = self_returncode 1725 handler.ignore_qemu_crash = self_ignore_qemu_crash 1726 1727 handler._update_instance_info(mocked_harness, is_timeout) 1728 1729 assert handler.instance.status == expected_status 1730 assert handler.instance.reason == expected_reason 1731 1732 if expected_called_missing_case: 1733 mocked_instance.add_missing_case_status.assert_called_once_with( 1734 TwisterStatus.BLOCK 1735 ) 1736 1737 1738def test_qemuhandler_thread_get_fifo_names(): 1739 fifo_fn = 'dummy' 1740 1741 fifo_in, fifo_out = QEMUHandler._thread_get_fifo_names(fifo_fn) 1742 1743 assert fifo_in == 'dummy.in' 1744 assert fifo_out == 'dummy.out' 1745 1746 1747TESTDATA_24 = [ 1748 (TwisterStatus.FAIL, 'timeout', TwisterStatus.FAIL, 'timeout'), 1749 (TwisterStatus.FAIL, 'Execution error', TwisterStatus.FAIL, 'Execution error'), 1750 (TwisterStatus.FAIL, 'unexpected eof', TwisterStatus.FAIL, 'unexpected eof'), 1751 (TwisterStatus.FAIL, 'unexpected byte', TwisterStatus.FAIL, 'unexpected byte'), 1752 (TwisterStatus.NONE, None, TwisterStatus.NONE, 'Unknown'), 1753] 1754 1755@pytest.mark.parametrize( 1756 '_status, _reason, expected_status, expected_reason', 1757 TESTDATA_24, 1758 ids=['timeout', 'failed', 'unexpected eof', 'unexpected byte', 'unknown'] 1759) 1760def test_qemuhandler_thread_update_instance_info( 1761 mocked_instance, 1762 _status, 1763 _reason, 1764 expected_status, 1765 expected_reason 1766): 1767 handler = QEMUHandler(mocked_instance, 'build', mock.Mock()) 1768 handler_time = 59 1769 1770 QEMUHandler._thread_update_instance_info(handler, handler_time, _status, _reason) 1771 1772 assert handler.instance.execution_time == handler_time 1773 1774 assert handler.instance.status == expected_status 1775 assert handler.instance.reason == expected_reason 1776 1777 1778TESTDATA_25 = [ 1779 ( 1780 ('1\n' * 60).encode('utf-8'), 1781 60, 1782 1, 1783 [TwisterStatus.NONE] * 60 + [TwisterStatus.PASS] * 6, 1784 1000, 1785 False, 1786 TwisterStatus.FAIL, 1787 'timeout', 1788 [mock.call('1\n'), mock.call('1\n')] 1789 ), 1790 ( 1791 ('1\n' * 60).encode('utf-8'), 1792 60, 1793 -1, 1794 [TwisterStatus.NONE] * 60 + [TwisterStatus.PASS] * 30, 1795 100, 1796 False, 1797 TwisterStatus.FAIL, 1798 None, 1799 [mock.call('1\n'), mock.call('1\n')] 1800 ), 1801 ( 1802 b'', 1803 60, 1804 1, 1805 [TwisterStatus.PASS] * 3, 1806 100, 1807 False, 1808 TwisterStatus.FAIL, 1809 'unexpected eof', 1810 [] 1811 ), 1812 ( 1813 b'\x81', 1814 60, 1815 1, 1816 [TwisterStatus.PASS] * 3, 1817 100, 1818 False, 1819 TwisterStatus.FAIL, 1820 'unexpected byte', 1821 [] 1822 ), 1823 ( 1824 '1\n2\n3\n4\n5\n'.encode('utf-8'), 1825 600, 1826 1, 1827 [TwisterStatus.NONE] * 3 + [TwisterStatus.PASS] * 7, 1828 100, 1829 False, 1830 TwisterStatus.PASS, 1831 None, 1832 [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')] 1833 ), 1834 ( 1835 '1\n2\n3\n4\n5\n'.encode('utf-8'), 1836 600, 1837 0, 1838 [TwisterStatus.NONE] * 3 + [TwisterStatus.PASS] * 7, 1839 100, 1840 False, 1841 TwisterStatus.FAIL, 1842 'timeout', 1843 [mock.call('1\n'), mock.call('2\n')] 1844 ), 1845 ( 1846 '1\n2\n3\n4\n5\n'.encode('utf-8'), 1847 60, 1848 1, 1849 [TwisterStatus.NONE] * 3 + [TwisterStatus.PASS] * 7, 1850 (n for n in [100, 100, 10000]), 1851 True, 1852 TwisterStatus.PASS, 1853 None, 1854 [mock.call('1\n'), mock.call('2\n'), mock.call('3\n'), mock.call('4\n')] 1855 ), 1856] 1857 1858@pytest.mark.parametrize( 1859 'content, timeout, pid, harness_statuses, cputime, capture_coverage,' \ 1860 ' expected_status, expected_reason, expected_log_calls', 1861 TESTDATA_25, 1862 ids=[ 1863 'timeout', 1864 'harness failed', 1865 'unexpected eof', 1866 'unexpected byte', 1867 'harness success', 1868 'timeout by pid=0', 1869 'capture_coverage' 1870 ] 1871) 1872def test_qemuhandler_thread( 1873 mocked_instance, 1874 faux_timer, 1875 content, 1876 timeout, 1877 pid, 1878 harness_statuses, 1879 cputime, 1880 capture_coverage, 1881 expected_status, 1882 expected_reason, 1883 expected_log_calls 1884): 1885 def mock_cputime(pid): 1886 if pid > 0: 1887 return cputime if isinstance(cputime, int) else next(cputime) 1888 else: 1889 raise ProcessLookupError() 1890 1891 type(mocked_instance.testsuite).timeout = mock.PropertyMock(return_value=timeout) 1892 handler = QEMUHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1)) 1893 handler.ignore_unexpected_eof = False 1894 handler.pid_fn = 'pid_fn' 1895 handler.fifo_fn = 'fifo_fn' 1896 1897 file_objs = {} 1898 1899 def mocked_open(filename, *args, **kwargs): 1900 if filename == handler.pid_fn: 1901 contents = str(pid).encode('utf-8') 1902 elif filename == handler.fifo_fn + '.out': 1903 contents = content 1904 else: 1905 contents = b'' 1906 1907 file_object = mock.mock_open(read_data=contents).return_value 1908 file_object.__iter__.return_value = contents.splitlines(True) 1909 if file_object not in file_objs: 1910 file_objs[filename] = file_object 1911 1912 return file_object 1913 1914 harness = mock.Mock(capture_coverage=capture_coverage, handle=print) 1915 type(harness).status = mock.PropertyMock(side_effect=harness_statuses) 1916 1917 p = mock.Mock() 1918 p.poll = mock.Mock( 1919 side_effect=itertools.cycle([True, True, True, True, False]) 1920 ) 1921 1922 mock_thread_get_fifo_names = mock.Mock( 1923 return_value=('fifo_fn.in', 'fifo_fn.out') 1924 ) 1925 mock_thread_update_instance_info = mock.Mock() 1926 1927 with mock.patch('time.time', side_effect=faux_timer.time), \ 1928 mock.patch('builtins.open', new=mocked_open), \ 1929 mock.patch('select.poll', return_value=p), \ 1930 mock.patch('os.path.exists', return_value=True), \ 1931 mock.patch('os.unlink', mock.Mock()), \ 1932 mock.patch('os.mkfifo', mock.Mock()), \ 1933 mock.patch('os.kill', mock.Mock()), \ 1934 mock.patch('twisterlib.handlers.QEMUHandler._get_cpu_time', 1935 mock_cputime), \ 1936 mock.patch('twisterlib.handlers.QEMUHandler._thread_get_fifo_names', 1937 mock_thread_get_fifo_names), \ 1938 mock.patch('twisterlib.handlers.QEMUHandler.' \ 1939 '_thread_update_instance_info', 1940 mock_thread_update_instance_info): 1941 QEMUHandler._thread( 1942 handler, 1943 handler.get_test_timeout(), 1944 handler.build_dir, 1945 handler.log, 1946 handler.fifo_fn, 1947 handler.pid_fn, 1948 harness, 1949 handler.ignore_unexpected_eof 1950 ) 1951 1952 mock_thread_update_instance_info.assert_called_once_with( 1953 handler, 1954 mock.ANY, 1955 expected_status, 1956 mock.ANY 1957 ) 1958 1959 file_objs[handler.log].write.assert_has_calls(expected_log_calls) 1960 1961 1962TESTDATA_26 = [ 1963 (True, False, TwisterStatus.NONE, True, 1964 ['No timeout, return code from QEMU (1): 1', 1965 'return code from QEMU (1): 1']), 1966 (False, True, TwisterStatus.PASS, True, ['return code from QEMU (1): 0']), 1967 (False, True, TwisterStatus.FAIL, False, ['return code from QEMU (None): 1']), 1968] 1969 1970@pytest.mark.parametrize( 1971 'isatty, do_timeout, harness_status, exists_pid_fn, expected_logs', 1972 TESTDATA_26, 1973 ids=['no timeout, isatty', 'timeout passed', 'timeout, no pid_fn'] 1974) 1975def test_qemuhandler_handle( 1976 mocked_instance, 1977 caplog, 1978 tmp_path, 1979 isatty, 1980 do_timeout, 1981 harness_status, 1982 exists_pid_fn, 1983 expected_logs 1984): 1985 def mock_wait(*args, **kwargs): 1986 if do_timeout: 1987 raise TimeoutExpired('dummy cmd', 'dummyamount') 1988 1989 mock_process = mock.Mock(pid=0, returncode=1) 1990 mock_process.communicate = mock.Mock( 1991 return_value=(mock.Mock(), mock.Mock()) 1992 ) 1993 mock_process.wait = mock.Mock(side_effect=mock_wait) 1994 1995 handler = QEMUHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1)) 1996 1997 def mock_path_exists(name, *args, **kwargs): 1998 return exists_pid_fn 1999 2000 def mock_popen(command, stdout=None, stdin=None, stderr=None, cwd=None): 2001 return mock.Mock( 2002 __enter__=mock.Mock(return_value=mock_process), 2003 __exit__=mock.Mock(return_value=None), 2004 communicate=mock.Mock(return_value=(mock.Mock(), mock.Mock())) 2005 ) 2006 2007 def mock_thread(name=None, target=None, daemon=None, args=None): 2008 return mock.Mock() 2009 2010 def mock_filenames(sysbuild_build_dir): 2011 handler.fifo_fn = os.path.join('dummy', 'qemu-fifo') 2012 handler.pid_fn = os.path.join(sysbuild_build_dir, 'qemu.pid') 2013 handler.log_fn = os.path.join('dummy', 'log') 2014 2015 harness = mock.Mock(status=harness_status) 2016 handler_options_west_flash = [] 2017 2018 domain_build_dir = os.path.join('sysbuild', 'dummydir') 2019 command = ['generator_cmd', '-C', os.path.join('cmd', 'path'), 'run'] 2020 2021 handler.options = mock.Mock( 2022 timeout_multiplier=1, 2023 west_flash=handler_options_west_flash, 2024 west_runner=None 2025 ) 2026 handler.run_custom_script = mock.Mock(return_value=None) 2027 handler.make_device_available = mock.Mock(return_value=None) 2028 handler._final_handle_actions = mock.Mock(return_value=None) 2029 handler._create_command = mock.Mock(return_value=command) 2030 handler._set_qemu_filenames = mock.Mock(side_effect=mock_filenames) 2031 handler.get_default_domain_build_dir = mock.Mock(return_value=domain_build_dir) 2032 handler.terminate = mock.Mock() 2033 2034 unlink_mock = mock.Mock() 2035 2036 with mock.patch('subprocess.Popen', side_effect=mock_popen), \ 2037 mock.patch('builtins.open', mock.mock_open(read_data='1')), \ 2038 mock.patch('threading.Thread', side_effect=mock_thread), \ 2039 mock.patch('os.path.exists', side_effect=mock_path_exists), \ 2040 mock.patch('os.unlink', unlink_mock), \ 2041 mock.patch('sys.stdout.isatty', return_value=isatty): 2042 handler.handle(harness) 2043 2044 assert all([expected_log in caplog.text for expected_log in expected_logs]) 2045 2046 2047def test_qemuhandler_get_fifo(mocked_instance): 2048 handler = QEMUHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1)) 2049 handler.fifo_fn = 'fifo_fn' 2050 2051 result = handler.get_fifo() 2052 2053 assert result == 'fifo_fn' 2054