1#!/usr/bin/env python3 2# vim: set syntax=python ts=4 : 3# 4# Copyright (c) 2018-2022 Intel Corporation 5# Copyright 2022 NXP 6# SPDX-License-Identifier: Apache-2.0 7 8import logging 9import math 10import os 11import psutil 12import re 13import select 14import shlex 15import signal 16import subprocess 17import sys 18import threading 19import time 20 21from pathlib import Path 22from queue import Queue, Empty 23from twisterlib.environment import ZEPHYR_BASE, strip_ansi_sequences 24from twisterlib.error import TwisterException 25from twisterlib.platform import Platform 26sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/build_helpers")) 27from domains import Domains 28 29try: 30 import serial 31except ImportError: 32 print("Install pyserial python module with pip to use --device-testing option.") 33 34try: 35 import pty 36except ImportError as capture_error: 37 if os.name == "nt": # "nt" means that program is running on Windows OS 38 pass # "--device-serial-pty" option is not supported on Windows OS 39 else: 40 raise capture_error 41 42logger = logging.getLogger('twister') 43logger.setLevel(logging.DEBUG) 44 45SUPPORTED_SIMS = ["mdb-nsim", "nsim", "renode", "qemu", "tsim", "armfvp", "xt-sim", "native", "custom"] 46SUPPORTED_SIMS_IN_PYTEST = ['native', 'qemu'] 47 48 49def terminate_process(proc): 50 """ 51 encapsulate terminate functionality so we do it consistently where ever 52 we might want to terminate the proc. We need try_kill_process_by_pid 53 because of both how newer ninja (1.6.0 or greater) and .NET / renode 54 work. Newer ninja's don't seem to pass SIGTERM down to the children 55 so we need to use try_kill_process_by_pid. 56 """ 57 58 for child in psutil.Process(proc.pid).children(recursive=True): 59 try: 60 os.kill(child.pid, signal.SIGTERM) 61 except (ProcessLookupError, psutil.NoSuchProcess): 62 pass 63 proc.terminate() 64 # sleep for a while before attempting to kill 65 time.sleep(0.5) 66 proc.kill() 67 68 69class Handler: 70 def __init__(self, instance, type_str="build"): 71 """Constructor 72 73 """ 74 self.options = None 75 76 self.run = False 77 self.type_str = type_str 78 79 self.pid_fn = None 80 self.call_make_run = True 81 82 self.name = instance.name 83 self.instance = instance 84 self.sourcedir = instance.testsuite.source_dir 85 self.build_dir = instance.build_dir 86 self.log = os.path.join(self.build_dir, "handler.log") 87 self.returncode = 0 88 self.generator_cmd = None 89 self.suite_name_check = True 90 self.ready = False 91 92 self.args = [] 93 self.terminated = False 94 95 def get_test_timeout(self): 96 return math.ceil(self.instance.testsuite.timeout * 97 self.instance.platform.timeout_multiplier * 98 self.options.timeout_multiplier) 99 100 def terminate(self, proc): 101 terminate_process(proc) 102 self.terminated = True 103 104 def _verify_ztest_suite_name(self, harness_state, detected_suite_names, handler_time): 105 """ 106 If test suite names was found in test's C source code, then verify if 107 detected suite names from output correspond to expected suite names 108 (and not in reverse). 109 """ 110 expected_suite_names = self.instance.testsuite.ztest_suite_names 111 logger.debug(f"Expected suite names:{expected_suite_names}") 112 logger.debug(f"Detected suite names:{detected_suite_names}") 113 if not expected_suite_names or \ 114 not harness_state == "passed": 115 return 116 if not detected_suite_names: 117 self._missing_suite_name(expected_suite_names, handler_time) 118 return 119 # compare the expect and detect from end one by one without order 120 _d_suite = detected_suite_names[-len(expected_suite_names):] 121 if set(_d_suite) != set(expected_suite_names): 122 if not set(_d_suite).issubset(set(expected_suite_names)): 123 self._missing_suite_name(expected_suite_names, handler_time) 124 125 def _missing_suite_name(self, expected_suite_names, handler_time): 126 """ 127 Change result of performed test if problem with missing or unpropper 128 suite name was occurred. 129 """ 130 self.instance.status = "failed" 131 self.instance.execution_time = handler_time 132 for tc in self.instance.testcases: 133 tc.status = "failed" 134 self.instance.reason = f"Testsuite mismatch" 135 logger.debug("Test suite names were not printed or some of them in " \ 136 "output do not correspond with expected: %s", 137 str(expected_suite_names)) 138 139 def _final_handle_actions(self, harness, handler_time): 140 141 # only for Ztest tests: 142 harness_class_name = type(harness).__name__ 143 if self.suite_name_check and harness_class_name == "Test": 144 self._verify_ztest_suite_name(harness.state, harness.detected_suite_names, handler_time) 145 if self.instance.status == 'failed': 146 return 147 if not harness.matched_run_id and harness.run_id_exists: 148 self.instance.status = "failed" 149 self.instance.execution_time = handler_time 150 self.instance.reason = "RunID mismatch" 151 for tc in self.instance.testcases: 152 tc.status = "failed" 153 154 self.instance.record(harness.recording) 155 156 def get_default_domain_build_dir(self): 157 if self.instance.sysbuild: 158 # Load domain yaml to get default domain build directory 159 # Note: for targets using QEMU, we assume that the target will 160 # have added any additional images to the run target manually 161 domain_path = os.path.join(self.build_dir, "domains.yaml") 162 domains = Domains.from_file(domain_path) 163 logger.debug("Loaded sysbuild domain data from %s" % domain_path) 164 build_dir = domains.get_default_domain().build_dir 165 else: 166 build_dir = self.build_dir 167 return build_dir 168 169 170class BinaryHandler(Handler): 171 def __init__(self, instance, type_str): 172 """Constructor 173 174 @param instance Test Instance 175 """ 176 super().__init__(instance, type_str) 177 178 self.seed = None 179 self.extra_test_args = None 180 self.line = b"" 181 182 def try_kill_process_by_pid(self): 183 if self.pid_fn: 184 pid = int(open(self.pid_fn).read()) 185 os.unlink(self.pid_fn) 186 self.pid_fn = None # clear so we don't try to kill the binary twice 187 try: 188 os.kill(pid, signal.SIGKILL) 189 except (ProcessLookupError, psutil.NoSuchProcess): 190 pass 191 192 def _output_reader(self, proc): 193 self.line = proc.stdout.readline() 194 195 def _output_handler(self, proc, harness): 196 suffix = '\\r\\n' 197 198 with open(self.log, "wt") as log_out_fp: 199 timeout_extended = False 200 timeout_time = time.time() + self.get_test_timeout() 201 while True: 202 this_timeout = timeout_time - time.time() 203 if this_timeout < 0: 204 break 205 reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True) 206 reader_t.start() 207 reader_t.join(this_timeout) 208 if not reader_t.is_alive() and self.line != b"": 209 line_decoded = self.line.decode('utf-8', "replace") 210 if line_decoded.endswith(suffix): 211 stripped_line = line_decoded[:-len(suffix)].rstrip() 212 else: 213 stripped_line = line_decoded.rstrip() 214 logger.debug("OUTPUT: %s", stripped_line) 215 log_out_fp.write(strip_ansi_sequences(line_decoded)) 216 log_out_fp.flush() 217 harness.handle(stripped_line) 218 if harness.state: 219 if not timeout_extended or harness.capture_coverage: 220 timeout_extended = True 221 if harness.capture_coverage: 222 timeout_time = time.time() + 30 223 else: 224 timeout_time = time.time() + 2 225 else: 226 reader_t.join(0) 227 break 228 try: 229 # POSIX arch based ztests end on their own, 230 # so let's give it up to 100ms to do so 231 proc.wait(0.1) 232 except subprocess.TimeoutExpired: 233 self.terminate(proc) 234 235 def _create_command(self, robot_test): 236 if robot_test: 237 keywords = os.path.join(self.options.coverage_basedir, 'tests/robot/common.robot') 238 elf = os.path.join(self.build_dir, "zephyr/zephyr.elf") 239 command = [self.generator_cmd] 240 resc = "" 241 uart = "" 242 # os.path.join cannot be used on a Mock object, so we are 243 # explicitly checking the type 244 if isinstance(self.instance.platform, Platform): 245 for board_dir in self.options.board_root: 246 path = os.path.join(Path(board_dir).parent, self.instance.platform.resc) 247 if os.path.exists(path): 248 resc = path 249 break 250 uart = self.instance.platform.uart 251 command = ["renode-test", 252 "--variable", "KEYWORDS:" + keywords, 253 "--variable", "ELF:@" + elf, 254 "--variable", "RESC:@" + resc, 255 "--variable", "UART:" + uart] 256 elif self.call_make_run: 257 command = [self.generator_cmd, "run"] 258 elif self.instance.testsuite.type == "unit": 259 command = [self.binary] 260 else: 261 binary = os.path.join(self.get_default_domain_build_dir(), "zephyr", "zephyr.exe") 262 command = [binary] 263 264 if self.options.enable_valgrind: 265 command = ["valgrind", "--error-exitcode=2", 266 "--leak-check=full", 267 "--suppressions=" + ZEPHYR_BASE + "/scripts/valgrind.supp", 268 "--log-file=" + self.build_dir + "/valgrind.log", 269 "--track-origins=yes", 270 ] + command 271 272 # Only valid for native_sim 273 if self.seed is not None: 274 command.append(f"--seed={self.seed}") 275 if self.extra_test_args is not None: 276 command.extend(self.extra_test_args) 277 278 return command 279 280 def _create_env(self): 281 env = os.environ.copy() 282 if self.options.enable_asan: 283 env["ASAN_OPTIONS"] = "log_path=stdout:" + \ 284 env.get("ASAN_OPTIONS", "") 285 if not self.options.enable_lsan: 286 env["ASAN_OPTIONS"] += "detect_leaks=0" 287 288 if self.options.enable_ubsan: 289 env["UBSAN_OPTIONS"] = "log_path=stdout:halt_on_error=1:" + \ 290 env.get("UBSAN_OPTIONS", "") 291 292 return env 293 294 def _update_instance_info(self, harness_state, handler_time): 295 self.instance.execution_time = handler_time 296 if not self.terminated and self.returncode != 0: 297 self.instance.status = "failed" 298 if self.options.enable_valgrind and self.returncode == 2: 299 self.instance.reason = "Valgrind error" 300 else: 301 # When a process is killed, the default handler returns 128 + SIGTERM 302 # so in that case the return code itself is not meaningful 303 self.instance.reason = "Failed" 304 elif harness_state: 305 self.instance.status = harness_state 306 if harness_state == "failed": 307 self.instance.reason = "Failed" 308 else: 309 self.instance.status = "failed" 310 self.instance.reason = "Timeout" 311 self.instance.add_missing_case_status("blocked", "Timeout") 312 313 def handle(self, harness): 314 robot_test = getattr(harness, "is_robot_test", False) 315 316 command = self._create_command(robot_test) 317 318 logger.debug("Spawning process: " + 319 " ".join(shlex.quote(word) for word in command) + os.linesep + 320 "in directory: " + self.build_dir) 321 322 start_time = time.time() 323 324 env = self._create_env() 325 326 if robot_test: 327 harness.run_robot_test(command, self) 328 return 329 330 stderr_log = "{}/handler_stderr.log".format(self.instance.build_dir) 331 with open(stderr_log, "w+") as stderr_log_fp, subprocess.Popen(command, stdout=subprocess.PIPE, 332 stderr=stderr_log_fp, cwd=self.build_dir, env=env) as proc: 333 logger.debug("Spawning BinaryHandler Thread for %s" % self.name) 334 t = threading.Thread(target=self._output_handler, args=(proc, harness,), daemon=True) 335 t.start() 336 t.join() 337 if t.is_alive(): 338 self.terminate(proc) 339 t.join() 340 proc.wait() 341 self.returncode = proc.returncode 342 if proc.returncode != 0: 343 self.instance.status = "error" 344 self.instance.reason = "BinaryHandler returned {}".format(proc.returncode) 345 self.try_kill_process_by_pid() 346 347 handler_time = time.time() - start_time 348 349 # FIXME: This is needed when killing the simulator, the console is 350 # garbled and needs to be reset. Did not find a better way to do that. 351 if sys.stdout.isatty(): 352 subprocess.call(["stty", "sane"], stdin=sys.stdout) 353 354 self._update_instance_info(harness.state, handler_time) 355 356 self._final_handle_actions(harness, handler_time) 357 358 359class SimulationHandler(BinaryHandler): 360 def __init__(self, instance, type_str): 361 """Constructor 362 363 @param instance Test Instance 364 """ 365 super().__init__(instance, type_str) 366 367 if type_str == 'renode': 368 self.pid_fn = os.path.join(instance.build_dir, "renode.pid") 369 elif type_str == 'native': 370 self.call_make_run = False 371 self.ready = True 372 373 374class DeviceHandler(Handler): 375 376 def __init__(self, instance, type_str): 377 """Constructor 378 379 @param instance Test Instance 380 """ 381 super().__init__(instance, type_str) 382 383 def get_test_timeout(self): 384 timeout = super().get_test_timeout() 385 if self.options.enable_coverage: 386 # wait more for gcov data to be dumped on console 387 timeout += 120 388 return timeout 389 390 def monitor_serial(self, ser, halt_event, harness): 391 log_out_fp = open(self.log, "wb") 392 393 if self.options.enable_coverage: 394 # Set capture_coverage to True to indicate that right after 395 # test results we should get coverage data, otherwise we exit 396 # from the test. 397 harness.capture_coverage = True 398 399 # Wait for serial connection 400 while not ser.isOpen(): 401 time.sleep(0.1) 402 403 # Clear serial leftover. 404 ser.reset_input_buffer() 405 406 while ser.isOpen(): 407 if halt_event.is_set(): 408 logger.debug('halted') 409 ser.close() 410 break 411 412 try: 413 if not ser.in_waiting: 414 # no incoming bytes are waiting to be read from 415 # the serial input buffer, let other threads run 416 time.sleep(0.001) 417 continue 418 # maybe the serial port is still in reset 419 # check status may cause error 420 # wait for more time 421 except OSError: 422 time.sleep(0.001) 423 continue 424 except TypeError: 425 # This exception happens if the serial port was closed and 426 # its file descriptor cleared in between of ser.isOpen() 427 # and ser.in_waiting checks. 428 logger.debug("Serial port is already closed, stop reading.") 429 break 430 431 serial_line = None 432 try: 433 serial_line = ser.readline() 434 except TypeError: 435 pass 436 # ignore SerialException which may happen during the serial device 437 # power off/on process. 438 except serial.SerialException: 439 pass 440 441 # Just because ser_fileno has data doesn't mean an entire line 442 # is available yet. 443 if serial_line: 444 sl = serial_line.decode('utf-8', 'ignore').lstrip() 445 logger.debug("DEVICE: {0}".format(sl.rstrip())) 446 447 log_out_fp.write(strip_ansi_sequences(sl).encode('utf-8')) 448 log_out_fp.flush() 449 harness.handle(sl.rstrip()) 450 451 if harness.state: 452 if not harness.capture_coverage: 453 ser.close() 454 break 455 456 log_out_fp.close() 457 458 def device_is_available(self, instance): 459 device = instance.platform.name 460 fixture = instance.testsuite.harness_config.get("fixture") 461 dut_found = False 462 463 for d in self.duts: 464 if fixture and fixture not in map(lambda f: f.split(sep=':')[0], d.fixtures): 465 continue 466 if d.platform != device or (d.serial is None and d.serial_pty is None): 467 continue 468 dut_found = True 469 d.lock.acquire() 470 avail = False 471 if d.available: 472 d.available = 0 473 d.counter += 1 474 avail = True 475 d.lock.release() 476 if avail: 477 return d 478 479 if not dut_found: 480 raise TwisterException(f"No device to serve as {device} platform.") 481 482 return None 483 484 def make_device_available(self, serial): 485 for d in self.duts: 486 if serial in [d.serial_pty, d.serial]: 487 d.available = 1 488 489 @staticmethod 490 def run_custom_script(script, timeout): 491 with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 492 try: 493 stdout, stderr = proc.communicate(timeout=timeout) 494 logger.debug(stdout.decode()) 495 if proc.returncode != 0: 496 logger.error(f"Custom script failure: {stderr.decode(errors='ignore')}") 497 498 except subprocess.TimeoutExpired: 499 proc.kill() 500 proc.communicate() 501 logger.error("{} timed out".format(script)) 502 503 def _create_command(self, runner, hardware): 504 if (self.options.west_flash is not None) or runner: 505 command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir] 506 command_extra_args = [] 507 508 # There are three ways this option is used. 509 # 1) bare: --west-flash 510 # This results in options.west_flash == [] 511 # 2) with a value: --west-flash="--board-id=42" 512 # This results in options.west_flash == "--board-id=42" 513 # 3) Multiple values: --west-flash="--board-id=42,--erase" 514 # This results in options.west_flash == "--board-id=42 --erase" 515 if self.options.west_flash and self.options.west_flash != []: 516 command_extra_args.extend(self.options.west_flash.split(',')) 517 518 if runner: 519 command.append("--runner") 520 command.append(runner) 521 522 board_id = hardware.probe_id or hardware.id 523 product = hardware.product 524 if board_id is not None: 525 if runner in ("pyocd", "nrfjprog", "nrfutil"): 526 command_extra_args.append("--dev-id") 527 command_extra_args.append(board_id) 528 elif runner == "openocd" and product == "STM32 STLink": 529 command_extra_args.append("--cmd-pre-init") 530 command_extra_args.append("hla_serial %s" % board_id) 531 elif runner == "openocd" and product == "STLINK-V3": 532 command_extra_args.append("--cmd-pre-init") 533 command_extra_args.append("hla_serial %s" % board_id) 534 elif runner == "openocd" and product == "EDBG CMSIS-DAP": 535 command_extra_args.append("--cmd-pre-init") 536 command_extra_args.append("cmsis_dap_serial %s" % board_id) 537 elif runner == "openocd" and product == "LPC-LINK2 CMSIS-DAP": 538 command_extra_args.append("--cmd-pre-init") 539 command_extra_args.append("adapter serial %s" % board_id) 540 elif runner == "jlink": 541 command.append("--tool-opt=-SelectEmuBySN %s" % board_id) 542 elif runner == "linkserver": 543 # for linkserver 544 # --probe=#<number> select by probe index 545 # --probe=<serial number> select by probe serial number 546 command.append("--probe=%s" % board_id) 547 elif runner == "stm32cubeprogrammer": 548 command.append("--tool-opt=sn=%s" % board_id) 549 550 # Receive parameters from runner_params field. 551 if hardware.runner_params: 552 for param in hardware.runner_params: 553 command.append(param) 554 555 if command_extra_args: 556 command.append('--') 557 command.extend(command_extra_args) 558 else: 559 command = [self.generator_cmd, "-C", self.build_dir, "flash"] 560 561 return command 562 563 def _update_instance_info(self, harness_state, handler_time, flash_error): 564 self.instance.execution_time = handler_time 565 if harness_state: 566 self.instance.status = harness_state 567 if harness_state == "failed": 568 self.instance.reason = "Failed" 569 self.instance.add_missing_case_status("blocked", harness_state) 570 elif not flash_error: 571 self.instance.status = "failed" 572 self.instance.reason = "Timeout" 573 574 if self.instance.status in ["error", "failed"]: 575 self.instance.add_missing_case_status("blocked", self.instance.reason) 576 577 def _terminate_pty(self, ser_pty, ser_pty_process): 578 logger.debug(f"Terminating serial-pty:'{ser_pty}'") 579 terminate_process(ser_pty_process) 580 try: 581 (stdout, stderr) = ser_pty_process.communicate(timeout=self.get_test_timeout()) 582 logger.debug(f"Terminated serial-pty:'{ser_pty}', stdout:'{stdout}', stderr:'{stderr}'") 583 except subprocess.TimeoutExpired: 584 logger.debug(f"Terminated serial-pty:'{ser_pty}'") 585 # 586 587 def _create_serial_connection(self, serial_device, hardware_baud, 588 flash_timeout, serial_pty, ser_pty_process): 589 try: 590 ser = serial.Serial( 591 serial_device, 592 baudrate=hardware_baud, 593 parity=serial.PARITY_NONE, 594 stopbits=serial.STOPBITS_ONE, 595 bytesize=serial.EIGHTBITS, 596 # the worst case of no serial input 597 timeout=max(flash_timeout, self.get_test_timeout()) 598 ) 599 except serial.SerialException as e: 600 self.instance.status = "failed" 601 self.instance.reason = "Serial Device Error" 602 logger.error("Serial device error: %s" % (str(e))) 603 604 self.instance.add_missing_case_status("blocked", "Serial Device Error") 605 if serial_pty and ser_pty_process: 606 self._terminate_pty(serial_pty, ser_pty_process) 607 608 if serial_pty: 609 self.make_device_available(serial_pty) 610 else: 611 self.make_device_available(serial_device) 612 raise 613 614 return ser 615 616 def get_hardware(self): 617 hardware = None 618 try: 619 hardware = self.device_is_available(self.instance) 620 while not hardware: 621 time.sleep(1) 622 hardware = self.device_is_available(self.instance) 623 except TwisterException as error: 624 self.instance.status = "failed" 625 self.instance.reason = str(error) 626 logger.error(self.instance.reason) 627 return hardware 628 629 def _get_serial_device(self, serial_pty, hardware_serial): 630 ser_pty_process = None 631 if serial_pty: 632 master, slave = pty.openpty() 633 try: 634 ser_pty_process = subprocess.Popen( 635 re.split('[, ]', serial_pty), 636 stdout=master, 637 stdin=master, 638 stderr=master 639 ) 640 except subprocess.CalledProcessError as error: 641 logger.error( 642 "Failed to run subprocess {}, error {}".format( 643 serial_pty, 644 error.output 645 ) 646 ) 647 return 648 649 serial_device = os.ttyname(slave) 650 else: 651 serial_device = hardware_serial 652 653 return serial_device, ser_pty_process 654 655 def handle(self, harness): 656 runner = None 657 hardware = self.get_hardware() 658 if hardware: 659 self.instance.dut = hardware.id 660 if not hardware: 661 return 662 663 runner = hardware.runner or self.options.west_runner 664 serial_pty = hardware.serial_pty 665 666 serial_device, ser_pty_process = self._get_serial_device(serial_pty, hardware.serial) 667 668 logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud") 669 670 command = self._create_command(runner, hardware) 671 672 pre_script = hardware.pre_script 673 post_flash_script = hardware.post_flash_script 674 post_script = hardware.post_script 675 676 if pre_script: 677 self.run_custom_script(pre_script, 30) 678 679 flash_timeout = hardware.flash_timeout 680 if hardware.flash_with_test: 681 flash_timeout += self.get_test_timeout() 682 683 serial_port = None 684 if hardware.flash_before is False: 685 serial_port = serial_device 686 687 try: 688 ser = self._create_serial_connection( 689 serial_port, 690 hardware.baud, 691 flash_timeout, 692 serial_pty, 693 ser_pty_process 694 ) 695 except serial.SerialException: 696 return 697 698 halt_monitor_evt = threading.Event() 699 700 t = threading.Thread(target=self.monitor_serial, daemon=True, 701 args=(ser, halt_monitor_evt, harness)) 702 start_time = time.time() 703 t.start() 704 705 d_log = "{}/device.log".format(self.instance.build_dir) 706 logger.debug('Flash command: %s', command) 707 flash_error = False 708 try: 709 stdout = stderr = None 710 with subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc: 711 try: 712 (stdout, stderr) = proc.communicate(timeout=flash_timeout) 713 # ignore unencodable unicode chars 714 logger.debug(stdout.decode(errors="ignore")) 715 716 if proc.returncode != 0: 717 self.instance.status = "error" 718 self.instance.reason = "Device issue (Flash error?)" 719 flash_error = True 720 with open(d_log, "w") as dlog_fp: 721 dlog_fp.write(stderr.decode()) 722 halt_monitor_evt.set() 723 except subprocess.TimeoutExpired: 724 logger.warning("Flash operation timed out.") 725 self.terminate(proc) 726 (stdout, stderr) = proc.communicate() 727 self.instance.status = "error" 728 self.instance.reason = "Device issue (Timeout)" 729 flash_error = True 730 731 with open(d_log, "w") as dlog_fp: 732 dlog_fp.write(stderr.decode()) 733 734 except subprocess.CalledProcessError: 735 halt_monitor_evt.set() 736 self.instance.status = "error" 737 self.instance.reason = "Device issue (Flash error)" 738 flash_error = True 739 740 if post_flash_script: 741 self.run_custom_script(post_flash_script, 30) 742 743 # Connect to device after flashing it 744 if hardware.flash_before: 745 try: 746 logger.debug(f"Attach serial device {serial_device} @ {hardware.baud} baud") 747 ser.port = serial_device 748 ser.open() 749 except serial.SerialException: 750 return 751 752 if not flash_error: 753 # Always wait at most the test timeout here after flashing. 754 t.join(self.get_test_timeout()) 755 else: 756 # When the flash error is due exceptions, 757 # twister tell the monitor serial thread 758 # to close the serial. But it is necessary 759 # for this thread being run first and close 760 # have the change to close the serial. 761 t.join(0.1) 762 763 if t.is_alive(): 764 logger.debug("Timed out while monitoring serial output on {}".format(self.instance.platform.name)) 765 766 if ser.isOpen(): 767 ser.close() 768 769 if serial_pty: 770 self._terminate_pty(serial_pty, ser_pty_process) 771 772 handler_time = time.time() - start_time 773 774 self._update_instance_info(harness.state, handler_time, flash_error) 775 776 self._final_handle_actions(harness, handler_time) 777 778 if post_script: 779 self.run_custom_script(post_script, 30) 780 781 if serial_pty: 782 self.make_device_available(serial_pty) 783 else: 784 self.make_device_available(serial_device) 785 786 787class QEMUHandler(Handler): 788 """Spawns a thread to monitor QEMU output from pipes 789 790 We pass QEMU_PIPE to 'make run' and monitor the pipes for output. 791 We need to do this as once qemu starts, it runs forever until killed. 792 Test cases emit special messages to the console as they run, we check 793 for these to collect whether the test passed or failed. 794 """ 795 796 def __init__(self, instance, type_str): 797 """Constructor 798 799 @param instance Test instance 800 """ 801 802 super().__init__(instance, type_str) 803 self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo") 804 805 self.pid_fn = os.path.join(instance.build_dir, "qemu.pid") 806 807 self.stdout_fn = os.path.join(instance.build_dir, "qemu.stdout") 808 809 self.stderr_fn = os.path.join(instance.build_dir, "qemu.stderr") 810 811 if instance.testsuite.ignore_qemu_crash: 812 self.ignore_qemu_crash = True 813 self.ignore_unexpected_eof = True 814 else: 815 self.ignore_qemu_crash = False 816 self.ignore_unexpected_eof = False 817 818 @staticmethod 819 def _get_cpu_time(pid): 820 """get process CPU time. 821 822 The guest virtual time in QEMU icount mode isn't host time and 823 it's maintained by counting guest instructions, so we use QEMU 824 process execution time to mostly simulate the time of guest OS. 825 """ 826 proc = psutil.Process(pid) 827 cpu_time = proc.cpu_times() 828 return cpu_time.user + cpu_time.system 829 830 @staticmethod 831 def _thread_get_fifo_names(fifo_fn): 832 fifo_in = fifo_fn + ".in" 833 fifo_out = fifo_fn + ".out" 834 835 return fifo_in, fifo_out 836 837 @staticmethod 838 def _thread_open_files(fifo_in, fifo_out, logfile): 839 # These in/out nodes are named from QEMU's perspective, not ours 840 if os.path.exists(fifo_in): 841 os.unlink(fifo_in) 842 os.mkfifo(fifo_in) 843 if os.path.exists(fifo_out): 844 os.unlink(fifo_out) 845 os.mkfifo(fifo_out) 846 847 # We don't do anything with out_fp but we need to open it for 848 # writing so that QEMU doesn't block, due to the way pipes work 849 out_fp = open(fifo_in, "wb") 850 # Disable internal buffering, we don't 851 # want read() or poll() to ever block if there is data in there 852 in_fp = open(fifo_out, "rb", buffering=0) 853 log_out_fp = open(logfile, "wt") 854 855 return out_fp, in_fp, log_out_fp 856 857 @staticmethod 858 def _thread_close_files(fifo_in, fifo_out, pid, out_fp, in_fp, log_out_fp): 859 log_out_fp.close() 860 out_fp.close() 861 in_fp.close() 862 if pid: 863 try: 864 if pid: 865 os.kill(pid, signal.SIGTERM) 866 except (ProcessLookupError, psutil.NoSuchProcess): 867 # Oh well, as long as it's dead! User probably sent Ctrl-C 868 pass 869 870 os.unlink(fifo_in) 871 os.unlink(fifo_out) 872 873 @staticmethod 874 def _thread_update_instance_info(handler, handler_time, status, reason): 875 handler.instance.execution_time = handler_time 876 handler.instance.status = status 877 if reason: 878 handler.instance.reason = reason 879 else: 880 handler.instance.reason = "Unknown" 881 882 @staticmethod 883 def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn, 884 harness, ignore_unexpected_eof=False): 885 fifo_in, fifo_out = QEMUHandler._thread_get_fifo_names(fifo_fn) 886 887 out_fp, in_fp, log_out_fp = QEMUHandler._thread_open_files( 888 fifo_in, 889 fifo_out, 890 logfile 891 ) 892 893 start_time = time.time() 894 timeout_time = start_time + timeout 895 p = select.poll() 896 p.register(in_fp, select.POLLIN) 897 _status = None 898 _reason = None 899 900 line = "" 901 timeout_extended = False 902 903 pid = 0 904 if os.path.exists(pid_fn): 905 pid = int(open(pid_fn).read()) 906 907 while True: 908 this_timeout = int((timeout_time - time.time()) * 1000) 909 if timeout_extended: 910 # Quit early after timeout extension if no more data is being received 911 this_timeout = min(this_timeout, 1000) 912 if this_timeout < 0 or not p.poll(this_timeout): 913 try: 914 if pid and this_timeout > 0: 915 # there's possibility we polled nothing because 916 # of not enough CPU time scheduled by host for 917 # QEMU process during p.poll(this_timeout) 918 cpu_time = QEMUHandler._get_cpu_time(pid) 919 if cpu_time < timeout and not _status: 920 timeout_time = time.time() + (timeout - cpu_time) 921 continue 922 except psutil.NoSuchProcess: 923 pass 924 except ProcessLookupError: 925 _status = "failed" 926 _reason = "Execution error" 927 break 928 929 if not _status: 930 _status = "failed" 931 _reason = "timeout" 932 break 933 934 if pid == 0 and os.path.exists(pid_fn): 935 pid = int(open(pid_fn).read()) 936 937 try: 938 c = in_fp.read(1).decode("utf-8") 939 except UnicodeDecodeError: 940 # Test is writing something weird, fail 941 _status = "failed" 942 _reason = "unexpected byte" 943 break 944 945 if c == "": 946 # EOF, this shouldn't happen unless QEMU crashes 947 if not ignore_unexpected_eof: 948 _status = "failed" 949 _reason = "unexpected eof" 950 break 951 line = line + c 952 if c != "\n": 953 continue 954 955 # line contains a full line of data output from QEMU 956 log_out_fp.write(strip_ansi_sequences(line)) 957 log_out_fp.flush() 958 line = line.rstrip() 959 logger.debug(f"QEMU ({pid}): {line}") 960 961 harness.handle(line) 962 if harness.state: 963 # if we have registered a fail make sure the status is not 964 # overridden by a false success message coming from the 965 # testsuite 966 if _status != 'failed': 967 _status = harness.state 968 _reason = harness.reason 969 970 # if we get some status, that means test is doing well, we reset 971 # the timeout and wait for 2 more seconds to catch anything 972 # printed late. We wait much longer if code 973 # coverage is enabled since dumping this information can 974 # take some time. 975 if not timeout_extended or harness.capture_coverage: 976 timeout_extended = True 977 if harness.capture_coverage: 978 timeout_time = time.time() + 30 979 else: 980 timeout_time = time.time() + 2 981 line = "" 982 983 handler_time = time.time() - start_time 984 logger.debug(f"QEMU ({pid}) complete with {_status} ({_reason}) after {handler_time} seconds") 985 986 QEMUHandler._thread_update_instance_info(handler, handler_time, _status, _reason) 987 988 QEMUHandler._thread_close_files(fifo_in, fifo_out, pid, out_fp, in_fp, log_out_fp) 989 990 def _set_qemu_filenames(self, sysbuild_build_dir): 991 # We pass this to QEMU which looks for fifos with .in and .out suffixes. 992 # QEMU fifo will use main build dir 993 self.fifo_fn = os.path.join(self.instance.build_dir, "qemu-fifo") 994 # PID file will be created in the main sysbuild app's build dir 995 self.pid_fn = os.path.join(sysbuild_build_dir, "qemu.pid") 996 997 if os.path.exists(self.pid_fn): 998 os.unlink(self.pid_fn) 999 1000 self.log_fn = self.log 1001 1002 def _create_command(self, sysbuild_build_dir): 1003 command = [self.generator_cmd] 1004 command += ["-C", sysbuild_build_dir, "run"] 1005 1006 return command 1007 1008 def _update_instance_info(self, harness_state, is_timeout): 1009 if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness_state: 1010 self.instance.status = "failed" 1011 if is_timeout: 1012 self.instance.reason = "Timeout" 1013 else: 1014 if not self.instance.reason: 1015 self.instance.reason = "Exited with {}".format(self.returncode) 1016 self.instance.add_missing_case_status("blocked") 1017 1018 def handle(self, harness): 1019 self.run = True 1020 1021 domain_build_dir = self.get_default_domain_build_dir() 1022 1023 command = self._create_command(domain_build_dir) 1024 1025 self._set_qemu_filenames(domain_build_dir) 1026 1027 self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread, 1028 args=(self, self.get_test_timeout(), self.build_dir, 1029 self.log_fn, self.fifo_fn, 1030 self.pid_fn, harness, 1031 self.ignore_unexpected_eof)) 1032 1033 self.thread.daemon = True 1034 logger.debug("Spawning QEMUHandler Thread for %s" % self.name) 1035 self.thread.start() 1036 thread_max_time = time.time() + self.get_test_timeout() 1037 if sys.stdout.isatty(): 1038 subprocess.call(["stty", "sane"], stdin=sys.stdout) 1039 1040 logger.debug("Running %s (%s)" % (self.name, self.type_str)) 1041 1042 is_timeout = False 1043 qemu_pid = None 1044 1045 with subprocess.Popen(command, stdout=open(self.stdout_fn, "wt"), stderr=open(self.stderr_fn, "wt"), cwd=self.build_dir) as proc: 1046 logger.debug("Spawning QEMUHandler Thread for %s" % self.name) 1047 1048 try: 1049 proc.wait(self.get_test_timeout()) 1050 except subprocess.TimeoutExpired: 1051 # sometimes QEMU can't handle SIGTERM signal correctly 1052 # in that case kill -9 QEMU process directly and leave 1053 # twister to judge testing result by console output 1054 1055 is_timeout = True 1056 self.terminate(proc) 1057 if harness.state == "passed": 1058 self.returncode = 0 1059 else: 1060 self.returncode = proc.returncode 1061 else: 1062 if os.path.exists(self.pid_fn): 1063 qemu_pid = int(open(self.pid_fn).read()) 1064 logger.debug(f"No timeout, return code from QEMU ({qemu_pid}): {proc.returncode}") 1065 self.returncode = proc.returncode 1066 # Need to wait for harness to finish processing 1067 # output from QEMU. Otherwise it might miss some 1068 # messages. 1069 self.thread.join(max(thread_max_time - time.time(), 0)) 1070 if self.thread.is_alive(): 1071 logger.debug("Timed out while monitoring QEMU output") 1072 1073 if os.path.exists(self.pid_fn): 1074 qemu_pid = int(open(self.pid_fn).read()) 1075 os.unlink(self.pid_fn) 1076 1077 logger.debug(f"return code from QEMU ({qemu_pid}): {self.returncode}") 1078 1079 self._update_instance_info(harness.state, is_timeout) 1080 1081 self._final_handle_actions(harness, 0) 1082 1083 def get_fifo(self): 1084 return self.fifo_fn 1085 1086 1087class QEMUWinHandler(Handler): 1088 """Spawns a thread to monitor QEMU output from pipes on Windows OS 1089 1090 QEMU creates single duplex pipe at //.pipe/path, where path is fifo_fn. 1091 We need to do this as once qemu starts, it runs forever until killed. 1092 Test cases emit special messages to the console as they run, we check 1093 for these to collect whether the test passed or failed. 1094 """ 1095 1096 def __init__(self, instance, type_str): 1097 """Constructor 1098 1099 @param instance Test instance 1100 """ 1101 1102 super().__init__(instance, type_str) 1103 self.pid_fn = os.path.join(instance.build_dir, "qemu.pid") 1104 self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo") 1105 self.pipe_handle = None 1106 self.pid = 0 1107 self.thread = None 1108 self.stop_thread = False 1109 1110 if instance.testsuite.ignore_qemu_crash: 1111 self.ignore_qemu_crash = True 1112 self.ignore_unexpected_eof = True 1113 else: 1114 self.ignore_qemu_crash = False 1115 self.ignore_unexpected_eof = False 1116 1117 @staticmethod 1118 def _get_cpu_time(pid): 1119 """get process CPU time. 1120 1121 The guest virtual time in QEMU icount mode isn't host time and 1122 it's maintained by counting guest instructions, so we use QEMU 1123 process execution time to mostly simulate the time of guest OS. 1124 """ 1125 proc = psutil.Process(pid) 1126 cpu_time = proc.cpu_times() 1127 return cpu_time.user + cpu_time.system 1128 1129 @staticmethod 1130 def _open_log_file(logfile): 1131 return open(logfile, "wt") 1132 1133 @staticmethod 1134 def _close_log_file(log_file): 1135 log_file.close() 1136 1137 @staticmethod 1138 def _stop_qemu_process(pid): 1139 if pid: 1140 try: 1141 if pid: 1142 os.kill(pid, signal.SIGTERM) 1143 except (ProcessLookupError, psutil.NoSuchProcess): 1144 # Oh well, as long as it's dead! User probably sent Ctrl-C 1145 pass 1146 except OSError: 1147 pass 1148 1149 @staticmethod 1150 def _monitor_update_instance_info(handler, handler_time, status, reason): 1151 handler.instance.execution_time = handler_time 1152 handler.instance.status = status 1153 if reason: 1154 handler.instance.reason = reason 1155 else: 1156 handler.instance.reason = "Unknown" 1157 1158 def _set_qemu_filenames(self, sysbuild_build_dir): 1159 # PID file will be created in the main sysbuild app's build dir 1160 self.pid_fn = os.path.join(sysbuild_build_dir, "qemu.pid") 1161 1162 if os.path.exists(self.pid_fn): 1163 os.unlink(self.pid_fn) 1164 1165 self.log_fn = self.log 1166 1167 def _create_command(self, sysbuild_build_dir): 1168 command = [self.generator_cmd] 1169 command += ["-C", sysbuild_build_dir, "run"] 1170 1171 return command 1172 1173 def _update_instance_info(self, harness_state, is_timeout): 1174 if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness_state: 1175 self.instance.status = "failed" 1176 if is_timeout: 1177 self.instance.reason = "Timeout" 1178 else: 1179 if not self.instance.reason: 1180 self.instance.reason = "Exited with {}".format(self.returncode) 1181 self.instance.add_missing_case_status("blocked") 1182 1183 def _enqueue_char(self, queue): 1184 while not self.stop_thread: 1185 if not self.pipe_handle: 1186 try: 1187 self.pipe_handle = os.open(r"\\.\pipe\\" + self.fifo_fn, os.O_RDONLY) 1188 except FileNotFoundError as e: 1189 if e.args[0] == 2: 1190 # Pipe is not opened yet, try again after a delay. 1191 time.sleep(1) 1192 continue 1193 1194 c = "" 1195 try: 1196 c = os.read(self.pipe_handle, 1) 1197 finally: 1198 queue.put(c) 1199 1200 def _monitor_output(self, queue, timeout, logfile, pid_fn, harness, ignore_unexpected_eof=False): 1201 start_time = time.time() 1202 timeout_time = start_time + timeout 1203 _status = None 1204 _reason = None 1205 line = "" 1206 timeout_extended = False 1207 self.pid = 0 1208 1209 log_out_fp = self._open_log_file(logfile) 1210 1211 while True: 1212 this_timeout = int((timeout_time - time.time()) * 1000) 1213 if this_timeout < 0: 1214 try: 1215 if self.pid and this_timeout > 0: 1216 # there's possibility we polled nothing because 1217 # of not enough CPU time scheduled by host for 1218 # QEMU process during p.poll(this_timeout) 1219 cpu_time = self._get_cpu_time(self.pid) 1220 if cpu_time < timeout and not _status: 1221 timeout_time = time.time() + (timeout - cpu_time) 1222 continue 1223 except psutil.NoSuchProcess: 1224 pass 1225 except ProcessLookupError: 1226 _status = "failed" 1227 _reason = "Execution error" 1228 break 1229 1230 if not _status: 1231 _status = "failed" 1232 _reason = "timeout" 1233 break 1234 1235 if self.pid == 0 and os.path.exists(pid_fn): 1236 try: 1237 self.pid = int(open(pid_fn).read()) 1238 except ValueError: 1239 # pid file probably not contains pid yet, continue 1240 pass 1241 1242 try: 1243 c = queue.get_nowait() 1244 except Empty: 1245 continue 1246 try: 1247 c = c.decode("utf-8") 1248 except UnicodeDecodeError: 1249 # Test is writing something weird, fail 1250 _status = "failed" 1251 _reason = "unexpected byte" 1252 break 1253 1254 if c == "": 1255 # EOF, this shouldn't happen unless QEMU crashes 1256 if not ignore_unexpected_eof: 1257 _status = "failed" 1258 _reason = "unexpected eof" 1259 break 1260 line = line + c 1261 if c != "\n": 1262 continue 1263 1264 # line contains a full line of data output from QEMU 1265 log_out_fp.write(line) 1266 log_out_fp.flush() 1267 line = line.rstrip() 1268 logger.debug(f"QEMU ({self.pid}): {line}") 1269 1270 harness.handle(line) 1271 if harness.state: 1272 # if we have registered a fail make sure the state is not 1273 # overridden by a false success message coming from the 1274 # testsuite 1275 if _status != 'failed': 1276 _status = harness.state 1277 _reason = harness.reason 1278 1279 # if we get some state, that means test is doing well, we reset 1280 # the timeout and wait for 2 more seconds to catch anything 1281 # printed late. We wait much longer if code 1282 # coverage is enabled since dumping this information can 1283 # take some time. 1284 if not timeout_extended or harness.capture_coverage: 1285 timeout_extended = True 1286 if harness.capture_coverage: 1287 timeout_time = time.time() + 30 1288 else: 1289 timeout_time = time.time() + 2 1290 line = "" 1291 1292 self.stop_thread = True 1293 1294 handler_time = time.time() - start_time 1295 logger.debug(f"QEMU ({self.pid}) complete with {_status} ({_reason}) after {handler_time} seconds") 1296 self._monitor_update_instance_info(self, handler_time, _status, _reason) 1297 self._close_log_file(log_out_fp) 1298 self._stop_qemu_process(self.pid) 1299 1300 def handle(self, harness): 1301 self.run = True 1302 1303 domain_build_dir = self.get_default_domain_build_dir() 1304 command = self._create_command(domain_build_dir) 1305 self._set_qemu_filenames(domain_build_dir) 1306 1307 logger.debug("Running %s (%s)" % (self.name, self.type_str)) 1308 is_timeout = False 1309 self.stop_thread = False 1310 queue = Queue() 1311 1312 with subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT, 1313 cwd=self.build_dir) as proc: 1314 logger.debug("Spawning QEMUHandler Thread for %s" % self.name) 1315 1316 self.thread = threading.Thread(target=self._enqueue_char, args=(queue,)) 1317 self.thread.daemon = True 1318 self.thread.start() 1319 1320 thread_max_time = time.time() + self.get_test_timeout() 1321 1322 self._monitor_output(queue, self.get_test_timeout(), self.log_fn, self.pid_fn, harness, 1323 self.ignore_unexpected_eof) 1324 1325 if (thread_max_time - time.time()) < 0: 1326 logger.debug("Timed out while monitoring QEMU output") 1327 proc.terminate() 1328 # sleep for a while before attempting to kill 1329 time.sleep(0.5) 1330 proc.kill() 1331 1332 if harness.state == "passed": 1333 self.returncode = 0 1334 else: 1335 self.returncode = proc.returncode 1336 1337 if os.path.exists(self.pid_fn): 1338 os.unlink(self.pid_fn) 1339 1340 logger.debug(f"return code from QEMU ({self.pid}): {self.returncode}") 1341 1342 os.close(self.pipe_handle) 1343 self.pipe_handle = None 1344 1345 self._update_instance_info(harness.state, is_timeout) 1346 1347 self._final_handle_actions(harness, 0) 1348 1349 def get_fifo(self): 1350 return self.fifo_fn 1351