1#!/usr/bin/env python3
2# vim: set syntax=python ts=4 :
3#
4# Copyright (c) 2018-2022 Intel Corporation
5# Copyright 2022 NXP
6# SPDX-License-Identifier: Apache-2.0
7
8import csv
9import logging
10import math
11import os
12import psutil
13import re
14import select
15import shlex
16import signal
17import subprocess
18import sys
19import threading
20import time
21
22from twisterlib.environment import ZEPHYR_BASE
23from twisterlib.error import TwisterException
24sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/build_helpers"))
25from domains import Domains
26
27try:
28    import serial
29except ImportError:
30    print("Install pyserial python module with pip to use --device-testing option.")
31
32try:
33    import pty
34except ImportError as capture_error:
35    if os.name == "nt":  # "nt" means that program is running on Windows OS
36        pass  # "--device-serial-pty" option is not supported on Windows OS
37    else:
38        raise capture_error
39
# Logger named 'twister', used throughout this file; its level is set to DEBUG here.
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)

# Simulation type names. NOTE(review): neither list is read in this part of
# the file - presumably consulted when choosing a handler; confirm at the users.
SUPPORTED_SIMS = ["mdb-nsim", "nsim", "renode", "qemu", "tsim", "armfvp", "xt-sim", "native", "custom"]
SUPPORTED_SIMS_IN_PYTEST = ['native', 'qemu']
45
46
def terminate_process(proc):
    """
    Terminate ``proc`` together with all of its descendant processes.

    Termination is encapsulated here so it is done consistently wherever a
    process has to be stopped.  The children are signalled one by one
    because of both how newer ninja (1.6.0 or greater) and .NET / renode
    work: newer ninja's don't seem to pass SIGTERM down to the children.

    @param proc Process object providing ``pid``, ``terminate()`` and
                ``kill()`` (for example a ``subprocess.Popen``)
    """

    try:
        children = psutil.Process(proc.pid).children(recursive=True)
    except psutil.NoSuchProcess:
        # The process is already gone, so there is no process tree to walk.
        # Still fall through so terminate()/kill() run as they always did.
        children = []

    for child in children:
        try:
            os.kill(child.pid, signal.SIGTERM)
        except (ProcessLookupError, psutil.NoSuchProcess):
            # The child exited between the listing and the signal.
            pass
    proc.terminate()
    # sleep for a while before attempting to kill
    time.sleep(0.5)
    proc.kill()
65
66
class Handler:
    """Common state and result bookkeeping shared by the handler classes."""

    def __init__(self, instance, type_str="build"):
        """Constructor

        @param instance Test Instance
        @param type_str Handler type name, kept in ``self.type_str``
        """
        # Left unset here; get_test_timeout() reads timeout_multiplier from it,
        # so it has to be assigned before a timeout is computed.
        self.options = None

        self.state = "waiting"
        self.run = False
        self.type_str = type_str

        self.binary = None
        self.pid_fn = None
        self.call_make_run = True

        self.name = instance.name
        self.instance = instance
        self.sourcedir = instance.testsuite.source_dir
        self.build_dir = instance.build_dir
        self.log = os.path.join(self.build_dir, "handler.log")
        self.returncode = 0
        self.generator = None
        self.generator_cmd = None
        self.suite_name_check = True
        self.ready = False

        self.args = []
        # Becomes True once terminate() has been used on a process.
        self.terminated = False

    def get_test_timeout(self):
        """Return the testsuite timeout scaled by the platform and the
        option timeout multipliers, rounded up to a whole number."""
        return math.ceil(self.instance.testsuite.timeout *
                         self.instance.platform.timeout_multiplier *
                         self.options.timeout_multiplier)

    def record(self, harness):
        """Store the harness recording on the instance and in recording.csv.

        Nothing happens when ``harness.recording`` is empty.  Otherwise the
        records are copied to (or appended to) ``self.instance.recording``
        and appended to ``recording.csv`` in the build directory.

        @param harness Harness whose ``recording`` is a list of dicts; the
                       keys of the first dict are used as CSV columns
        """
        if not harness.recording:
            return

        if self.instance.recording is None:
            self.instance.recording = harness.recording.copy()
        else:
            self.instance.recording.extend(harness.recording)

        filename = os.path.join(self.build_dir, "recording.csv")
        # The file is opened for appending, so the header is only written
        # when the file is being started; otherwise every call would put
        # another header row in the middle of the data.
        needs_header = not os.path.exists(filename) or os.path.getsize(filename) == 0
        # newline="" is what the csv module asks for: without it the explicit
        # lineterminator is newline-translated again by the text layer.
        with open(filename, "at", newline="") as csvfile:
            cw = csv.DictWriter(csvfile,
                                fieldnames=harness.recording[0].keys(),
                                lineterminator=os.linesep,
                                quoting=csv.QUOTE_NONNUMERIC)
            if needs_header:
                cw.writeheader()
            cw.writerows(harness.recording)

    def terminate(self, proc):
        """Terminate ``proc`` via terminate_process() and remember that the
        process was terminated by the handler."""
        terminate_process(proc)
        self.terminated = True

    def _verify_ztest_suite_name(self, harness_state, detected_suite_names, handler_time):
        """
        If test suite names were found in the test's C source code, verify
        that the suite names detected in the output correspond to the
        expected suite names (and not in reverse).

        The check only runs when there are expected names and the harness
        state is "passed"; on a mismatch the instance is marked as failed.

        @param harness_state State reported by the harness
        @param detected_suite_names Suite names found in the output; may be
                                    empty or None
        @param handler_time Execution time stored on the instance on failure
        """
        expected_suite_names = self.instance.testsuite.ztest_suite_names
        logger.debug(f"Expected suite names:{expected_suite_names}")
        logger.debug(f"Detected suite names:{detected_suite_names}")
        if not expected_suite_names or harness_state != "passed":
            return
        if not detected_suite_names:
            self._missing_suite_name(expected_suite_names, handler_time)
            # Nothing to compare; also avoids iterating over None below.
            return
        for detected_suite_name in detected_suite_names:
            if detected_suite_name not in expected_suite_names:
                self._missing_suite_name(expected_suite_names, handler_time)
                break

    def _missing_suite_name(self, expected_suite_names, handler_time):
        """
        Change the result of the performed test when a suite name is missing
        or improper: the instance and all of its testcases become "failed".

        @param expected_suite_names Expected names, only used in the log
        @param handler_time Execution time stored on the instance
        """
        self.instance.status = "failed"
        self.instance.execution_time = handler_time
        for tc in self.instance.testcases:
            tc.status = "failed"
        self.instance.reason = "Testsuite mismatch"
        logger.debug("Test suite names were not printed or some of them in " \
                     "output do not correspond with expected: %s",
                     str(expected_suite_names))

    def _final_handle_actions(self, harness, handler_time):
        """Run the closing checks of a handler run and save the recording.

        @param harness Harness that processed the output
        @param handler_time Execution time stored on the instance on failure
        """

        # only for Ztest tests:
        harness_class_name = type(harness).__name__
        if self.suite_name_check and harness_class_name == "Test":
            self._verify_ztest_suite_name(harness.state, harness.detected_suite_names, handler_time)
            if self.instance.status == 'failed':
                # Already failed; the recording is not saved in this case.
                return
            if not harness.matched_run_id and harness.run_id_exists:
                self.instance.status = "failed"
                self.instance.execution_time = handler_time
                self.instance.reason = "RunID mismatch"
                for tc in self.instance.testcases:
                    tc.status = "failed"

        self.record(harness)
