1#!/usr/bin/env python3
2# vim: set syntax=python ts=4 :
3#
4# Copyright (c) 2018-2022 Intel Corporation
5# Copyright 2022 NXP
6# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved.
7#
8# SPDX-License-Identifier: Apache-2.0
9
10import argparse
11import contextlib
12import logging
13import math
14import os
15import re
16import select
17import shlex
18import signal
19import subprocess
20import sys
21import threading
22import time
23from contextlib import contextmanager
24from pathlib import Path
25from queue import Empty, Queue
26
27import psutil
28from twisterlib.environment import ZEPHYR_BASE, strip_ansi_sequences
29from twisterlib.error import TwisterException
30from twisterlib.platform import Platform
31from twisterlib.statuses import TwisterStatus
32
33sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/build_helpers"))
34from domains import Domains
35
36try:
37    import serial
38except ImportError:
39    print("Install pyserial python module with pip to use --device-testing option.")
40
41try:
42    import pty
43except ImportError as capture_error:
44    if os.name == "nt":  # "nt" means that program is running on Windows OS
45        pass  # "--device-serial-pty" option is not supported on Windows OS
46    else:
47        raise capture_error
48
49logger = logging.getLogger('twister')
50logger.setLevel(logging.DEBUG)
51
52
53def terminate_process(proc):
54    """
55    encapsulate terminate functionality so we do it consistently where ever
56    we might want to terminate the proc.  We need try_kill_process_by_pid
57    because of both how newer ninja (1.6.0 or greater) and .NET / renode
58    work.  Newer ninja's don't seem to pass SIGTERM down to the children
59    so we need to use try_kill_process_by_pid.
60    """
61
62    with contextlib.suppress(ProcessLookupError, psutil.NoSuchProcess):
63        for child in psutil.Process(proc.pid).children(recursive=True):
64            with contextlib.suppress(ProcessLookupError, psutil.NoSuchProcess):
65                os.kill(child.pid, signal.SIGTERM)
66    proc.terminate()
67    # sleep for a while before attempting to kill
68    time.sleep(0.5)
69    proc.kill()
70
71
72class Handler:
73    def __init__(self, instance, type_str: str, options: argparse.Namespace,
74                 generator_cmd: str | None = None, suite_name_check: bool = True):
75        """Constructor
76
77        """
78        self.options = options
79
80        self.run = False
81        self.type_str = type_str
82
83        self.pid_fn = None
84        self.call_make_run = True
85
86        self.name = instance.name
87        self.instance = instance
88        self.sourcedir = instance.testsuite.source_dir
89        self.build_dir = instance.build_dir
90        self.log = os.path.join(self.build_dir, "handler.log")
91        self.returncode = 0
92        self.generator_cmd = generator_cmd
93        self.suite_name_check = suite_name_check
94        self.ready = False
95
96        self.args = []
97        self.terminated = False
98
99    def get_test_timeout(self):
100        return math.ceil(self.instance.testsuite.timeout *
101                         self.instance.platform.timeout_multiplier *
102                         self.options.timeout_multiplier)
103
104    def terminate(self, proc):
105        terminate_process(proc)
106        self.terminated = True
107
108    def _verify_ztest_suite_name(self, harness_status, detected_suite_names, handler_time):
109        """
110        If test suite names was found in test's C source code, then verify if
111        detected suite names from output correspond to expected suite names
112        (and not in reverse).
113        """
114        expected_suite_names = self.instance.testsuite.ztest_suite_names
115        logger.debug(f"Expected suite names:{expected_suite_names}")
116        logger.debug(f"Detected suite names:{detected_suite_names}")
117        if not expected_suite_names or \
118                harness_status != TwisterStatus.PASS:
119            return
120        if not detected_suite_names:
121            self._missing_suite_name(expected_suite_names, handler_time)
122            return
123        # compare the expect and detect from end one by one without order
124        _d_suite = detected_suite_names[-len(expected_suite_names):]
125        if (
126            set(_d_suite) != set(expected_suite_names)
127            and not set(_d_suite).issubset(set(expected_suite_names))
128        ):
129            self._missing_suite_name(expected_suite_names, handler_time)
130
131    def _missing_suite_name(self, expected_suite_names, handler_time):
132        """
133        Change result of performed test if problem with missing or unpropper
134        suite name was occurred.
135        """
136        self.instance.status = TwisterStatus.FAIL
137        self.instance.execution_time = handler_time
138        for tc in self.instance.testcases:
139            tc.status = TwisterStatus.FAIL
140        self.instance.reason = "Testsuite mismatch"
141        logger.debug(
142            "Test suite names were not printed or some of them in output"
143            f" do not correspond with expected: {str(expected_suite_names)}",
144        )
145
146    def _final_handle_actions(self, harness, handler_time):
147
148        # only for Ztest tests:
149        harness_class_name = type(harness).__name__
150        if self.suite_name_check and harness_class_name == "Test":
151            self._verify_ztest_suite_name(
152                harness.status,
153                harness.detected_suite_names,
154                handler_time
155            )
156            if self.instance.status == TwisterStatus.FAIL:
157                return
158            if not harness.matched_run_id and harness.run_id_exists:
159                self.instance.status = TwisterStatus.FAIL
160                self.instance.execution_time = handler_time
161                self.instance.reason = "RunID mismatch"
162                for tc in self.instance.testcases:
163                    tc.status = TwisterStatus.FAIL
164
165        self.instance.record(harness.recording)
166
167    def get_default_domain_build_dir(self):
168        if self.instance.sysbuild:
169            # Load domain yaml to get default domain build directory
170            # Note: for targets using QEMU, we assume that the target will
171            # have added any additional images to the run target manually
172            domain_path = os.path.join(self.build_dir, "domains.yaml")
173            domains = Domains.from_file(domain_path)
174            logger.debug(f"Loaded sysbuild domain data from {domain_path}")
175            build_dir = domains.get_default_domain().build_dir
176        else:
177            build_dir = self.build_dir
178        return build_dir
179
180
181class BinaryHandler(Handler):
182    def __init__(
183        self,
184        instance,
185        type_str: str,
186        options: argparse.Namespace,
187        generator_cmd: str | None = None,
188        suite_name_check: bool = True
189    ):
190        """Constructor
191
192        @param instance Test Instance
193        """
194        super().__init__(instance, type_str, options, generator_cmd, suite_name_check)
195
196        self.seed = None
197        self.extra_test_args = None
198        self.line = b""
199        self.binary: str | None = None
200
201    def try_kill_process_by_pid(self):
202        if self.pid_fn:
203            with open(self.pid_fn) as pid_file:
204                pid = int(pid_file.read())
205            os.unlink(self.pid_fn)
206            self.pid_fn = None  # clear so we don't try to kill the binary twice
207            with contextlib.suppress(ProcessLookupError, psutil.NoSuchProcess):
208                os.kill(pid, signal.SIGKILL)
209
210    def _output_reader(self, proc):
211        self.line = proc.stdout.readline()
212
213    def _output_handler(self, proc, harness):
214        suffix = '\\r\\n'
215
216        with open(self.log, "w") as log_out_fp:
217            timeout_extended = False
218            timeout_time = time.time() + self.get_test_timeout()
219            while True:
220                this_timeout = timeout_time - time.time()
221                if this_timeout < 0:
222                    break
223                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
224                reader_t.start()
225                reader_t.join(this_timeout)
226                if not reader_t.is_alive() and self.line != b"":
227                    line_decoded = self.line.decode('utf-8', "replace")
228                    stripped_line = line_decoded.rstrip()
229                    if stripped_line.endswith(suffix):
230                        stripped_line = stripped_line[:-len(suffix)].rstrip()
231                    logger.debug(f"OUTPUT: {stripped_line}")
232                    log_out_fp.write(strip_ansi_sequences(line_decoded))
233                    log_out_fp.flush()
234                    harness.handle(stripped_line)
235                    if (
236                        harness.status != TwisterStatus.NONE
237                        and not timeout_extended
238                        or harness.capture_coverage
239                    ):
240                        timeout_extended = True
241                        if harness.capture_coverage:
242                            timeout_time = time.time() + 30
243                        else:
244                            timeout_time = time.time() + 2
245                else:
246                    reader_t.join(0)
247                    break
248            try:
249                # POSIX arch based ztests end on their own,
250                # so let's give it up to 100ms to do so
251                proc.wait(0.1)
252            except subprocess.TimeoutExpired:
253                self.terminate(proc)
254
255    def _create_command(self, robot_test):
256
257        if robot_test:
258            keywords = os.path.join(self.options.coverage_basedir, 'tests/robot/common.robot')
259            elf = os.path.join(self.build_dir, "zephyr/zephyr.elf")
260            command = [self.generator_cmd]
261            resc = ""
262            uart = ""
263            # os.path.join cannot be used on a Mock object, so we are
264            # explicitly checking the type
265            if isinstance(self.instance.platform, Platform):
266                for board_dir in self.options.board_root:
267                    path = os.path.join(Path(board_dir).parent, self.instance.platform.resc)
268                    if os.path.exists(path):
269                        resc = path
270                        break
271                uart = self.instance.platform.uart
272                command = ["renode-test",
273                            "--variable", "KEYWORDS:" + keywords,
274                            "--variable", "ELF:@" + elf,
275                            "--variable", "RESC:@" + resc,
276                            "--variable", "UART:" + uart]
277        elif self.call_make_run:
278            if self.options.sim_name:
279                target = f"run_{self.options.sim_name}"
280            else:
281                target = "run"
282
283            command = [self.generator_cmd, "-C", self.get_default_domain_build_dir(), target]
284        elif self.instance.testsuite.type == "unit":
285            assert self.binary, "Missing binary in unit testsuite."
286            command = [self.binary]
287        else:
288            binary = os.path.join(self.get_default_domain_build_dir(), "zephyr", "zephyr.exe")
289            command = [binary]
290
291        if self.options.enable_valgrind:
292            command = ["valgrind", "--error-exitcode=2",
293                       "--leak-check=full",
294                       "--suppressions=" + ZEPHYR_BASE + "/scripts/valgrind.supp",
295                       "--log-file=" + self.build_dir + "/valgrind.log",
296                       "--track-origins=yes",
297                       ] + command
298
299        # Only valid for native_sim
300        if self.seed is not None:
301            command.append(f"--seed={self.seed}")
302        if self.extra_test_args is not None:
303            command.extend(self.extra_test_args)
304
305        return command
306
307    def _create_env(self):
308        env = os.environ.copy()
309        if self.options.enable_asan:
310            env["ASAN_OPTIONS"] = "log_path=stdout:" + \
311                                  env.get("ASAN_OPTIONS", "")
312            if not self.options.enable_lsan:
313                env["ASAN_OPTIONS"] += "detect_leaks=0"
314
315        if self.options.enable_ubsan:
316            env["UBSAN_OPTIONS"] = "log_path=stdout:halt_on_error=1:" + \
317                                  env.get("UBSAN_OPTIONS", "")
318
319        return env
320
321    def _update_instance_info(self, harness, handler_time):
322        self.instance.execution_time = handler_time
323        if not self.terminated and self.returncode != 0:
324            self.instance.status = TwisterStatus.FAIL
325            if self.options.enable_valgrind and self.returncode == 2:
326                self.instance.reason = "Valgrind error"
327            else:
328                # When a process is killed, the default handler returns 128 + SIGTERM
329                # so in that case the return code itself is not meaningful
330                self.instance.reason = f"Failed (rc={self.returncode})"
331            self.instance.add_missing_case_status(TwisterStatus.BLOCK)
332        elif harness.status != TwisterStatus.NONE:
333            self.instance.status = harness.status
334            if harness.status == TwisterStatus.FAIL:
335                self.instance.reason = f"Failed harness:'{harness.reason}'"
336            self.instance.add_missing_case_status(TwisterStatus.BLOCK)
337        else:
338            self.instance.status = TwisterStatus.FAIL
339            self.instance.reason = "Timeout"
340            self.instance.add_missing_case_status(TwisterStatus.BLOCK, "Timeout")
341
342    def handle(self, harness):
343        robot_test = getattr(harness, "is_robot_test", False)
344
345        command = self._create_command(robot_test)
346
347        logger.debug("Spawning process: " +
348                     " ".join(shlex.quote(word) for word in command) + os.linesep +
349                     "in directory: " + self.build_dir)
350
351        start_time = time.time()
352
353        env = self._create_env()
354
355        if robot_test:
356            harness.run_robot_test(command, self)
357            return
358
359        stderr_log = f"{self.instance.build_dir}/handler_stderr.log"
360        with open(stderr_log, "w+") as stderr_log_fp, subprocess.Popen(
361            command, stdout=subprocess.PIPE, stderr=stderr_log_fp, cwd=self.build_dir, env=env
362        ) as proc:
363            logger.debug(f"Spawning BinaryHandler Thread for {self.name}")
364            t = threading.Thread(target=self._output_handler, args=(proc, harness,), daemon=True)
365            t.start()
366            t.join()
367            if t.is_alive():
368                self.terminate(proc)
369                t.join()
370            proc.wait()
371            self.returncode = proc.returncode
372            if proc.returncode != 0:
373                self.instance.status = TwisterStatus.ERROR
374                self.instance.reason = f"BinaryHandler returned {proc.returncode}"
375            self.try_kill_process_by_pid()
376
377        handler_time = time.time() - start_time
378
379        # FIXME: This is needed when killing the simulator, the console is
380        # garbled and needs to be reset. Did not find a better way to do that.
381        if sys.stdout.isatty():
382            subprocess.call(["stty", "sane"], stdin=sys.stdout)
383
384        self._update_instance_info(harness, handler_time)
385
386        self._final_handle_actions(harness, handler_time)
387
388
389class SimulationHandler(BinaryHandler):
390    def __init__(
391        self,
392        instance,
393        type_str: str,
394        options: argparse.Namespace,
395        generator_cmd: str | None = None,
396        suite_name_check: bool = True,
397    ):
398        """Constructor
399
400        @param instance Test Instance
401        """
402        super().__init__(instance, type_str, options, generator_cmd, suite_name_check)
403
404        if type_str == 'renode':
405            self.pid_fn = os.path.join(instance.build_dir, "renode.pid")
406        elif type_str == 'native':
407            self.call_make_run = False
408            self.ready = True
409
410
411class DeviceHandler(Handler):
412    def get_test_timeout(self):
413        timeout = super().get_test_timeout()
414        if self.options.enable_coverage:
415            # wait more for gcov data to be dumped on console
416            timeout += 120
417        return timeout
418
419    def monitor_serial(self, ser, halt_event, harness):
420        if self.options.enable_coverage:
421            # Set capture_coverage to True to indicate that right after
422            # test results we should get coverage data, otherwise we exit
423            # from the test.
424            harness.capture_coverage = True
425
426        # Wait for serial connection
427        while not ser.isOpen():
428            time.sleep(0.1)
429
430        # Clear serial leftover.
431        ser.reset_input_buffer()
432
433        with open(self.log, "wb") as log_out_fp:
434            while ser.isOpen():
435                if halt_event.is_set():
436                    logger.debug('halted')
437                    ser.close()
438                    break
439
440                try:
441                    if not ser.in_waiting:
442                        # no incoming bytes are waiting to be read from
443                        # the serial input buffer, let other threads run
444                        time.sleep(0.001)
445                        continue
446                # maybe the serial port is still in reset
447                # check status may cause error
448                # wait for more time
449                except OSError:
450                    time.sleep(0.001)
451                    continue
452                except TypeError:
453                    # This exception happens if the serial port was closed and
454                    # its file descriptor cleared in between of ser.isOpen()
455                    # and ser.in_waiting checks.
456                    logger.debug("Serial port is already closed, stop reading.")
457                    break
458
459                serial_line = None
460                # SerialException may happen during the serial device power off/on process.
461                with contextlib.suppress(TypeError, serial.SerialException):
462                    serial_line = ser.readline()
463
464                # Just because ser_fileno has data doesn't mean an entire line
465                # is available yet.
466                if serial_line:
467                    # can be more lines in serial_line so split them before handling
468                    for sl in serial_line.decode('utf-8', 'ignore').splitlines(keepends=True):
469                        log_out_fp.write(strip_ansi_sequences(sl).encode('utf-8'))
470                        log_out_fp.flush()
471                        if sl := sl.strip():
472                            logger.debug(f"DEVICE: {sl}")
473                            harness.handle(sl)
474
475                if harness.status != TwisterStatus.NONE and not harness.capture_coverage:
476                    ser.close()
477                    break
478
479    @staticmethod
480    @contextmanager
481    def acquire_dut_locks(duts):
482        try:
483            for d in duts:
484                d.lock.acquire()
485            yield
486        finally:
487            for d in duts:
488                d.lock.release()
489
490    def device_is_available(self, instance):
491        device = instance.platform.name
492        fixture = instance.testsuite.harness_config.get("fixture")
493        duts_found = []
494
495        for d in self.duts:
496            if fixture and fixture not in map(lambda f: f.split(sep=':')[0], d.fixtures):
497                continue
498            if d.platform != device or (d.serial is None and d.serial_pty is None):
499                continue
500            duts_found.append(d)
501
502        if not duts_found:
503            raise TwisterException(f"No device to serve as {device} platform.")
504
505        # Select an available DUT with less failures
506        for d in sorted(duts_found, key=lambda _dut: _dut.failures):
507            # get all DUTs with the same id
508            duts_shared_hw = [_d for _d in self.duts if _d.id == d.id]
509            with self.acquire_dut_locks(duts_shared_hw):
510                avail = False
511                if d.available:
512                    for _d in duts_shared_hw:
513                        _d.available = 0
514                    d.counter_increment()
515                    avail = True
516                    logger.debug(f"Retain DUT:{d.platform}, Id:{d.id}, "
517                                f"counter:{d.counter}, failures:{d.failures}")
518            if avail:
519                return d
520
521        return None
522
523    def make_dut_available(self, dut):
524        if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
525            dut.failures_increment()
526        logger.debug(f"Release DUT:{dut.platform}, Id:{dut.id}, "
527                     f"counter:{dut.counter}, failures:{dut.failures}")
528        # get all DUTs with the same id
529        duts_shared_hw = [_d for _d in self.duts if _d.id == dut.id]
530        with self.acquire_dut_locks(duts_shared_hw):
531            for _d in duts_shared_hw:
532                _d.available = 1
533
534    @staticmethod
535    def run_custom_script(script, timeout):
536        with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
537            try:
538                stdout, stderr = proc.communicate(timeout=timeout)
539                logger.debug(stdout.decode())
540                if proc.returncode != 0:
541                    logger.error(f"Custom script failure: {stderr.decode(errors='ignore')}")
542
543            except subprocess.TimeoutExpired:
544                proc.kill()
545                proc.communicate()
546                logger.error(f"{script} timed out")
547
548    def _create_command(self, runner, hardware):
549        if (self.options.west_flash is not None) or runner:
550            command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
551            command_extra_args = []
552
553            # There are three ways this option is used.
554            # 1) bare: --west-flash
555            #    This results in options.west_flash == []
556            # 2) with a value: --west-flash="--board-id=42"
557            #    This results in options.west_flash == "--board-id=42"
558            # 3) Multiple values: --west-flash="--board-id=42,--erase"
559            #    This results in options.west_flash == "--board-id=42 --erase"
560            if self.options.west_flash and self.options.west_flash != []:
561                command_extra_args.extend(self.options.west_flash.split(','))
562
563            if runner:
564                command.append("--runner")
565                command.append(runner)
566
567                board_id = hardware.probe_id or hardware.id
568                product = hardware.product
569                if board_id is not None:
570                    if runner in ("pyocd", "nrfjprog", "nrfutil"):
571                        command_extra_args.append("--dev-id")
572                        command_extra_args.append(board_id)
573                    elif runner == "esp32":
574                        command_extra_args.append("--esp-device")
575                        command_extra_args.append(board_id)
576                    elif (
577                        runner == "openocd"
578                        and product == "STM32 STLink"
579                        or runner == "openocd"
580                        and product == "STLINK-V3"
581                    ):
582                        command_extra_args.append("--cmd-pre-init")
583                        command_extra_args.append(f"hla_serial {board_id}")
584                    elif runner == "openocd" and product == "EDBG CMSIS-DAP":
585                        command_extra_args.append("--cmd-pre-init")
586                        command_extra_args.append(f"cmsis_dap_serial {board_id}")
587                    elif runner == "openocd" and product == "LPC-LINK2 CMSIS-DAP":
588                        command_extra_args.append("--cmd-pre-init")
589                        command_extra_args.append(f"adapter serial {board_id}")
590                    elif runner == "jlink":
591                        command.append("--dev-id")
592                        command.append(board_id)
593                    elif runner == "linkserver":
594                        # for linkserver
595                        # --probe=#<number> select by probe index
596                        # --probe=<serial number> select by probe serial number
597                        command.append(f"--probe={board_id}")
598                    elif runner == "stm32cubeprogrammer" and product != "BOOT-SERIAL":
599                        command.append(f"--tool-opt=sn={board_id}")
600
601                    # Receive parameters from runner_params field.
602                    if hardware.runner_params:
603                        for param in hardware.runner_params:
604                            command.append(param)
605
606            if command_extra_args:
607                command.append('--')
608                command.extend(command_extra_args)
609        else:
610            command = [self.generator_cmd, "-C", self.build_dir, "flash"]
611
612        return command
613
614    def _update_instance_info(self, harness, handler_time, flash_error):
615        self.instance.execution_time = handler_time
616        if harness.status != TwisterStatus.NONE:
617            self.instance.status = harness.status
618            if harness.status == TwisterStatus.FAIL:
619                self.instance.reason = f"Failed harness:'{harness.reason}'"
620            self.instance.add_missing_case_status(TwisterStatus.BLOCK, harness.status)
621        elif not flash_error:
622            self.instance.status = TwisterStatus.FAIL
623            self.instance.reason = "Timeout"
624
625        if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
626            self.instance.add_missing_case_status(TwisterStatus.BLOCK, self.instance.reason)
627
628    def _terminate_pty(self, ser_pty, ser_pty_process):
629        logger.debug(f"Terminating serial-pty:'{ser_pty}'")
630        terminate_process(ser_pty_process)
631        try:
632            (stdout, stderr) = ser_pty_process.communicate(timeout=self.get_test_timeout())
633            logger.debug(f"Terminated serial-pty:'{ser_pty}', stdout:'{stdout}', stderr:'{stderr}'")
634        except subprocess.TimeoutExpired:
635            logger.debug(f"Terminated serial-pty:'{ser_pty}'")
636    #
637
638    def _create_serial_connection(self, dut, serial_device, hardware_baud,
639                                  flash_timeout, serial_pty, ser_pty_process):
640        try:
641            ser = serial.Serial(
642                serial_device,
643                baudrate=hardware_baud,
644                parity=serial.PARITY_NONE,
645                stopbits=serial.STOPBITS_ONE,
646                bytesize=serial.EIGHTBITS,
647                # the worst case of no serial input
648                timeout=max(flash_timeout, self.get_test_timeout())
649            )
650        except serial.SerialException as e:
651            self._handle_serial_exception(e, dut, serial_pty, ser_pty_process)
652            raise
653
654        return ser
655
656
657    def _handle_serial_exception(self, exception, dut, serial_pty, ser_pty_process):
658        self.instance.status = TwisterStatus.FAIL
659        self.instance.reason = "Serial Device Error"
660        logger.error(f"Serial device error: {exception!s}")
661
662        self.instance.add_missing_case_status(TwisterStatus.BLOCK, "Serial Device Error")
663        if serial_pty and ser_pty_process:
664            self._terminate_pty(serial_pty, ser_pty_process)
665
666        self.make_dut_available(dut)
667
668
669    def get_hardware(self):
670        hardware = None
671        try:
672            hardware = self.device_is_available(self.instance)
673            in_waiting = 0
674            while not hardware:
675                time.sleep(1)
676                in_waiting += 1
677                if in_waiting%60 == 0:
678                    logger.debug(f"Waiting for a DUT to run {self.instance.name}")
679                hardware = self.device_is_available(self.instance)
680        except TwisterException as error:
681            self.instance.status = TwisterStatus.FAIL
682            self.instance.reason = str(error)
683            logger.error(self.instance.reason)
684        return hardware
685
686    def _get_serial_device(self, serial_pty, hardware_serial):
687        ser_pty_process = None
688        if serial_pty:
689            master, slave = pty.openpty()
690            try:
691                ser_pty_process = subprocess.Popen(
692                    re.split('[, ]', serial_pty),
693                    stdout=master,
694                    stdin=master,
695                    stderr=master
696                )
697            except subprocess.CalledProcessError as error:
698                logger.error(
699                    f"Failed to run subprocess {serial_pty}, error {error.output}"
700                )
701                return
702
703            serial_device = os.ttyname(slave)
704        else:
705            serial_device = hardware_serial
706
707        return serial_device, ser_pty_process
708
709    def handle(self, harness):
710        runner = None
711        hardware = self.get_hardware()
712        if hardware:
713            self.instance.dut = hardware.id
714        else:
715            return
716
717        runner = hardware.runner or self.options.west_runner
718        serial_pty = hardware.serial_pty
719
720        serial_device, ser_pty_process = self._get_serial_device(serial_pty, hardware.serial)
721
722        logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud")
723
724        command = self._create_command(runner, hardware)
725
726        pre_script = hardware.pre_script
727        post_flash_script = hardware.post_flash_script
728        post_script = hardware.post_script
729        script_param = hardware.script_param
730
731        if pre_script:
732            timeout = 30
733            if script_param:
734                timeout = script_param.get("pre_script_timeout", timeout)
735            self.run_custom_script(pre_script, timeout)
736
737        flash_timeout = hardware.flash_timeout
738        if hardware.flash_with_test:
739            flash_timeout += self.get_test_timeout()
740
741        serial_port = None
742        if hardware.flash_before is False:
743            serial_port = serial_device
744
745        try:
746            ser = self._create_serial_connection(
747                hardware,
748                serial_port,
749                hardware.baud,
750                flash_timeout,
751                serial_pty,
752                ser_pty_process
753            )
754        except serial.SerialException:
755            return
756
757        halt_monitor_evt = threading.Event()
758
759        t = threading.Thread(target=self.monitor_serial, daemon=True,
760                             args=(ser, halt_monitor_evt, harness))
761        start_time = time.time()
762        t.start()
763
764        d_log = f"{self.instance.build_dir}/device.log"
765        logger.debug(f'Flash command: {command}', )
766        flash_error = False
767        try:
768            stdout = stderr = None
769            with subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
770                try:
771                    (stdout, stderr) = proc.communicate(timeout=flash_timeout)
772                    # ignore unencodable unicode chars
773                    logger.debug(stdout.decode(errors="ignore"))
774
775                    if proc.returncode != 0:
776                        self.instance.status = TwisterStatus.ERROR
777                        self.instance.reason = "Device issue (Flash error?)"
778                        flash_error = True
779                        with open(d_log, "w") as dlog_fp:
780                            dlog_fp.write(stderr.decode())
781                        halt_monitor_evt.set()
782                except subprocess.TimeoutExpired:
783                    logger.warning("Flash operation timed out.")
784                    self.terminate(proc)
785                    (stdout, stderr) = proc.communicate()
786                    self.instance.status = TwisterStatus.ERROR
787                    self.instance.reason = "Device issue (Timeout)"
788                    flash_error = True
789
790            with open(d_log, "w") as dlog_fp:
791                dlog_fp.write(stderr.decode())
792
793        except subprocess.CalledProcessError:
794            halt_monitor_evt.set()
795            self.instance.status = TwisterStatus.ERROR
796            self.instance.reason = "Device issue (Flash error)"
797            flash_error = True
798
799        if post_flash_script:
800            timeout = 30
801            if script_param:
802                timeout = script_param.get("post_flash_timeout", timeout)
803            self.run_custom_script(post_flash_script, timeout)
804
805        # Connect to device after flashing it
806        if hardware.flash_before:
807            try:
808                logger.debug(f"Attach serial device {serial_device} @ {hardware.baud} baud")
809                ser.port = serial_device
810                ser.open()
811            except serial.SerialException as e:
812                self._handle_serial_exception(e, hardware, serial_pty, ser_pty_process)
813                return
814
815        if not flash_error:
816            # Always wait at most the test timeout here after flashing.
817            t.join(self.get_test_timeout())
818        else:
819            # When the flash error is due exceptions,
820            # twister tell the monitor serial thread
821            # to close the serial. But it is necessary
822            # for this thread being run first and close
823            # have the change to close the serial.
824            t.join(0.1)
825
826        if t.is_alive():
827            logger.debug(
828                f"Timed out while monitoring serial output on {self.instance.platform.name}"
829            )
830
831        if ser.isOpen():
832            ser.close()
833
834        if serial_pty:
835            self._terminate_pty(serial_pty, ser_pty_process)
836
837        handler_time = time.time() - start_time
838
839        self._update_instance_info(harness, handler_time, flash_error)
840
841        self._final_handle_actions(harness, handler_time)
842
843        if post_script:
844            timeout = 30
845            if script_param:
846                timeout = script_param.get("post_script_timeout", timeout)
847            self.run_custom_script(post_script, timeout)
848
849        self.make_dut_available(hardware)
850
851
852class QEMUHandler(Handler):
853    """Spawns a thread to monitor QEMU output from pipes
854
855    We pass QEMU_PIPE to 'make run' and monitor the pipes for output.
856    We need to do this as once qemu starts, it runs forever until killed.
857    Test cases emit special messages to the console as they run, we check
858    for these to collect whether the test passed or failed.
859    """
860
861    def __init__(
862        self,
863        instance,
864        type_str: str,
865        options: argparse.Namespace,
866        generator_cmd: str | None = None,
867        suite_name_check: bool = True,
868    ):
869        """Constructor
870
871        @param instance Test instance
872        """
873
874        super().__init__(instance, type_str, options, generator_cmd, suite_name_check)
875        self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo")
876
877        self.pid_fn = os.path.join(instance.build_dir, "qemu.pid")
878
879        self.stdout_fn = os.path.join(instance.build_dir, "qemu.stdout")
880
881        self.stderr_fn = os.path.join(instance.build_dir, "qemu.stderr")
882
883        if instance.testsuite.ignore_qemu_crash:
884            self.ignore_qemu_crash = True
885            self.ignore_unexpected_eof = True
886        else:
887            self.ignore_qemu_crash = False
888            self.ignore_unexpected_eof = False
889
890    @staticmethod
891    def _get_cpu_time(pid):
892        """get process CPU time.
893
894        The guest virtual time in QEMU icount mode isn't host time and
895        it's maintained by counting guest instructions, so we use QEMU
896        process execution time to mostly simulate the time of guest OS.
897        """
898        proc = psutil.Process(pid)
899        cpu_time = proc.cpu_times()
900        return cpu_time.user + cpu_time.system
901
902    @staticmethod
903    def _thread_get_fifo_names(fifo_fn):
904        fifo_in = fifo_fn + ".in"
905        fifo_out = fifo_fn + ".out"
906
907        return fifo_in, fifo_out
908
909    @staticmethod
910    def _thread_update_instance_info(handler, handler_time, status, reason):
911        handler.instance.execution_time = handler_time
912        handler.instance.status = status
913        if reason:
914            handler.instance.reason = reason
915        else:
916            handler.instance.reason = "Unknown"
917
918    @staticmethod
919    def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn,
920                harness, ignore_unexpected_eof=False):
921        fifo_in, fifo_out = QEMUHandler._thread_get_fifo_names(fifo_fn)
922
923        # These in/out nodes are named from QEMU's perspective, not ours
924        if os.path.exists(fifo_in):
925            os.unlink(fifo_in)
926        os.mkfifo(fifo_in)
927        if os.path.exists(fifo_out):
928            os.unlink(fifo_out)
929        os.mkfifo(fifo_out)
930
931        with (
932            # We don't do anything with out_fp but we need to open it for
933            # writing so that QEMU doesn't block, due to the way pipes work
934            open(fifo_in, "wb") as _,
935            # Disable internal buffering, we don't
936            # want read() or poll() to ever block if there is data in there
937            open(fifo_out, "rb", buffering=0) as in_fp,
938            open(logfile, "w") as log_out_fp
939        ):
940            start_time = time.time()
941            timeout_time = start_time + timeout
942            p = select.poll()
943            p.register(in_fp, select.POLLIN)
944            _status = TwisterStatus.NONE
945            _reason = None
946
947            line = ""
948            timeout_extended = False
949
950            pid = 0
951            if os.path.exists(pid_fn):
952                with open(pid_fn) as pid_file:
953                    pid = int(pid_file.read())
954
955            while True:
956                this_timeout = int((timeout_time - time.time()) * 1000)
957                if timeout_extended:
958                    # Quit early after timeout extension if no more data is being received
959                    this_timeout = min(this_timeout, 1000)
960                if this_timeout < 0 or not p.poll(this_timeout):
961                    try:
962                        if pid and this_timeout > 0:
963                            # there's possibility we polled nothing because
964                            # of not enough CPU time scheduled by host for
965                            # QEMU process during p.poll(this_timeout)
966                            cpu_time = QEMUHandler._get_cpu_time(pid)
967                            if cpu_time < timeout and _status == TwisterStatus.NONE:
968                                timeout_time = time.time() + (timeout - cpu_time)
969                                continue
970                    except psutil.NoSuchProcess:
971                        pass
972                    except ProcessLookupError:
973                        _status = TwisterStatus.FAIL
974                        _reason = "Execution error"
975                        break
976
977                    if _status == TwisterStatus.NONE:
978                        _status = TwisterStatus.FAIL
979                        _reason = "timeout"
980                    break
981
982                if pid == 0 and os.path.exists(pid_fn):
983                    with open(pid_fn) as pid_file:
984                        pid = int(pid_file.read())
985
986                try:
987                    c = in_fp.read(1).decode("utf-8")
988                except UnicodeDecodeError:
989                    # Test is writing something weird, fail
990                    _status = TwisterStatus.FAIL
991                    _reason = "unexpected byte"
992                    break
993
994                if c == "":
995                    # EOF, this shouldn't happen unless QEMU crashes
996                    if not ignore_unexpected_eof:
997                        _status = TwisterStatus.FAIL
998                        _reason = "unexpected eof"
999                    break
1000                line = line + c
1001                if c != "\n":
1002                    continue
1003
1004                # line contains a full line of data output from QEMU
1005                log_out_fp.write(strip_ansi_sequences(line))
1006                log_out_fp.flush()
1007                line = line.rstrip()
1008                logger.debug(f"QEMU ({pid}): {line}")
1009
1010                harness.handle(line)
1011                if harness.status != TwisterStatus.NONE:
1012                    # if we have registered a fail make sure the status is not
1013                    # overridden by a false success message coming from the
1014                    # testsuite
1015                    if _status != TwisterStatus.FAIL:
1016                        _status = harness.status
1017                        _reason = harness.reason
1018
1019                    # if we get some status, that means test is doing well, we reset
1020                    # the timeout and wait for 2 more seconds to catch anything
1021                    # printed late. We wait much longer if code
1022                    # coverage is enabled since dumping this information can
1023                    # take some time.
1024                    if not timeout_extended or harness.capture_coverage:
1025                        timeout_extended = True
1026                        if harness.capture_coverage:
1027                            timeout_time = time.time() + 30
1028                        else:
1029                            timeout_time = time.time() + 2
1030                line = ""
1031
1032            handler_time = time.time() - start_time
1033            logger.debug(
1034                f"QEMU ({pid}) complete with {_status} ({_reason}) after {handler_time} seconds"
1035            )
1036
1037            QEMUHandler._thread_update_instance_info(handler, handler_time, _status, _reason)
1038
1039        if pid:
1040            # Oh well, as long as it's dead! User probably sent Ctrl-C
1041            with contextlib.suppress(ProcessLookupError, psutil.NoSuchProcess):
1042                if pid:
1043                    os.kill(pid, signal.SIGTERM)
1044
1045        os.unlink(fifo_in)
1046        os.unlink(fifo_out)
1047
1048    def _set_qemu_filenames(self, sysbuild_build_dir):
1049        # We pass this to QEMU which looks for fifos with .in and .out suffixes.
1050        # QEMU fifo will use main build dir
1051        self.fifo_fn = os.path.join(self.instance.build_dir, "qemu-fifo")
1052        # PID file will be created in the main sysbuild app's build dir
1053        self.pid_fn = os.path.join(sysbuild_build_dir, "qemu.pid")
1054
1055        if os.path.exists(self.pid_fn):
1056            os.unlink(self.pid_fn)
1057
1058        self.log_fn = self.log
1059
1060    def _create_command(self, sysbuild_build_dir):
1061        command = [self.generator_cmd]
1062        command += ["-C", sysbuild_build_dir, "run"]
1063
1064        return command
1065
1066    def _update_instance_info(self, harness, is_timeout):
1067        if (self.returncode != 0 and not self.ignore_qemu_crash) or \
1068            harness.status == TwisterStatus.NONE:
1069            self.instance.status = TwisterStatus.FAIL
1070            if is_timeout:
1071                self.instance.reason = "Timeout"
1072            else:
1073                if not self.instance.reason:
1074                    self.instance.reason = f"Exited with {self.returncode}"
1075            self.instance.add_missing_case_status(TwisterStatus.BLOCK)
1076
1077    def handle(self, harness):
1078        self.run = True
1079
1080        domain_build_dir = self.get_default_domain_build_dir()
1081
1082        command = self._create_command(domain_build_dir)
1083
1084        self._set_qemu_filenames(domain_build_dir)
1085
1086        self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread,
1087                                       args=(self, self.get_test_timeout(), self.build_dir,
1088                                             self.log_fn, self.fifo_fn,
1089                                             self.pid_fn, harness,
1090                                             self.ignore_unexpected_eof))
1091
1092        self.thread.daemon = True
1093        logger.debug(f"Spawning QEMUHandler Thread for {self.name}")
1094        self.thread.start()
1095        thread_max_time = time.time() + self.get_test_timeout()
1096        if sys.stdout.isatty():
1097            subprocess.call(["stty", "sane"], stdin=sys.stdout)
1098
1099        logger.debug(f"Running {self.name} ({self.type_str})")
1100
1101        is_timeout = False
1102        qemu_pid = None
1103
1104        with open(self.stdout_fn, "w") as stdout_fp, \
1105            open(self.stderr_fn, "w") as stderr_fp, subprocess.Popen(
1106            command,
1107            stdout=stdout_fp,
1108            stderr=stderr_fp,
1109            cwd=self.build_dir
1110        ) as proc:
1111            logger.debug(f"Spawning QEMUHandler Thread for {self.name}")
1112
1113            try:
1114                proc.wait(self.get_test_timeout())
1115            except subprocess.TimeoutExpired:
1116                # sometimes QEMU can't handle SIGTERM signal correctly
1117                # in that case kill -9 QEMU process directly and leave
1118                # twister to judge testing result by console output
1119
1120                is_timeout = True
1121                self.terminate(proc)
1122                if harness.status == TwisterStatus.PASS:
1123                    self.returncode = 0
1124                else:
1125                    self.returncode = proc.returncode
1126            else:
1127                if os.path.exists(self.pid_fn):
1128                    with open(self.pid_fn) as pid_file:
1129                        qemu_pid = int(pid_file.read())
1130                logger.debug(f"No timeout, return code from QEMU ({qemu_pid}): {proc.returncode}")
1131                self.returncode = proc.returncode
1132            # Need to wait for harness to finish processing
1133            # output from QEMU. Otherwise it might miss some
1134            # messages.
1135            self.thread.join(max(thread_max_time - time.time(), 0))
1136            if self.thread.is_alive():
1137                logger.debug("Timed out while monitoring QEMU output")
1138
1139            if os.path.exists(self.pid_fn):
1140                with open(self.pid_fn) as pid_file:
1141                    qemu_pid = int(pid_file.read())
1142                os.unlink(self.pid_fn)
1143
1144        logger.debug(f"return code from QEMU ({qemu_pid}): {self.returncode}")
1145
1146        self._update_instance_info(harness, is_timeout)
1147
1148        self._final_handle_actions(harness, 0)
1149
1150    def get_fifo(self):
1151        return self.fifo_fn
1152
1153
1154class QEMUWinHandler(Handler):
1155    """Spawns a thread to monitor QEMU output from pipes on Windows OS
1156
1157     QEMU creates single duplex pipe at //.pipe/path, where path is fifo_fn.
1158     We need to do this as once qemu starts, it runs forever until killed.
1159     Test cases emit special messages to the console as they run, we check
1160     for these to collect whether the test passed or failed.
1161     """
1162
1163    def __init__(
1164        self,
1165        instance,
1166        type_str: str,
1167        options: argparse.Namespace,
1168        generator_cmd: str | None = None,
1169        suite_name_check: bool = True,
1170    ):
1171        """Constructor
1172
1173        @param instance Test instance
1174        """
1175
1176        super().__init__(instance, type_str, options, generator_cmd, suite_name_check)
1177        self.pid_fn = os.path.join(instance.build_dir, "qemu.pid")
1178        self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo")
1179        self.pipe_handle = None
1180        self.pid = 0
1181        self.thread = None
1182        self.stop_thread = False
1183
1184        if instance.testsuite.ignore_qemu_crash:
1185            self.ignore_qemu_crash = True
1186            self.ignore_unexpected_eof = True
1187        else:
1188            self.ignore_qemu_crash = False
1189            self.ignore_unexpected_eof = False
1190
1191    @staticmethod
1192    def _get_cpu_time(pid):
1193        """get process CPU time.
1194
1195        The guest virtual time in QEMU icount mode isn't host time and
1196        it's maintained by counting guest instructions, so we use QEMU
1197        process execution time to mostly simulate the time of guest OS.
1198        """
1199        proc = psutil.Process(pid)
1200        cpu_time = proc.cpu_times()
1201        return cpu_time.user + cpu_time.system
1202
1203    @staticmethod
1204    def _open_log_file(logfile):
1205        return open(logfile, "w")
1206
1207    @staticmethod
1208    def _close_log_file(log_file):
1209        log_file.close()
1210
1211    @staticmethod
1212    def _stop_qemu_process(pid):
1213        if pid:
1214            try:
1215                if pid:
1216                    os.kill(pid, signal.SIGTERM)
1217            except (ProcessLookupError, psutil.NoSuchProcess):
1218                # Oh well, as long as it's dead! User probably sent Ctrl-C
1219                pass
1220            except OSError:
1221                pass
1222
1223    @staticmethod
1224    def _monitor_update_instance_info(handler, handler_time, status, reason):
1225        handler.instance.execution_time = handler_time
1226        handler.instance.status = status
1227        if reason:
1228            handler.instance.reason = reason
1229        else:
1230            handler.instance.reason = "Unknown"
1231
1232    def _set_qemu_filenames(self, sysbuild_build_dir):
1233        # PID file will be created in the main sysbuild app's build dir
1234        self.pid_fn = os.path.join(sysbuild_build_dir, "qemu.pid")
1235
1236        if os.path.exists(self.pid_fn):
1237            os.unlink(self.pid_fn)
1238
1239        self.log_fn = self.log
1240
1241    def _create_command(self, sysbuild_build_dir):
1242        command = [self.generator_cmd]
1243        command += ["-C", sysbuild_build_dir, "run"]
1244
1245        return command
1246
1247    def _update_instance_info(self, harness, is_timeout):
1248        if (self.returncode != 0 and not self.ignore_qemu_crash) or \
1249            harness.status == TwisterStatus.NONE:
1250            self.instance.status = TwisterStatus.FAIL
1251            if is_timeout:
1252                self.instance.reason = "Timeout"
1253            else:
1254                if not self.instance.reason:
1255                    self.instance.reason = f"Exited with {self.returncode}"
1256            self.instance.add_missing_case_status(TwisterStatus.BLOCK)
1257
1258    def _enqueue_char(self, queue):
1259        while not self.stop_thread:
1260            if not self.pipe_handle:
1261                try:
1262                    self.pipe_handle = os.open(r"\\.\pipe\\" + self.fifo_fn, os.O_RDONLY)
1263                except FileNotFoundError as e:
1264                    if e.args[0] == 2:
1265                        # Pipe is not opened yet, try again after a delay.
1266                        time.sleep(1)
1267                continue
1268
1269            c = ""
1270            try:
1271                c = os.read(self.pipe_handle, 1)
1272            finally:
1273                queue.put(c)
1274
1275    def _monitor_output(
1276        self,
1277        queue,
1278        timeout,
1279        logfile,
1280        pid_fn,
1281        harness,
1282        ignore_unexpected_eof=False
1283    ):
1284        start_time = time.time()
1285        timeout_time = start_time + timeout
1286        _status = TwisterStatus.NONE
1287        _reason = None
1288        line = ""
1289        timeout_extended = False
1290        self.pid = 0
1291
1292        log_out_fp = self._open_log_file(logfile)
1293
1294        while True:
1295            this_timeout = int((timeout_time - time.time()) * 1000)
1296            if this_timeout < 0:
1297                try:
1298                    if self.pid and this_timeout > 0:
1299                        # there's possibility we polled nothing because
1300                        # of not enough CPU time scheduled by host for
1301                        # QEMU process during p.poll(this_timeout)
1302                        cpu_time = self._get_cpu_time(self.pid)
1303                        if cpu_time < timeout and _status == TwisterStatus.NONE:
1304                            timeout_time = time.time() + (timeout - cpu_time)
1305                            continue
1306                except psutil.NoSuchProcess:
1307                    pass
1308                except ProcessLookupError:
1309                    _status = TwisterStatus.FAIL
1310                    _reason = "Execution error"
1311                    break
1312
1313                if _status == TwisterStatus.NONE:
1314                    _status = TwisterStatus.FAIL
1315                    _reason = "timeout"
1316                break
1317
1318            if self.pid == 0 and os.path.exists(pid_fn):
1319                # pid file probably not contains pid yet, continue
1320                with (
1321                    contextlib.suppress(ValueError),
1322                    open(pid_fn) as pid_file
1323                ):
1324                    self.pid = int(pid_file.read())
1325
1326            try:
1327                c = queue.get_nowait()
1328            except Empty:
1329                continue
1330            try:
1331                c = c.decode("utf-8")
1332            except UnicodeDecodeError:
1333                # Test is writing something weird, fail
1334                _status = TwisterStatus.FAIL
1335                _reason = "unexpected byte"
1336                break
1337
1338            if c == "":
1339                # EOF, this shouldn't happen unless QEMU crashes
1340                if not ignore_unexpected_eof:
1341                    _status = TwisterStatus.FAIL
1342                    _reason = "unexpected eof"
1343                break
1344            line = line + c
1345            if c != "\n":
1346                continue
1347
1348            # line contains a full line of data output from QEMU
1349            log_out_fp.write(line)
1350            log_out_fp.flush()
1351            line = line.rstrip()
1352            logger.debug(f"QEMU ({self.pid}): {line}")
1353
1354            harness.handle(line)
1355            if harness.status != TwisterStatus.NONE:
1356                # if we have registered a fail make sure the status is not
1357                # overridden by a false success message coming from the
1358                # testsuite
1359                if _status != TwisterStatus.FAIL:
1360                    _status = harness.status
1361                    _reason = harness.reason
1362
1363                # if we get some status, that means test is doing well, we reset
1364                # the timeout and wait for 2 more seconds to catch anything
1365                # printed late. We wait much longer if code
1366                # coverage is enabled since dumping this information can
1367                # take some time.
1368                if not timeout_extended or harness.capture_coverage:
1369                    timeout_extended = True
1370                    if harness.capture_coverage:
1371                        timeout_time = time.time() + 30
1372                    else:
1373                        timeout_time = time.time() + 2
1374            line = ""
1375
1376        self.stop_thread = True
1377
1378        handler_time = time.time() - start_time
1379        logger.debug(
1380            f"QEMU ({self.pid}) complete with {_status} ({_reason}) after {handler_time} seconds"
1381        )
1382        self._monitor_update_instance_info(self, handler_time, _status, _reason)
1383        self._close_log_file(log_out_fp)
1384        self._stop_qemu_process(self.pid)
1385
1386    def handle(self, harness):
1387        self.run = True
1388
1389        domain_build_dir = self.get_default_domain_build_dir()
1390        command = self._create_command(domain_build_dir)
1391        self._set_qemu_filenames(domain_build_dir)
1392
1393        logger.debug(f"Running {self.name} ({self.type_str})")
1394        is_timeout = False
1395        self.stop_thread = False
1396        queue = Queue()
1397
1398        with subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT,
1399                              cwd=self.build_dir) as proc:
1400            logger.debug(f"Spawning QEMUHandler Thread for {self.name}")
1401
1402            self.thread = threading.Thread(target=self._enqueue_char, args=(queue,))
1403            self.thread.daemon = True
1404            self.thread.start()
1405
1406            thread_max_time = time.time() + self.get_test_timeout()
1407
1408            self._monitor_output(queue, self.get_test_timeout(), self.log_fn, self.pid_fn, harness,
1409                                 self.ignore_unexpected_eof)
1410
1411            if (thread_max_time - time.time()) < 0:
1412                logger.debug("Timed out while monitoring QEMU output")
1413                proc.terminate()
1414                # sleep for a while before attempting to kill
1415                time.sleep(0.5)
1416                proc.kill()
1417
1418            if harness.status == TwisterStatus.PASS:
1419                self.returncode = 0
1420            else:
1421                self.returncode = proc.returncode
1422
1423            if os.path.exists(self.pid_fn):
1424                os.unlink(self.pid_fn)
1425
1426        logger.debug(f"return code from QEMU ({self.pid}): {self.returncode}")
1427
1428        os.close(self.pipe_handle)
1429        self.pipe_handle = None
1430
1431        self._update_instance_info(harness, is_timeout)
1432
1433        self._final_handle_actions(harness, 0)
1434
1435    def get_fifo(self):
1436        return self.fifo_fn
1437