#!/usr/bin/env python3
# vim: set syntax=python ts=4 :
#
# Copyright (c) 2018-2022 Intel Corporation
# Copyright 2022 NXP
# SPDX-License-Identifier: Apache-2.0
"""Twister execution handlers.

Each handler runs one built test instance (as a host binary, in a simulator,
on a hardware device or in QEMU), feeds the captured console output line by
line to a harness object and records the resulting status on the instance.
"""

import csv
import logging
import math
import os
import psutil
import re
import select
import shlex
import signal
import subprocess
import sys
import threading
import time

from twisterlib.environment import ZEPHYR_BASE
from twisterlib.error import TwisterException
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/build_helpers"))
from domains import Domains

try:
    import serial
except ImportError:
    print("Install pyserial python module with pip to use --device-testing option.")

try:
    import pty
except ImportError as capture_error:
    if os.name == "nt":  # "nt" means that program is running on Windows OS
        pass  # "--device-serial-pty" option is not supported on Windows OS
    else:
        raise capture_error

logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)

SUPPORTED_SIMS = ["mdb-nsim", "nsim", "renode", "qemu", "tsim", "armfvp", "xt-sim", "native", "custom"]
SUPPORTED_SIMS_IN_PYTEST = ['native', 'qemu']


def terminate_process(proc):
    """
    encapsulate terminate functionality so we do it consistently where ever
    we might want to terminate the proc.  We need try_kill_process_by_pid
    because of both how newer ninja (1.6.0 or greater) and .NET / renode
    work.  Newer ninja's don't seem to pass SIGTERM down to the children
    so we need to use try_kill_process_by_pid.

    @param proc subprocess.Popen object to terminate together with its
                children
    """
    try:
        children = psutil.Process(proc.pid).children(recursive=True)
    except psutil.NoSuchProcess:
        # The process already exited; there are no children left to signal.
        children = []

    for child in children:
        try:
            os.kill(child.pid, signal.SIGTERM)
        except ProcessLookupError:
            pass
    proc.terminate()
    # sleep for a while before attempting to kill
    time.sleep(0.5)
    proc.kill()


def _read_pid_file(pid_fn):
    """Return the PID stored in the file pid_fn.

    Returns None when the file is missing, unreadable or does not (yet)
    contain an integer, e.g. because it is still being written.
    """
    try:
        with open(pid_fn) as pid_file:
            return int(pid_file.read())
    except (OSError, ValueError):
        return None


class Handler:
    """Common state and result post-processing shared by all handlers."""

    def __init__(self, instance, type_str="build"):
        """Constructor

        @param instance Test Instance
        @param type_str Name of the handler type
        """
        self.options = None

        self.state = "waiting"
        self.run = False
        self.type_str = type_str

        self.binary = None
        self.pid_fn = None
        self.call_make_run = True

        self.name = instance.name
        self.instance = instance
        self.sourcedir = instance.testsuite.source_dir
        self.build_dir = instance.build_dir
        self.log = os.path.join(self.build_dir, "handler.log")
        self.returncode = 0
        self.generator = None
        self.generator_cmd = None
        self.suite_name_check = True
        self.ready = False

        self.args = []
        self.terminated = False

    def get_test_timeout(self):
        """Return the test suite timeout scaled by the platform and the
        command line timeout multipliers, rounded up to a whole number."""
        return math.ceil(self.instance.testsuite.timeout *
                         self.instance.platform.timeout_multiplier *
                         self.options.timeout_multiplier)

    def record(self, harness):
        """Append the harness recording, if any, to recording.csv in the
        build directory (header row first, then one row per record)."""
        if harness.recording:
            filename = os.path.join(self.build_dir, "recording.csv")
            # newline="" leaves line ending handling to the csv module so
            # that the explicit lineterminator is not translated again.
            with open(filename, "at", newline="") as csvfile:
                cw = csv.writer(csvfile, lineterminator=os.linesep)
                cw.writerow(harness.fieldnames)
                cw.writerows(harness.recording)

    def terminate(self, proc):
        """Terminate proc and remember that it was terminated by us."""
        terminate_process(proc)
        self.terminated = True

    def _verify_ztest_suite_name(self, harness_state, detected_suite_names, handler_time):
        """
        If test suite names was found in test's C source code, then verify if
        detected suite names from output correspond to expected suite names
        (and not in reverse).
        """
        expected_suite_names = self.instance.testsuite.ztest_suite_names
        logger.debug(f"Expected suite names:{expected_suite_names}")
        logger.debug(f"Detected suite names:{detected_suite_names}")
        if not expected_suite_names or harness_state != "passed":
            return
        if not detected_suite_names:
            self._missing_suite_name(expected_suite_names, handler_time)
            return
        if any(name not in expected_suite_names for name in detected_suite_names):
            self._missing_suite_name(expected_suite_names, handler_time)

    def _missing_suite_name(self, expected_suite_names, handler_time):
        """
        Change the result of the performed test if a suite name was missing
        or did not match the expected ones.
        """
        self.instance.status = "failed"
        self.instance.execution_time = handler_time
        for tc in self.instance.testcases:
            tc.status = "failed"
        self.instance.reason = "Testsuite mismatch"
        logger.debug("Test suite names were not printed or some of them in "
                     "output do not correspond with expected: %s",
                     str(expected_suite_names))

    def _final_handle_actions(self, harness, handler_time):
        """Run the checks common to all handlers once a test has finished,
        then store the harness recording.

        For harnesses whose class is named "Test" the suite names and the
        run id are verified; a mismatch marks the instance as failed.
        """
        # only for Ztest tests:
        harness_class_name = type(harness).__name__
        if self.suite_name_check and harness_class_name == "Test":
            self._verify_ztest_suite_name(harness.state, harness.detected_suite_names, handler_time)
            if self.instance.status == 'failed':
                return
            if not harness.matched_run_id and harness.run_id_exists:
                self.instance.status = "failed"
                self.instance.execution_time = handler_time
                self.instance.reason = "RunID mismatch"
                for tc in self.instance.testcases:
                    tc.status = "failed"

        self.record(harness)