170
171
class BinaryHandler(Handler):
    """Handler that runs the test as a host process and feeds the lines of
    its standard output to the harness."""

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        @param type_str Handler type name, forwarded to the base class
        """
        super().__init__(instance, type_str)

        self.call_west_flash = False
        self.seed = None
        self.extra_test_args = None
        # Last line obtained by _output_reader(); b"" means nothing was read.
        self.line = b""

    def try_kill_process_by_pid(self):
        """Kill the process whose PID is stored in the ``self.pid_fn`` file.

        Does nothing when no PID file is configured.  The PID file is removed
        and ``self.pid_fn`` is cleared, so a second call is a no-op.  A PID
        file that was never written is treated as "nothing to kill".
        """
        if not self.pid_fn:
            return

        pid_fn = self.pid_fn
        self.pid_fn = None  # clear so we don't try to kill the binary twice
        try:
            with open(pid_fn) as pid_fp:
                pid = int(pid_fp.read())
        except FileNotFoundError:
            logger.debug("PID file %s does not exist, nothing to kill", pid_fn)
            return

        os.unlink(pid_fn)
        try:
            os.kill(pid, signal.SIGKILL)
        except (ProcessLookupError, psutil.NoSuchProcess):
            # The process has already exited.
            pass

    def _output_reader(self, proc):
        """Read one line from ``proc.stdout`` into ``self.line``.

        Run in a helper thread so that the blocking read can be given a
        timeout by joining the thread.
        """
        self.line = proc.stdout.readline()

    def _output_handler(self, proc, harness):
        """Forward the process output to the log file and the harness.

        Lines are read until the timeout expires, a read does not complete
        in time, or an empty read is returned.  Afterwards the process gets
        a short grace period and is terminated if it is still running.

        @param proc Running process with a readable ``stdout``
        @param harness Harness receiving every stripped output line
        """
        # NB: this is the literal four characters backslash, 'r', backslash,
        # 'n' - not a carriage return / line feed pair.
        suffix = '\\r\\n'

        with open(self.log, "wt") as log_out_fp:
            timeout_extended = False
            timeout_time = time.time() + self.get_test_timeout()
            while True:
                this_timeout = timeout_time - time.time()
                if this_timeout < 0:
                    break
                # readline() blocks, so it runs in its own thread which is
                # joined for at most the remaining time.
                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
                reader_t.start()
                reader_t.join(this_timeout)
                if not reader_t.is_alive() and self.line != b"":
                    line_decoded = self.line.decode('utf-8', "replace")
                    if line_decoded.endswith(suffix):
                        stripped_line = line_decoded[:-len(suffix)].rstrip()
                    else:
                        stripped_line = line_decoded.rstrip()
                    logger.debug("OUTPUT: %s", stripped_line)
                    log_out_fp.write(line_decoded)
                    log_out_fp.flush()
                    harness.handle(stripped_line)
                    if harness.state:
                        # The harness reached a verdict: replace the deadline
                        # with a short one (30s while coverage is captured,
                        # 2s otherwise).  Without coverage capture this is
                        # done only once; with it, on every further line.
                        if not timeout_extended or harness.capture_coverage:
                            timeout_extended = True
                            if harness.capture_coverage:
                                timeout_time = time.time() + 30
                            else:
                                timeout_time = time.time() + 2
                else:
                    # Either the read is still pending after the timeout or
                    # it returned an empty result; stop reading.
                    reader_t.join(0)
                    break
            try:
                # POSIX arch based ztests end on their own,
                # so let's give it up to 100ms to do so
                proc.wait(0.1)
            except subprocess.TimeoutExpired:
                self.terminate(proc)

    def _create_command(self, robot_test):
        """Build the command line used to run the test.

        @param robot_test True to use the generator's "run_renode_test"
                          target
        @return Command as a list of arguments
        """
        if robot_test:
            command = [self.generator_cmd, "run_renode_test"]
        elif self.call_make_run:
            command = [self.generator_cmd, "run"]
        elif self.call_west_flash:
            command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
        else:
            command = [self.binary]

        if self.options.enable_valgrind:
            # Prefix the command so that it runs under valgrind.
            command = ["valgrind", "--error-exitcode=2",
                       "--leak-check=full",
                       "--suppressions=" + ZEPHYR_BASE + "/scripts/valgrind.supp",
                       "--log-file=" + self.build_dir + "/valgrind.log",
                       "--track-origins=yes",
                       ] + command

        # Only valid for native_sim
        if self.seed is not None:
            command.append(f"--seed={self.seed}")
        if self.extra_test_args is not None:
            command.extend(self.extra_test_args)

        return command

    def _create_env(self):
        """Return a copy of the environment with the sanitizer options set.

        Options already present in ASAN_OPTIONS / UBSAN_OPTIONS are kept
        after the ones added here.

        @return Environment dict for the spawned process
        """
        env = os.environ.copy()
        if self.options.enable_asan:
            asan_options = "log_path=stdout:" + env.get("ASAN_OPTIONS", "")
            if not self.options.enable_lsan:
                # Options are ':' separated; make sure a separator precedes
                # the appended one when user supplied options came first.
                if not asan_options.endswith(":"):
                    asan_options += ":"
                asan_options += "detect_leaks=0"
            env["ASAN_OPTIONS"] = asan_options

        if self.options.enable_ubsan:
            env["UBSAN_OPTIONS"] = "log_path=stdout:halt_on_error=1:" + \
                                  env.get("UBSAN_OPTIONS", "")

        return env

    def _update_instance_info(self, harness_state, handler_time):
        """Set the instance status and reason after the process has ended.

        @param harness_state State reported by the harness, may be empty
        @param handler_time Execution time stored on the instance
        """
        self.instance.execution_time = handler_time
        if not self.terminated and self.returncode != 0:
            self.instance.status = "failed"
            if self.options.enable_valgrind and self.returncode == 2:
                # 2 is the --error-exitcode passed to valgrind.
                self.instance.reason = "Valgrind error"
            else:
                # When a process is killed, the default handler returns 128 + SIGTERM
                # so in that case the return code itself is not meaningful
                self.instance.reason = "Failed"
        elif harness_state:
            self.instance.status = harness_state
            if harness_state == "failed":
                self.instance.reason = "Failed"
        else:
            # No verdict from the harness at all.
            self.instance.status = "failed"
            self.instance.reason = "Timeout"
            self.instance.add_missing_case_status("blocked", "Timeout")

    def handle(self, harness):
        """Run the test process and update the instance with the result.

        Robot tests are delegated to ``harness.run_robot_test()``; for all
        other tests the command is spawned and its output is processed by
        _output_handler().

        @param harness Harness evaluating the process output
        """
        robot_test = getattr(harness, "is_robot_test", False)

        command = self._create_command(robot_test)

        logger.debug("Spawning process: " +
                     " ".join(shlex.quote(word) for word in command) + os.linesep +
                     "in directory: " + self.build_dir)

        start_time = time.time()

        env = self._create_env()

        if robot_test:
            harness.run_robot_test(command, self)
            return

        # NOTE(review): stderr is piped but never read in this method; a
        # process writing a lot to stderr could presumably stall - confirm.
        with subprocess.Popen(command, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, cwd=self.build_dir, env=env) as proc:
            logger.debug("Spawning BinaryHandler Thread for %s", self.name)
            t = threading.Thread(target=self._output_handler, args=(proc, harness,), daemon=True)
            t.start()
            # join() has no timeout, so the thread has finished when it returns.
            t.join()
            proc.wait()
            self.returncode = proc.returncode
            self.try_kill_process_by_pid()

        handler_time = time.time() - start_time

        # FIXME: This is needed when killing the simulator, the console is
        # garbled and needs to be reset. Did not find a better way to do that.
        if sys.stdout.isatty():
            subprocess.call(["stty", "sane"], stdin=sys.stdout)

        self._update_instance_info(harness.state, handler_time)

        self._final_handle_actions(harness, handler_time)
337
338
class SimulationHandler(BinaryHandler):
    """BinaryHandler with extra setup for the 'renode' and 'native' types."""

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        @param type_str Simulation type name
        """
        super().__init__(instance, type_str)

        build_dir = instance.build_dir
        if type_str == 'native':
            # Do not use the generator's "run" target; point at the built
            # executable instead and flag the handler as ready.
            self.call_make_run = False
            self.binary = os.path.join(build_dir, "zephyr", "zephyr.exe")
            self.ready = True
        elif type_str == 'renode':
            # File the process to kill is looked up in after the run.
            self.pid_fn = os.path.join(build_dir, "renode.pid")
353
354
class DeviceHandler(Handler):
    """Handler that flashes a hardware device and follows its serial output.

    ``self.duts`` is not initialised by this class; it has to be set on the
    instance before device_is_available() or make_device_available() is used.
    """

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        @param type_str Handler type name, forwarded to the base class
        """
        super().__init__(instance, type_str)

    def get_test_timeout(self):
        """Return the base class timeout, extended by 120 when coverage is
        enabled."""
        timeout = super().get_test_timeout()
        if self.options.enable_coverage:
            # wait more for gcov data to be dumped on console
            timeout += 120
        return timeout

    def monitor_serial(self, ser, halt_event, harness):
        """Read lines from the serial port until told to stop.

        Every line is written to the handler log and passed to the harness.
        The loop ends when the port is closed, ``halt_event`` is set, or the
        harness reports a state while no coverage capture is requested.

        @param ser Open serial port object
        @param halt_event Event that makes the loop close the port and stop
        @param harness Harness receiving the serial lines
        """
        with open(self.log, "wb") as log_out_fp:
            if self.options.enable_coverage:
                # Set capture_coverage to True to indicate that right after
                # test results we should get coverage data, otherwise we exit
                # from the test.
                harness.capture_coverage = True

            # Clear serial leftover.
            ser.reset_input_buffer()

            while ser.isOpen():
                if halt_event.is_set():
                    logger.debug('halted')
                    ser.close()
                    break

                try:
                    if not ser.in_waiting:
                        # no incoming bytes are waiting to be read from
                        # the serial input buffer, let other threads run
                        time.sleep(0.001)
                        continue
                # maybe the serial port is still in reset
                # check status may cause error
                # wait for more time
                except OSError:
                    time.sleep(0.001)
                    continue
                except TypeError:
                    # This exception happens if the serial port was closed and
                    # its file descriptor cleared in between of ser.isOpen()
                    # and ser.in_waiting checks.
                    logger.debug("Serial port is already closed, stop reading.")
                    break

                serial_line = None
                try:
                    serial_line = ser.readline()
                except TypeError:
                    pass
                # ignore SerialException which may happen during the serial device
                # power off/on process.
                except serial.SerialException:
                    pass

                # Just because ser_fileno has data doesn't mean an entire line
                # is available yet.
                if serial_line:
                    sl = serial_line.decode('utf-8', 'ignore').lstrip()
                    logger.debug("DEVICE: {0}".format(sl.rstrip()))

                    log_out_fp.write(sl.encode('utf-8'))
                    log_out_fp.flush()
                    harness.handle(sl.rstrip())

                if harness.state:
                    if not harness.capture_coverage:
                        ser.close()
                        break

    def device_is_available(self, instance):
        """Reserve and return a free device matching the instance.

        A device matches when its platform equals the instance platform, it
        has a serial or serial_pty setting, and it provides the fixture the
        testsuite asks for (if any).

        @param instance Test Instance looking for a device
        @return The reserved device, or None when all matching devices are
                currently in use
        @raise TwisterException when no device matches at all
        """
        device = instance.platform.name
        fixture = instance.testsuite.harness_config.get("fixture")
        dut_found = False

        for d in self.duts:
            if fixture and fixture not in d.fixtures:
                continue
            if d.platform != device or (d.serial is None and d.serial_pty is None):
                continue
            dut_found = True
            # Check and claim under the device lock; the context manager
            # releases it even if something in between fails.
            with d.lock:
                avail = False
                if d.available:
                    d.available = 0
                    d.counter += 1
                    avail = True
            if avail:
                return d

        if not dut_found:
            raise TwisterException(f"No device to serve as {device} platform.")

        return None

    def make_device_available(self, serial):
        """Mark every device using ``serial`` (as serial or serial_pty) as
        available again."""
        for d in self.duts:
            if serial in [d.serial_pty, d.serial]:
                d.available = 1

    @staticmethod
    def run_custom_script(script, timeout):
        """Run a user supplied script and log its outcome.

        A non-zero exit code or a timeout is logged as an error; no
        exception is raised for either.

        @param script Command passed to ``subprocess.Popen``
        @param timeout Time allowed for the script, in seconds
        """
        with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            try:
                stdout, stderr = proc.communicate(timeout=timeout)
                # Undecodable bytes in the script output must not abort the run.
                logger.debug(stdout.decode(errors='ignore'))
                if proc.returncode != 0:
                    logger.error(f"Custom script failure: {stderr.decode(errors='ignore')}")

            except subprocess.TimeoutExpired:
                proc.kill()
                proc.communicate()
                logger.error("{} timed out".format(script))

    def _create_command(self, runner, hardware):
        """Build the command used to flash the device.

        @param runner West runner name, or a false value for none
        @param hardware Device description (ids, product, runner_params)
        @return Command as a list of arguments
        """
        if (self.options.west_flash is not None) or runner:
            command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
            command_extra_args = []

            # There are three ways this option is used.
            # 1) bare: --west-flash
            #    This results in options.west_flash == []
            # 2) with a value: --west-flash="--board-id=42"
            #    This results in options.west_flash == "--board-id=42"
            # 3) Multiple values, comma separated:
            #    --west-flash="--board-id=42,--erase"
            #    The value is split on ',' below.
            if self.options.west_flash:
                command_extra_args.extend(self.options.west_flash.split(','))

            if runner:
                command.append("--runner")
                command.append(runner)

                board_id = hardware.probe_id or hardware.id
                product = hardware.product
                if board_id is not None:
                    if runner in ("pyocd", "nrfjprog"):
                        command_extra_args.append("--dev-id")
                        command_extra_args.append(board_id)
                    elif runner == "openocd" and product in ("STM32 STLink", "STLINK-V3"):
                        command_extra_args.append("--cmd-pre-init")
                        command_extra_args.append("hla_serial %s" % board_id)
                    elif runner == "openocd" and product == "EDBG CMSIS-DAP":
                        command_extra_args.append("--cmd-pre-init")
                        command_extra_args.append("cmsis_dap_serial %s" % board_id)
                    elif runner == "jlink":
                        command.append("--tool-opt=-SelectEmuBySN  %s" % board_id)
                    elif runner == "stm32cubeprogrammer":
                        command.append("--tool-opt=sn=%s" % board_id)

                    # Receive parameters from runner_params field.
                    # NOTE(review): only applied when a board id is known -
                    # confirm that this is intended.
                    if hardware.runner_params:
                        command.extend(hardware.runner_params)

            if command_extra_args:
                # Everything after '--' is taken from command_extra_args.
                command.append('--')
                command.extend(command_extra_args)
        else:
            command = [self.generator_cmd, "-C", self.build_dir, "flash"]

        return command

    def _update_instance_info(self, harness_state, handler_time, flash_error):
        """Set the instance status and reason after the device run.

        @param harness_state State reported by the harness, may be empty
        @param handler_time Execution time stored on the instance
        @param flash_error True when flashing failed; status and reason set
                           by the flashing code are then left untouched
        """
        self.instance.execution_time = handler_time
        if harness_state:
            self.instance.status = harness_state
            if harness_state == "failed":
                self.instance.reason = "Failed"
        elif not flash_error:
            self.instance.status = "failed"
            self.instance.reason = "Timeout"

        if self.instance.status in ["error", "failed"]:
            self.instance.add_missing_case_status("blocked", self.instance.reason)

    def _create_serial_connection(self, serial_device, hardware_baud,
                                  flash_timeout, serial_pty, ser_pty_process):
        """Open the serial port of the device.

        On failure the instance is marked as failed, the serial_pty process
        (if any) is terminated, the device is made available again and the
        exception is re-raised.

        @param serial_device Serial device name to open
        @param hardware_baud Baud rate
        @param flash_timeout Flash timeout, used for the read timeout
        @param serial_pty serial_pty command of the device, if any
        @param ser_pty_process Process started for serial_pty, if any
        @return The opened serial port object
        @raise serial.SerialException when the port cannot be opened
        """
        try:
            ser = serial.Serial(
                serial_device,
                baudrate=hardware_baud,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                bytesize=serial.EIGHTBITS,
                # the worst case of no serial input
                timeout=max(flash_timeout, self.get_test_timeout())
            )
        except serial.SerialException as e:
            self.instance.status = "failed"
            self.instance.reason = "Serial Device Error"
            logger.error("Serial device error: %s", str(e))

            self.instance.add_missing_case_status("blocked", "Serial Device Error")
            if serial_pty and ser_pty_process:
                ser_pty_process.terminate()
                outs, errs = ser_pty_process.communicate()
                logger.debug("Process {} terminated outs: {} errs {}".format(serial_pty, outs, errs))

            if serial_pty:
                self.make_device_available(serial_pty)
            else:
                self.make_device_available(serial_device)
            raise

        return ser

    def get_hardware(self):
        """Wait for a matching device and return it.

        Polls device_is_available() once per second until a device is free.

        @return The reserved device, or None when no device matches; in that
                case the instance is marked as failed
        """
        hardware = None
        try:
            hardware = self.device_is_available(self.instance)
            while not hardware:
                time.sleep(1)
                hardware = self.device_is_available(self.instance)
        except TwisterException as error:
            self.instance.status = "failed"
            self.instance.reason = str(error)
            logger.error(self.instance.reason)
        return hardware

    def _get_serial_device(self, serial_pty, hardware_serial):
        """Determine the serial device to read from.

        With ``serial_pty`` set, the command is started with its standard
        streams attached to a new pseudo terminal and the name of the
        terminal's other end is used as the serial device.

        @param serial_pty Command to run (split on ',' and ' '), or a false
                          value
        @param hardware_serial Serial device used when serial_pty is not set
        @return Tuple (serial_device, ser_pty_process); (None, None) when
                the serial_pty command could not be started
        """
        ser_pty_process = None
        if not serial_pty:
            return hardware_serial, ser_pty_process

        master, slave = pty.openpty()
        try:
            ser_pty_process = subprocess.Popen(
                re.split('[, ]', serial_pty),
                stdout=master,
                stdin=master,
                stderr=master
            )
        except (OSError, subprocess.SubprocessError) as error:
            # A command that cannot be launched makes Popen raise OSError.
            logger.error(
                "Failed to run subprocess {}, error {}".format(
                    serial_pty,
                    error
                )
            )
            # Nothing will use the pseudo terminal any more.
            os.close(master)
            os.close(slave)
            # Always hand back a pair, the caller unpacks the result.
            return None, None

        return os.ttyname(slave), ser_pty_process

    def handle(self, harness):
        """Flash the device, follow its serial output and record the result.

        The device reserved at the start is made available again at the end
        of the method and on the early exits for serial failures.

        @param harness Harness evaluating the serial output
        """
        hardware = self.get_hardware()
        if not hardware:
            return
        self.instance.dut = hardware.id

        runner = hardware.runner or self.options.west_runner
        serial_pty = hardware.serial_pty

        serial_device, ser_pty_process = self._get_serial_device(serial_pty, hardware.serial)
        if serial_pty and ser_pty_process is None:
            # The serial_pty command could not be started: fail this
            # instance and hand the device back instead of carrying on
            # without a serial device.
            self.instance.status = "failed"
            self.instance.reason = "Serial PTY Error"
            self.instance.add_missing_case_status("blocked", self.instance.reason)
            self.make_device_available(serial_pty)
            return

        logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud")

        command = self._create_command(runner, hardware)

        pre_script = hardware.pre_script
        post_flash_script = hardware.post_flash_script
        post_script = hardware.post_script

        if pre_script:
            self.run_custom_script(pre_script, 30)

        flash_timeout = hardware.flash_timeout
        if hardware.flash_with_test:
            flash_timeout += self.get_test_timeout()

        try:
            ser = self._create_serial_connection(
                serial_device,
                hardware.baud,
                flash_timeout,
                serial_pty,
                ser_pty_process
            )
        except serial.SerialException:
            # Status, cleanup and device release were done by the helper.
            return

        halt_monitor_evt = threading.Event()

        # Start reading the serial port before flashing starts.
        t = threading.Thread(target=self.monitor_serial, daemon=True,
                             args=(ser, halt_monitor_evt, harness))
        start_time = time.time()
        t.start()

        d_log = "{}/device.log".format(self.instance.build_dir)
        logger.debug('Flash command: %s', command)
        flash_error = False
        try:
            stdout = stderr = None
            with subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
                try:
                    (stdout, stderr) = proc.communicate(timeout=flash_timeout)
                    # ignore unencodable unicode chars
                    logger.debug(stdout.decode(errors="ignore"))

                    if proc.returncode != 0:
                        self.instance.status = "error"
                        self.instance.reason = "Device issue (Flash error?)"
                        flash_error = True
                        halt_monitor_evt.set()
                except subprocess.TimeoutExpired:
                    logger.warning("Flash operation timed out.")
                    self.terminate(proc)
                    (stdout, stderr) = proc.communicate()
                    self.instance.status = "error"
                    self.instance.reason = "Device issue (Timeout)"
                    flash_error = True

            # The flash tool's stderr is kept in device.log in every case;
            # undecodable bytes are dropped like for stdout above.
            with open(d_log, "w") as dlog_fp:
                dlog_fp.write(stderr.decode(errors="ignore"))

        except subprocess.CalledProcessError:
            halt_monitor_evt.set()
            self.instance.status = "error"
            self.instance.reason = "Device issue (Flash error)"
            flash_error = True

        if post_flash_script:
            self.run_custom_script(post_flash_script, 30)

        if not flash_error:
            # Always wait at most the test timeout here after flashing.
            t.join(self.get_test_timeout())
        else:
            # When the flash error is due to exceptions, twister tells the
            # serial monitor thread to close the serial port.  That thread
            # has to run first to get the chance to close it, so give it a
            # short moment.
            t.join(0.1)

        if t.is_alive():
            logger.debug("Timed out while monitoring serial output on {}".format(self.instance.platform.name))

        if ser.isOpen():
            ser.close()

        if serial_pty:
            ser_pty_process.terminate()
            outs, errs = ser_pty_process.communicate()
            logger.debug("Process {} terminated outs: {} errs {}".format(serial_pty, outs, errs))

        handler_time = time.time() - start_time

        self._update_instance_info(harness.state, handler_time, flash_error)

        self._final_handle_actions(harness, handler_time)

        if post_script:
            self.run_custom_script(post_script, 30)

        if serial_pty:
            self.make_device_available(serial_pty)
        else:
            self.make_device_available(serial_device)
734
735
736class QEMUHandler(Handler):
737    """Spawns a thread to monitor QEMU output from pipes
738
739    We pass QEMU_PIPE to 'make run' and monitor the pipes for output.
740    We need to do this as once qemu starts, it runs forever until killed.
741    Test cases emit special messages to the console as they run, we check
742    for these to collect whether the test passed or failed.
743    """
744
745    def __init__(self, instance, type_str):
746        """Constructor
747
748        @param instance Test instance
749        """
750
751        super().__init__(instance, type_str)
752        self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo")
753
754        self.pid_fn = os.path.join(instance.build_dir, "qemu.pid")
755
756        if instance.testsuite.ignore_qemu_crash:
757            self.ignore_qemu_crash = True
758            self.ignore_unexpected_eof = True
759        else:
760            self.ignore_qemu_crash = False
761            self.ignore_unexpected_eof = False
762
763    @staticmethod
764    def _get_cpu_time(pid):
765        """get process CPU time.
766
767        The guest virtual time in QEMU icount mode isn't host time and
768        it's maintained by counting guest instructions, so we use QEMU
769        process execution time to mostly simulate the time of guest OS.
770        """
771        proc = psutil.Process(pid)
772        cpu_time = proc.cpu_times()
773        return cpu_time.user + cpu_time.system
774
775    @staticmethod
776    def _thread_get_fifo_names(fifo_fn):
777        fifo_in = fifo_fn + ".in"
778        fifo_out = fifo_fn + ".out"
779
780        return fifo_in, fifo_out
781
782    @staticmethod
783    def _thread_open_files(fifo_in, fifo_out, logfile):
784        # These in/out nodes are named from QEMU's perspective, not ours
785        if os.path.exists(fifo_in):
786            os.unlink(fifo_in)
787        os.mkfifo(fifo_in)
788        if os.path.exists(fifo_out):
789            os.unlink(fifo_out)
790        os.mkfifo(fifo_out)
791
792        # We don't do anything with out_fp but we need to open it for
793        # writing so that QEMU doesn't block, due to the way pipes work
794        out_fp = open(fifo_in, "wb")
795        # Disable internal buffering, we don't
796        # want read() or poll() to ever block if there is data in there
797        in_fp = open(fifo_out, "rb", buffering=0)
798        log_out_fp = open(logfile, "wt")
799
800        return out_fp, in_fp, log_out_fp
801
802    @staticmethod
803    def _thread_close_files(fifo_in, fifo_out, pid, out_fp, in_fp, log_out_fp):
804        log_out_fp.close()
805        out_fp.close()
806        in_fp.close()
807        if pid:
808            try:
809                if pid:
810                    os.kill(pid, signal.SIGTERM)
811            except (ProcessLookupError, psutil.NoSuchProcess):
812                # Oh well, as long as it's dead! User probably sent Ctrl-C
813                pass
814
815        os.unlink(fifo_in)
816        os.unlink(fifo_out)
817
818    @staticmethod
819    def _thread_update_instance_info(handler, handler_time, out_state):
820        handler.instance.execution_time = handler_time
821        if out_state == "timeout":
822            handler.instance.status = "failed"
823            handler.instance.reason = "Timeout"
824        elif out_state == "failed":
825            handler.instance.status = "failed"
826            handler.instance.reason = "Failed"
827        elif out_state in ['unexpected eof', 'unexpected byte']:
828            handler.instance.status = "failed"
829            handler.instance.reason = out_state
830        else:
831            handler.instance.status = out_state
832            handler.instance.reason = "Unknown"
833
834    @staticmethod
835    def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn, results,
836                harness, ignore_unexpected_eof=False):
837        fifo_in, fifo_out = QEMUHandler._thread_get_fifo_names(fifo_fn)
838
839        out_fp, in_fp, log_out_fp = QEMUHandler._thread_open_files(
840            fifo_in,
841            fifo_out,
842            logfile
843        )
844
845        start_time = time.time()
846        timeout_time = start_time + timeout
847        p = select.poll()
848        p.register(in_fp, select.POLLIN)
849        out_state = None
850
851        line = ""
852        timeout_extended = False
853
854        pid = 0
855        if os.path.exists(pid_fn):
856            pid = int(open(pid_fn).read())
857
858        while True:
859            this_timeout = int((timeout_time - time.time()) * 1000)
860            if this_timeout < 0 or not p.poll(this_timeout):
861                try:
862                    if pid and this_timeout > 0:
863                        # there's possibility we polled nothing because
864                        # of not enough CPU time scheduled by host for
865                        # QEMU process during p.poll(this_timeout)
866                        cpu_time = QEMUHandler._get_cpu_time(pid)
867                        if cpu_time < timeout and not out_state:
868                            timeout_time = time.time() + (timeout - cpu_time)
869                            continue
870                except psutil.NoSuchProcess:
871                    pass
872                except ProcessLookupError:
873                    out_state = "failed"
874                    break
875
876                if not out_state:
877                    out_state = "timeout"
878                break
879
880            if pid == 0 and os.path.exists(pid_fn):
881                pid = int(open(pid_fn).read())
882
883            try:
884                c = in_fp.read(1).decode("utf-8")
885            except UnicodeDecodeError:
886                # Test is writing something weird, fail
887                out_state = "unexpected byte"
888                break
889
890            if c == "":
891                # EOF, this shouldn't happen unless QEMU crashes
892                if not ignore_unexpected_eof:
893                    out_state = "unexpected eof"
894                break
895            line = line + c
896            if c != "\n":
897                continue
898
899            # line contains a full line of data output from QEMU
900            log_out_fp.write(line)
901            log_out_fp.flush()
902            line = line.rstrip()
903            logger.debug(f"QEMU ({pid}): {line}")
904
905            harness.handle(line)
906            if harness.state:
907                # if we have registered a fail make sure the state is not
908                # overridden by a false success message coming from the
909                # testsuite
910                if out_state not in ['failed', 'unexpected eof', 'unexpected byte']:
911                    out_state = harness.state
912
913                # if we get some state, that means test is doing well, we reset
914                # the timeout and wait for 2 more seconds to catch anything
915                # printed late. We wait much longer if code
916                # coverage is enabled since dumping this information can
917                # take some time.
918                if not timeout_extended or harness.capture_coverage:
919                    timeout_extended = True
920                    if harness.capture_coverage:
921                        timeout_time = time.time() + 30
922                    else:
923                        timeout_time = time.time() + 2
924            line = ""
925
926        handler_time = time.time() - start_time
927        logger.debug(f"QEMU ({pid}) complete ({out_state}) after {handler_time} seconds")
928
929        QEMUHandler._thread_update_instance_info(handler, handler_time, out_state)
930
931        QEMUHandler._thread_close_files(fifo_in, fifo_out, pid, out_fp, in_fp, log_out_fp)
932
933    def _get_sysbuild_build_dir(self):
934        if self.instance.testsuite.sysbuild:
935            # Load domain yaml to get default domain build directory
936            # Note: for targets using QEMU, we assume that the target will
937            # have added any additional images to the run target manually
938            domain_path = os.path.join(self.build_dir, "domains.yaml")
939            domains = Domains.from_file(domain_path)
940            logger.debug("Loaded sysbuild domain data from %s" % domain_path)
941            build_dir = domains.get_default_domain().build_dir
942        else:
943            build_dir = self.build_dir
944
945        return build_dir
946
947    def _set_qemu_filenames(self, sysbuild_build_dir):
948        # We pass this to QEMU which looks for fifos with .in and .out suffixes.
949        # QEMU fifo will use main build dir
950        self.fifo_fn = os.path.join(self.instance.build_dir, "qemu-fifo")
951        # PID file will be created in the main sysbuild app's build dir
952        self.pid_fn = os.path.join(sysbuild_build_dir, "qemu.pid")
953
954        if os.path.exists(self.pid_fn):
955            os.unlink(self.pid_fn)
956
957        self.log_fn = self.log
958
959    def _create_command(self, sysbuild_build_dir):
960        command = [self.generator_cmd]
961        command += ["-C", sysbuild_build_dir, "run"]
962
963        return command
964
965    def _update_instance_info(self, harness_state, is_timeout):
966        if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness_state:
967            self.instance.status = "failed"
968            if is_timeout:
969                self.instance.reason = "Timeout"
970            else:
971                if not self.instance.reason:
972                    self.instance.reason = "Exited with {}".format(self.returncode)
973            self.instance.add_missing_case_status("blocked")
974
975    def handle(self, harness):
976        self.results = {}
977        self.run = True
978
979        sysbuild_build_dir = self._get_sysbuild_build_dir()
980
981        command = self._create_command(sysbuild_build_dir)
982
983        self._set_qemu_filenames(sysbuild_build_dir)
984
985        self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread,
986                                       args=(self, self.get_test_timeout(), self.build_dir,
987                                             self.log_fn, self.fifo_fn,
988                                             self.pid_fn, self.results, harness,
989                                             self.ignore_unexpected_eof))
990
991        self.thread.daemon = True
992        logger.debug("Spawning QEMUHandler Thread for %s" % self.name)
993        self.thread.start()
994        thread_max_time = time.time() + self.get_test_timeout()
995        if sys.stdout.isatty():
996            subprocess.call(["stty", "sane"], stdin=sys.stdout)
997
998        logger.debug("Running %s (%s)" % (self.name, self.type_str))
999
1000        is_timeout = False
1001        qemu_pid = None
1002
1003        with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.build_dir) as proc:
1004            logger.debug("Spawning QEMUHandler Thread for %s" % self.name)
1005
1006            try:
1007                proc.wait(self.get_test_timeout())
1008            except subprocess.TimeoutExpired:
1009                # sometimes QEMU can't handle SIGTERM signal correctly
1010                # in that case kill -9 QEMU process directly and leave
1011                # twister to judge testing result by console output
1012
1013                is_timeout = True
1014                self.terminate(proc)
1015                if harness.state == "passed":
1016                    self.returncode = 0
1017                else:
1018                    self.returncode = proc.returncode
1019            else:
1020                if os.path.exists(self.pid_fn):
1021                    qemu_pid = int(open(self.pid_fn).read())
1022                logger.debug(f"No timeout, return code from QEMU ({qemu_pid}): {proc.returncode}")
1023                self.returncode = proc.returncode
1024            # Need to wait for harness to finish processing
1025            # output from QEMU. Otherwise it might miss some
1026            # messages.
1027            self.thread.join(max(thread_max_time - time.time(), 0))
1028            if self.thread.is_alive():
1029                logger.debug("Timed out while monitoring QEMU output")
1030
1031            if os.path.exists(self.pid_fn):
1032                qemu_pid = int(open(self.pid_fn).read())
1033                os.unlink(self.pid_fn)
1034
1035        logger.debug(f"return code from QEMU ({qemu_pid}): {self.returncode}")
1036
1037        self._update_instance_info(harness.state, is_timeout)
1038
1039        self._final_handle_actions(harness, 0)
1040
    def get_fifo(self):
        """Return ``self.fifo_fn``, the base path of the QEMU FIFO pair."""
        return self.fifo_fn
1043