class BinaryHandler(Handler):
    """Runs a test as a host process and parses its standard output."""

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        @param type_str Name of the handler type
        """
        super().__init__(instance, type_str)

        self.call_west_flash = False
        self.seed = None
        self.extra_test_args = None
        self.line = b""

    def try_kill_process_by_pid(self):
        """Kill the process whose PID is stored in the pid file, if any.

        The pid file is removed and self.pid_fn cleared so the process is
        not killed twice.
        """
        if not self.pid_fn:
            return

        pid = _read_pid_file(self.pid_fn)
        try:
            os.unlink(self.pid_fn)
        except FileNotFoundError:
            pass
        self.pid_fn = None  # clear so we don't try to kill the binary twice

        if pid is None:
            logger.debug("No usable pid file found, nothing to kill")
            return
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            pass

    def _output_reader(self, proc):
        """Read one line from the process output into self.line (run in a
        helper thread so the read can be given a timeout)."""
        self.line = proc.stdout.readline()

    def _output_handler(self, proc, harness):
        """Log the process output and pass it line by line to the harness
        until the test timeout expires or the output ends.

        Once the harness reports a state the deadline is replaced by a short
        grace period (longer when coverage is captured).
        """
        # literal backslash sequence, not a CR/LF pair
        suffix = '\\r\\n'

        with open(self.log, "wt") as log_out_fp:
            timeout_extended = False
            timeout_time = time.time() + self.get_test_timeout()
            while True:
                this_timeout = timeout_time - time.time()
                if this_timeout < 0:
                    break
                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
                reader_t.start()
                reader_t.join(this_timeout)
                if not reader_t.is_alive() and self.line != b"":
                    line_decoded = self.line.decode('utf-8', "replace")
                    if line_decoded.endswith(suffix):
                        stripped_line = line_decoded[:-len(suffix)].rstrip()
                    else:
                        stripped_line = line_decoded.rstrip()
                    logger.debug("OUTPUT: %s", stripped_line)
                    log_out_fp.write(line_decoded)
                    log_out_fp.flush()
                    harness.handle(stripped_line)
                    if harness.state:
                        if not timeout_extended or harness.capture_coverage:
                            timeout_extended = True
                            if harness.capture_coverage:
                                timeout_time = time.time() + 30
                            else:
                                timeout_time = time.time() + 2
                else:
                    # reader timed out or hit end of output
                    reader_t.join(0)
                    break
            try:
                # POSIX arch based ztests end on their own,
                # so let's give it up to 100ms to do so
                proc.wait(0.1)
            except subprocess.TimeoutExpired:
                self.terminate(proc)

    def _create_command(self, robot_test):
        """Build the command line used to run the test.

        @param robot_test True when the harness runs a Robot Framework test
        @return list of command line arguments
        """
        if robot_test:
            command = [self.generator_cmd, "run_renode_test"]
        elif self.call_make_run:
            command = [self.generator_cmd, "run"]
        elif self.call_west_flash:
            command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
        else:
            command = [self.binary]

        if self.options.enable_valgrind:
            command = ["valgrind", "--error-exitcode=2",
                       "--leak-check=full",
                       "--suppressions=" + ZEPHYR_BASE + "/scripts/valgrind.supp",
                       "--log-file=" + self.build_dir + "/valgrind.log",
                       "--track-origins=yes",
                       ] + command

        # Only valid for native_posix
        if self.seed is not None:
            command.append(f"--seed={self.seed}")
        if self.extra_test_args is not None:
            command.extend(self.extra_test_args)

        return command

    def _create_env(self):
        """Return a copy of the environment with the sanitizer options
        requested on the command line prepended to any existing ones."""
        env = os.environ.copy()
        if self.options.enable_asan:
            asan_options = ["log_path=stdout"]
            inherited = env.get("ASAN_OPTIONS", "").strip(":")
            if inherited:
                asan_options.append(inherited)
            if not self.options.enable_lsan:
                asan_options.append("detect_leaks=0")
            # Options are ':' separated; joining keeps "detect_leaks=0" from
            # being glued onto the last inherited option.
            env["ASAN_OPTIONS"] = ":".join(asan_options)

        if self.options.enable_ubsan:
            env["UBSAN_OPTIONS"] = "log_path=stdout:halt_on_error=1:" + \
                                  env.get("UBSAN_OPTIONS", "")

        return env

    def _update_instance_info(self, harness_state, handler_time):
        """Set the instance status and reason from the process return code
        and the state reported by the harness."""
        self.instance.execution_time = handler_time
        if not self.terminated and self.returncode != 0:
            self.instance.status = "failed"
            if self.options.enable_valgrind and self.returncode == 2:
                self.instance.reason = "Valgrind error"
            else:
                # When a process is killed, the default handler returns 128 + SIGTERM
                # so in that case the return code itself is not meaningful
                self.instance.reason = "Failed"
        elif harness_state:
            self.instance.status = harness_state
            if harness_state == "failed":
                self.instance.reason = "Failed"
        else:
            self.instance.status = "failed"
            self.instance.reason = "Timeout"
            self.instance.add_missing_case_status("blocked", "Timeout")

    def handle(self, harness):
        """Run the test process and evaluate its output with the harness."""
        robot_test = getattr(harness, "is_robot_test", False)

        command = self._create_command(robot_test)

        logger.debug("Spawning process: %s%sin directory: %s",
                     " ".join(shlex.quote(word) for word in command),
                     os.linesep, self.build_dir)

        start_time = time.time()

        env = self._create_env()

        if robot_test:
            harness.run_robot_test(command, self)
            return

        with subprocess.Popen(command, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, cwd=self.build_dir, env=env) as proc:
            logger.debug("Spawning BinaryHandler Thread for %s", self.name)
            t = threading.Thread(target=self._output_handler, args=(proc, harness,), daemon=True)
            t.start()
            # join() without a timeout only returns once the thread is done
            t.join()
            proc.wait()
            self.returncode = proc.returncode
            self.try_kill_process_by_pid()

        handler_time = time.time() - start_time

        if self.options.coverage:
            # NOTE(review): with shell=True and a list argument only the
            # first element ("GCOV_PREFIX=...") is run as the shell command;
            # the remaining items become arguments to the shell itself, so
            # gcov does not appear to be invoked here. Left unchanged on
            # purpose; confirm the intent before fixing.
            subprocess.call(["GCOV_PREFIX=" + self.build_dir,
                             "gcov", self.sourcedir, "-b", "-s", self.build_dir], shell=True)

        # FIXME: This is needed when killing the simulator, the console is
        # garbled and needs to be reset. Did not find a better way to do that.
        if sys.stdout.isatty():
            subprocess.call(["stty", "sane"], stdin=sys.stdout)

        self._update_instance_info(harness.state, handler_time)

        self._final_handle_actions(harness, handler_time)


class SimulationHandler(BinaryHandler):
    """BinaryHandler preconfigured for the 'renode' and 'native' types."""

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        @param type_str Name of the handler type
        """
        super().__init__(instance, type_str)

        if type_str == 'renode':
            self.pid_fn = os.path.join(instance.build_dir, "renode.pid")
        elif type_str == 'native':
            self.call_make_run = False
            self.binary = os.path.join(instance.build_dir, "zephyr", "zephyr.exe")
            self.ready = True


class DeviceHandler(Handler):
    """Flashes a test onto a hardware device and monitors its serial port."""

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        @param type_str Name of the handler type
        """
        super().__init__(instance, type_str)

    def monitor_serial(self, ser, halt_event, harness):
        """Read lines from the serial port, log them and pass them to the
        harness until the port is closed, halt_event is set or the harness
        reports a state.

        @param ser Open serial port object
        @param halt_event threading.Event used to request a stop
        @param harness Harness that evaluates the output
        """
        with open(self.log, "wb") as log_out_fp:
            if self.options.coverage:
                # Set capture_coverage to True to indicate that right after
                # test results we should get coverage data, otherwise we exit
                # from the test.
                harness.capture_coverage = True

            # Clear serial leftover.
            ser.reset_input_buffer()

            while ser.isOpen():
                if halt_event.is_set():
                    logger.debug('halted')
                    ser.close()
                    break

                try:
                    if not ser.in_waiting:
                        # no incoming bytes are waiting to be read from
                        # the serial input buffer, let other threads run
                        time.sleep(0.001)
                        continue
                # maybe the serial port is still in reset
                # check status may cause error
                # wait for more time
                except OSError:
                    time.sleep(0.001)
                    continue
                except TypeError:
                    # This exception happens if the serial port was closed and
                    # its file descriptor cleared in between of ser.isOpen()
                    # and ser.in_waiting checks.
                    logger.debug("Serial port is already closed, stop reading.")
                    break

                serial_line = None
                try:
                    serial_line = ser.readline()
                except TypeError:
                    pass
                # ignore SerialException which may happen during the serial device
                # power off/on process.
                except serial.SerialException:
                    pass

                # Just because ser_fileno has data doesn't mean an entire line
                # is available yet.
                if serial_line:
                    sl = serial_line.decode('utf-8', 'ignore').lstrip()
                    logger.debug("DEVICE: {0}".format(sl.rstrip()))

                    log_out_fp.write(sl.encode('utf-8'))
                    log_out_fp.flush()
                    harness.handle(sl.rstrip())

                if harness.state:
                    if not harness.capture_coverage:
                        ser.close()
                        break

    def device_is_available(self, instance):
        """Reserve and return a free DUT matching the instance's platform
        and fixture.

        @return the reserved DUT, or None if all matching DUTs are busy
        @raise TwisterException if no DUT matches at all
        """
        device = instance.platform.name
        fixture = instance.testsuite.harness_config.get("fixture")
        dut_found = False

        for d in self.duts:
            if fixture and fixture not in d.fixtures:
                continue
            if d.platform != device or (d.serial is None and d.serial_pty is None):
                continue
            dut_found = True
            d.lock.acquire()
            try:
                avail = False
                if d.available:
                    d.available = 0
                    d.counter += 1
                    avail = True
            finally:
                d.lock.release()
            if avail:
                return d

        if not dut_found:
            raise TwisterException(f"No device to serve as {device} platform.")

        return None

    def make_device_available(self, serial):
        """Mark every DUT using the given serial device or serial pty
        command as available again."""
        for d in self.duts:
            if serial in [d.serial_pty, d.serial]:
                d.available = 1

    @staticmethod
    def run_custom_script(script, timeout):
        """Run a user supplied script, logging its output; the script is
        killed when it runs longer than timeout seconds."""
        with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            try:
                stdout, stderr = proc.communicate(timeout=timeout)
                logger.debug(stdout.decode(errors="ignore"))
                if proc.returncode != 0:
                    logger.error(f"Custom script failure: {stderr.decode(errors='ignore')}")

            except subprocess.TimeoutExpired:
                proc.kill()
                proc.communicate()
                logger.error("{} timed out".format(script))

    def _create_command(self, runner, hardware):
        """Build the flash command for the given runner and hardware.

        @param runner Name of the west runner, or a false value
        @param hardware DUT description
        @return list of command line arguments
        """
        if (self.options.west_flash is not None) or runner:
            command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
            command_extra_args = []

            # There are three ways this option is used.
            # 1) bare: --west-flash
            #    This results in options.west_flash == []
            # 2) with a value: --west-flash="--board-id=42"
            #    This results in options.west_flash == "--board-id=42"
            # 3) Multiple values: --west-flash="--board-id=42,--erase"
            #    The comma separated value is split into individual
            #    arguments below.
            if self.options.west_flash:
                command_extra_args.extend(self.options.west_flash.split(','))

            if runner:
                command.append("--runner")
                command.append(runner)

                board_id = hardware.probe_id or hardware.id
                product = hardware.product
                if board_id is not None:
                    if runner in ("pyocd", "nrfjprog"):
                        command_extra_args.append("--dev-id")
                        command_extra_args.append(board_id)
                    elif runner == "openocd" and product in ("STM32 STLink", "STLINK-V3"):
                        command_extra_args.append("--cmd-pre-init")
                        command_extra_args.append("hla_serial %s" % board_id)
                    elif runner == "openocd" and product == "EDBG CMSIS-DAP":
                        command_extra_args.append("--cmd-pre-init")
                        command_extra_args.append("cmsis_dap_serial %s" % board_id)
                    elif runner == "jlink":
                        command.append("--tool-opt=-SelectEmuBySN %s" % board_id)
                    elif runner == "stm32cubeprogrammer":
                        command.append("--tool-opt=sn=%s" % board_id)

                    # Receive parameters from runner_params field.
                    if hardware.runner_params:
                        command.extend(hardware.runner_params)

            if command_extra_args:
                command.append('--')
                command.extend(command_extra_args)
        else:
            command = [self.generator_cmd, "-C", self.build_dir, "flash"]

        return command

    def _update_instance_info(self, harness_state, handler_time, flash_error):
        """Set the instance status and reason from the harness state; a
        missing state without a flash error is reported as a timeout."""
        self.instance.execution_time = handler_time
        if harness_state:
            self.instance.status = harness_state
            if harness_state == "failed":
                self.instance.reason = "Failed"
        elif not flash_error:
            self.instance.status = "failed"
            self.instance.reason = "Timeout"

        if self.instance.status in ["error", "failed"]:
            self.instance.add_missing_case_status("blocked", self.instance.reason)

    def _create_serial_connection(self, serial_device, hardware_baud,
                                  flash_timeout, serial_pty, ser_pty_process):
        """Open the serial connection to the device.

        On failure the instance is marked as failed, the pty process (if
        any) is terminated, the DUT is released and the exception re-raised.

        @raise serial.SerialException if the port cannot be opened
        """
        try:
            ser = serial.Serial(
                serial_device,
                baudrate=hardware_baud,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                bytesize=serial.EIGHTBITS,
                # the worst case of no serial input
                timeout=max(flash_timeout, self.get_test_timeout())
            )
        except serial.SerialException as e:
            self.instance.status = "failed"
            self.instance.reason = "Serial Device Error"
            logger.error("Serial device error: %s", str(e))

            self.instance.add_missing_case_status("blocked", "Serial Device Error")
            if serial_pty and ser_pty_process:
                ser_pty_process.terminate()
                outs, errs = ser_pty_process.communicate()
                logger.debug("Process {} terminated outs: {} errs {}".format(serial_pty, outs, errs))

            if serial_pty:
                self.make_device_available(serial_pty)
            else:
                self.make_device_available(serial_device)
            raise

        return ser

    def get_hardware(self):
        """Wait for a free matching DUT and return it; returns None and
        marks the instance as failed when no DUT matches the platform."""
        hardware = None
        try:
            hardware = self.device_is_available(self.instance)
            while not hardware:
                time.sleep(1)
                hardware = self.device_is_available(self.instance)
        except TwisterException as error:
            self.instance.status = "failed"
            self.instance.reason = str(error)
            logger.error(self.instance.reason)
        return hardware

    def _get_serial_device(self, serial_pty, hardware_serial):
        """Return the serial device name and the pty helper process.

        When serial_pty is set, the command is started attached to a new
        pseudo terminal and the name of its slave side is used as the serial
        device; otherwise hardware_serial is used directly.

        @return (serial_device, ser_pty_process); (None, None) when the
                serial_pty command could not be started
        """
        ser_pty_process = None
        if serial_pty:
            master, slave = pty.openpty()
            try:
                ser_pty_process = subprocess.Popen(
                    re.split('[, ]', serial_pty),
                    stdout=master,
                    stdin=master,
                    stderr=master
                )
            except (subprocess.CalledProcessError, OSError) as error:
                # Popen reports a command that cannot be started as OSError
                logger.error(
                    "Failed to run subprocess {}, error {}".format(
                        serial_pty,
                        error
                    )
                )
                os.close(master)
                os.close(slave)
                return None, None

            serial_device = os.ttyname(slave)
        else:
            serial_device = hardware_serial

        return serial_device, ser_pty_process

    def handle(self, harness):
        """Flash the test onto a free DUT, monitor its serial output with
        the harness and record the result on the instance."""
        hardware = self.get_hardware()
        if not hardware:
            return
        self.instance.dut = hardware.id

        runner = hardware.runner or self.options.west_runner
        serial_pty = hardware.serial_pty

        serial_device, ser_pty_process = self._get_serial_device(serial_pty, hardware.serial)
        if serial_pty and ser_pty_process is None:
            # The serial pty command could not be started: report it the
            # same way as a serial port that cannot be opened and give the
            # DUT back so other instances do not wait for it forever.
            self.instance.status = "failed"
            self.instance.reason = "Serial Device Error"
            self.instance.add_missing_case_status("blocked", "Serial Device Error")
            self.make_device_available(serial_pty)
            return

        logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud")

        command = self._create_command(runner, hardware)

        pre_script = hardware.pre_script
        post_flash_script = hardware.post_flash_script
        post_script = hardware.post_script

        if pre_script:
            self.run_custom_script(pre_script, 30)

        flash_timeout = hardware.flash_timeout
        if hardware.flash_with_test:
            flash_timeout += self.get_test_timeout()

        try:
            ser = self._create_serial_connection(
                serial_device,
                hardware.baud,
                flash_timeout,
                serial_pty,
                ser_pty_process
            )
        except serial.SerialException:
            return

        halt_monitor_evt = threading.Event()

        t = threading.Thread(target=self.monitor_serial, daemon=True,
                             args=(ser, halt_monitor_evt, harness))
        start_time = time.time()
        t.start()

        d_log = "{}/device.log".format(self.instance.build_dir)
        logger.debug('Flash command: %s', command)
        flash_error = False
        try:
            stdout = stderr = None
            with subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
                try:
                    (stdout, stderr) = proc.communicate(timeout=flash_timeout)
                    # ignore unencodable unicode chars
                    logger.debug(stdout.decode(errors="ignore"))

                    if proc.returncode != 0:
                        self.instance.status = "error"
                        self.instance.reason = "Device issue (Flash error?)"
                        flash_error = True
                        with open(d_log, "w") as dlog_fp:
                            dlog_fp.write(stderr.decode(errors="ignore"))
                        halt_monitor_evt.set()
                except subprocess.TimeoutExpired:
                    logger.warning("Flash operation timed out.")
                    self.terminate(proc)
                    (stdout, stderr) = proc.communicate()
                    self.instance.status = "error"
                    self.instance.reason = "Device issue (Timeout)"
                    flash_error = True

            with open(d_log, "w") as dlog_fp:
                dlog_fp.write(stderr.decode(errors="ignore"))

        except subprocess.CalledProcessError:
            halt_monitor_evt.set()
            self.instance.status = "error"
            self.instance.reason = "Device issue (Flash error)"
            flash_error = True

        if post_flash_script:
            self.run_custom_script(post_flash_script, 30)

        if not flash_error:
            # Always wait at most the test timeout here after flashing.
            t.join(self.get_test_timeout())
        else:
            # When a flash error occurred the serial monitor thread has been
            # told to close the serial port; give that thread a chance to
            # run and actually close it.
            t.join(0.1)

        if t.is_alive():
            logger.debug("Timed out while monitoring serial output on {}".format(self.instance.platform.name))

        if ser.isOpen():
            ser.close()

        if serial_pty:
            ser_pty_process.terminate()
            outs, errs = ser_pty_process.communicate()
            logger.debug("Process {} terminated outs: {} errs {}".format(serial_pty, outs, errs))

        handler_time = time.time() - start_time

        self._update_instance_info(harness.state, handler_time, flash_error)

        self._final_handle_actions(harness, handler_time)

        if post_script:
            self.run_custom_script(post_script, 30)

        if serial_pty:
            self.make_device_available(serial_pty)
        else:
            self.make_device_available(serial_device)


class QEMUHandler(Handler):
    """Spawns a thread to monitor QEMU output from pipes

    We pass QEMU_PIPE to 'make run' and monitor the pipes for output.
    We need to do this as once qemu starts, it runs forever until killed.
    Test cases emit special messages to the console as they run, we check
    for these to collect whether the test passed or failed.
    """

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test instance
        @param type_str Name of the handler type
        """

        super().__init__(instance, type_str)
        self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo")

        self.pid_fn = os.path.join(instance.build_dir, "qemu.pid")

        ignore_crash = bool(instance.testsuite.ignore_qemu_crash)
        self.ignore_qemu_crash = ignore_crash
        self.ignore_unexpected_eof = ignore_crash

    @staticmethod
    def _get_cpu_time(pid):
        """get process CPU time.

        The guest virtual time in QEMU icount mode isn't host time and
        it's maintained by counting guest instructions, so we use QEMU
        process execution time to mostly simulate the time of guest OS.
        """
        proc = psutil.Process(pid)
        cpu_time = proc.cpu_times()
        return cpu_time.user + cpu_time.system

    @staticmethod
    def _thread_get_fifo_names(fifo_fn):
        """Return the names of the input and output fifo derived from the
        base name fifo_fn."""
        fifo_in = fifo_fn + ".in"
        fifo_out = fifo_fn + ".out"

        return fifo_in, fifo_out

    @staticmethod
    def _thread_open_files(fifo_in, fifo_out, logfile):
        """(Re)create both fifos and open them together with the log file.

        @return (out_fp, in_fp, log_out_fp)
        """
        # These in/out nodes are named from QEMU's perspective, not ours
        if os.path.exists(fifo_in):
            os.unlink(fifo_in)
        os.mkfifo(fifo_in)
        if os.path.exists(fifo_out):
            os.unlink(fifo_out)
        os.mkfifo(fifo_out)

        # We don't do anything with out_fp but we need to open it for
        # writing so that QEMU doesn't block, due to the way pipes work
        out_fp = open(fifo_in, "wb")
        # Disable internal buffering, we don't
        # want read() or poll() to ever block if there is data in there
        in_fp = open(fifo_out, "rb", buffering=0)
        log_out_fp = open(logfile, "wt")

        return out_fp, in_fp, log_out_fp

    @staticmethod
    def _thread_close_files(fifo_in, fifo_out, pid, out_fp, in_fp, log_out_fp):
        """Close all files, send SIGTERM to QEMU if its PID is known and
        remove both fifos."""
        log_out_fp.close()
        out_fp.close()
        in_fp.close()
        if pid:
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                # Oh well, as long as it's dead! User probably sent Ctrl-C
                pass

        os.unlink(fifo_in)
        os.unlink(fifo_out)

    @staticmethod
    def _thread_update_instance_info(handler, handler_time, out_state):
        """Store the execution time on the handler's instance and translate
        the monitor's final state into an instance status and reason."""
        handler.instance.execution_time = handler_time
        if out_state == "timeout":
            handler.instance.status = "failed"
            handler.instance.reason = "Timeout"
        elif out_state == "failed":
            handler.instance.status = "failed"
            handler.instance.reason = "Failed"
        elif out_state in ['unexpected eof', 'unexpected byte']:
            handler.instance.status = "failed"
            handler.instance.reason = out_state
        else:
            handler.instance.status = out_state
            handler.instance.reason = "Unknown"

    @staticmethod
    def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn, results,
                harness, ignore_unexpected_eof=False):
        """Monitor thread: read QEMU's console from the output fifo, log it
        and pass complete lines to the harness until a final state is
        reached, then update the instance and clean up.

        outdir and results are accepted but not used here.
        """
        fifo_in, fifo_out = QEMUHandler._thread_get_fifo_names(fifo_fn)

        out_fp, in_fp, log_out_fp = QEMUHandler._thread_open_files(
            fifo_in,
            fifo_out,
            logfile
        )

        start_time = time.time()
        timeout_time = start_time + timeout
        p = select.poll()
        p.register(in_fp, select.POLLIN)
        out_state = None

        line = ""
        timeout_extended = False

        # 0 means "not known yet"; the pid file is re-checked in the loop
        pid = _read_pid_file(pid_fn) or 0

        while True:
            this_timeout = int((timeout_time - time.time()) * 1000)
            if this_timeout < 0 or not p.poll(this_timeout):
                try:
                    if pid and this_timeout > 0:
                        # there's possibility we polled nothing because
                        # of not enough CPU time scheduled by host for
                        # QEMU process during p.poll(this_timeout)
                        cpu_time = QEMUHandler._get_cpu_time(pid)
                        if cpu_time < timeout and not out_state:
                            timeout_time = time.time() + (timeout - cpu_time)
                            continue
                except psutil.NoSuchProcess:
                    # psutil reports a vanished process with its own
                    # exception type; no extension is possible then, so fall
                    # through to the regular timeout handling.
                    pass
                except ProcessLookupError:
                    out_state = "failed"
                    break

                if not out_state:
                    out_state = "timeout"
                break

            if pid == 0 and os.path.exists(pid_fn):
                pid = _read_pid_file(pid_fn) or 0

            try:
                c = in_fp.read(1).decode("utf-8")
            except UnicodeDecodeError:
                # Test is writing something weird, fail
                out_state = "unexpected byte"
                break

            if c == "":
                # EOF, this shouldn't happen unless QEMU crashes
                if not ignore_unexpected_eof:
                    out_state = "unexpected eof"
                break
            line = line + c
            if c != "\n":
                continue

            # line contains a full line of data output from QEMU
            log_out_fp.write(line)
            log_out_fp.flush()
            line = line.rstrip()
            logger.debug(f"QEMU ({pid}): {line}")

            harness.handle(line)
            if harness.state:
                # if we have registered a fail make sure the state is not
                # overridden by a false success message coming from the
                # testsuite
                if out_state not in ['failed', 'unexpected eof', 'unexpected byte']:
                    out_state = harness.state

                # if we get some state, that means test is doing well, we reset
                # the timeout and wait for 2 more seconds to catch anything
                # printed late. We wait much longer if code
                # coverage is enabled since dumping this information can
                # take some time.
                if not timeout_extended or harness.capture_coverage:
                    timeout_extended = True
                    if harness.capture_coverage:
                        timeout_time = time.time() + 30
                    else:
                        timeout_time = time.time() + 2
            line = ""

        handler_time = time.time() - start_time
        logger.debug(f"QEMU ({pid}) complete ({out_state}) after {handler_time} seconds")

        QEMUHandler._thread_update_instance_info(handler, handler_time, out_state)

        QEMUHandler._thread_close_files(fifo_in, fifo_out, pid, out_fp, in_fp, log_out_fp)

    def _get_sysbuild_build_dir(self):
        """Return the build directory of the default sysbuild domain, or the
        instance build directory when sysbuild is not used."""
        if self.instance.testsuite.sysbuild:
            # Load domain yaml to get default domain build directory
            # Note: for targets using QEMU, we assume that the target will
            # have added any additional images to the run target manually
            domain_path = os.path.join(self.build_dir, "domains.yaml")
            domains = Domains.from_file(domain_path)
            logger.debug("Loaded sysbuild domain data from %s", domain_path)
            build_dir = domains.get_default_domain().build_dir
        else:
            build_dir = self.build_dir

        return build_dir

    def _set_qemu_filenames(self, sysbuild_build_dir):
        """Set the fifo, pid file and log file names and remove a stale pid
        file left behind by an earlier run."""
        # We pass this to QEMU which looks for fifos with .in and .out suffixes.
        # QEMU fifo will use main build dir
        self.fifo_fn = os.path.join(self.instance.build_dir, "qemu-fifo")
        # PID file will be created in the main sysbuild app's build dir
        self.pid_fn = os.path.join(sysbuild_build_dir, "qemu.pid")

        if os.path.exists(self.pid_fn):
            os.unlink(self.pid_fn)

        self.log_fn = self.log

    def _create_command(self, sysbuild_build_dir):
        """Return the generator command that runs the 'run' target in
        sysbuild_build_dir."""
        command = [self.generator_cmd]
        command += ["-C", sysbuild_build_dir, "run"]

        return command

    def _update_instance_info(self, harness_state, is_timeout):
        """Mark the instance as failed when QEMU exited with an error that
        is not ignored or when the harness reported no state."""
        if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness_state:
            self.instance.status = "failed"
            if is_timeout:
                self.instance.reason = "Timeout"
            else:
                if not self.instance.reason:
                    self.instance.reason = "Exited with {}".format(self.returncode)
            self.instance.add_missing_case_status("blocked")

    def handle(self, harness):
        """Start the monitor thread, run QEMU through the generator's 'run'
        target and record the result on the instance."""
        self.results = {}
        self.run = True

        sysbuild_build_dir = self._get_sysbuild_build_dir()

        command = self._create_command(sysbuild_build_dir)

        self._set_qemu_filenames(sysbuild_build_dir)

        self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread,
                                       args=(self, self.get_test_timeout(), self.build_dir,
                                             self.log_fn, self.fifo_fn,
                                             self.pid_fn, self.results, harness,
                                             self.ignore_unexpected_eof))

        self.thread.daemon = True
        logger.debug("Spawning QEMUHandler Thread for %s", self.name)
        self.thread.start()
        if sys.stdout.isatty():
            subprocess.call(["stty", "sane"], stdin=sys.stdout)

        logger.debug("Running %s (%s)", self.name, self.type_str)

        is_timeout = False
        qemu_pid = None

        with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.build_dir) as proc:
            logger.debug("Spawning QEMUHandler Thread for %s", self.name)

            try:
                proc.wait(self.get_test_timeout())
            except subprocess.TimeoutExpired:
                # sometimes QEMU can't handle SIGTERM signal correctly
                # in that case kill -9 QEMU process directly and leave
                # twister to judge testing result by console output

                is_timeout = True
                self.terminate(proc)
                if harness.state == "passed":
                    self.returncode = 0
                else:
                    self.returncode = proc.returncode
            else:
                if os.path.exists(self.pid_fn):
                    qemu_pid = _read_pid_file(self.pid_fn)
                logger.debug(f"No timeout, return code from QEMU ({qemu_pid}): {proc.returncode}")
                self.returncode = proc.returncode
            # Need to wait for harness to finish processing
            # output from QEMU. Otherwise it might miss some
            # error messages.
            self.thread.join(0)
            if self.thread.is_alive():
                logger.debug("Timed out while monitoring QEMU output")

            if os.path.exists(self.pid_fn):
                qemu_pid = _read_pid_file(self.pid_fn)
                os.unlink(self.pid_fn)

        logger.debug(f"return code from QEMU ({qemu_pid}): {self.returncode}")

        self._update_instance_info(harness.state, is_timeout)

        self._final_handle_actions(harness, 0)

    def get_fifo(self):
        """Return the base name of the QEMU fifos."""
        return self.fifo_fn