1#!/usr/bin/env python3
2# vim: set syntax=python ts=4 :
3#
4# Copyright (c) 2018-2022 Intel Corporation
5# Copyright 2022 NXP
6# SPDX-License-Identifier: Apache-2.0
7
8import csv
9import logging
10import math
11import os
12import psutil
13import re
14import select
15import shlex
16import signal
17import subprocess
18import sys
19import threading
20import time
21
22from twisterlib.environment import ZEPHYR_BASE
23from twisterlib.error import TwisterException
24sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/build_helpers"))
25from domains import Domains
26
27try:
28    import serial
29except ImportError:
30    print("Install pyserial python module with pip to use --device-testing option.")
31
32try:
33    import pty
34except ImportError as capture_error:
35    if os.name == "nt":  # "nt" means that program is running on Windows OS
36        pass  # "--device-serial-pty" option is not supported on Windows OS
37    else:
38        raise capture_error
39
# Logger obtained under the 'twister' name; its level is forced to DEBUG
# as soon as this module is imported.
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)

# Simulator type names.
# NOTE(review): nothing in the visible part of this file reads these two
# lists; presumably they are consumed by other modules - confirm.
SUPPORTED_SIMS = ["mdb-nsim", "nsim", "renode", "qemu", "tsim", "armfvp", "xt-sim", "native", "custom"]
SUPPORTED_SIMS_IN_PYTEST = ['native', 'qemu']
45
46
def terminate_process(proc):
    """
    Terminate *proc* together with every process descended from it.

    Signalling only the parent is not enough: newer ninja (1.6.0 or
    greater) does not seem to pass SIGTERM down to its children, and
    .NET / renode need the same treatment, so every descendant is sent
    SIGTERM explicitly before the parent is terminated and then killed.

    @param proc Process object exposing ``pid``, ``terminate()`` and ``kill()``
    """
    try:
        children = psutil.Process(proc.pid).children(recursive=True)
    except psutil.NoSuchProcess:
        # The process has already gone away, so there are no descendants
        # left to enumerate; still fall through to terminate()/kill().
        children = []

    for child in children:
        try:
            os.kill(child.pid, signal.SIGTERM)
        except ProcessLookupError:
            # The child exited between enumeration and signalling.
            pass
    proc.terminate()
    # sleep for a while before attempting to kill
    time.sleep(0.5)
    proc.kill()
65
66
67class Handler:
68    def __init__(self, instance, type_str="build"):
69        """Constructor
70
71        """
72        self.options = None
73
74        self.state = "waiting"
75        self.run = False
76        self.type_str = type_str
77
78        self.binary = None
79        self.pid_fn = None
80        self.call_make_run = True
81
82        self.name = instance.name
83        self.instance = instance
84        self.sourcedir = instance.testsuite.source_dir
85        self.build_dir = instance.build_dir
86        self.log = os.path.join(self.build_dir, "handler.log")
87        self.returncode = 0
88        self.generator = None
89        self.generator_cmd = None
90        self.suite_name_check = True
91        self.ready = False
92
93        self.args = []
94        self.terminated = False
95
96    def get_test_timeout(self):
97        return math.ceil(self.instance.testsuite.timeout *
98                         self.instance.platform.timeout_multiplier *
99                         self.options.timeout_multiplier)
100
101    def record(self, harness):
102        if harness.recording:
103            filename = os.path.join(self.build_dir, "recording.csv")
104            with open(filename, "at") as csvfile:
105                cw = csv.writer(csvfile, harness.fieldnames, lineterminator=os.linesep)
106                cw.writerow(harness.fieldnames)
107                for instance in harness.recording:
108                    cw.writerow(instance)
109
110    def terminate(self, proc):
111        terminate_process(proc)
112        self.terminated = True
113
114    def _verify_ztest_suite_name(self, harness_state, detected_suite_names, handler_time):
115        """
116        If test suite names was found in test's C source code, then verify if
117        detected suite names from output correspond to expected suite names
118        (and not in reverse).
119        """
120        expected_suite_names = self.instance.testsuite.ztest_suite_names
121        logger.debug(f"Expected suite names:{expected_suite_names}")
122        logger.debug(f"Detected suite names:{detected_suite_names}")
123        if not expected_suite_names or \
124                not harness_state == "passed":
125            return
126        if not detected_suite_names:
127            self._missing_suite_name(expected_suite_names, handler_time)
128        for detected_suite_name in detected_suite_names:
129            if detected_suite_name not in expected_suite_names:
130                self._missing_suite_name(expected_suite_names, handler_time)
131                break
132
133    def _missing_suite_name(self, expected_suite_names, handler_time):
134        """
135        Change result of performed test if problem with missing or unpropper
136        suite name was occurred.
137        """
138        self.instance.status = "failed"
139        self.instance.execution_time = handler_time
140        for tc in self.instance.testcases:
141            tc.status = "failed"
142        self.instance.reason = f"Testsuite mismatch"
143        logger.debug("Test suite names were not printed or some of them in " \
144                     "output do not correspond with expected: %s",
145                     str(expected_suite_names))
146
147    def _final_handle_actions(self, harness, handler_time):
148
149        # only for Ztest tests:
150        harness_class_name = type(harness).__name__
151        if self.suite_name_check and harness_class_name == "Test":
152            self._verify_ztest_suite_name(harness.state, harness.detected_suite_names, handler_time)
153            if self.instance.status == 'failed':
154                return
155            if not harness.matched_run_id and harness.run_id_exists:
156                self.instance.status = "failed"
157                self.instance.execution_time = handler_time
158                self.instance.reason = "RunID mismatch"
159                for tc in self.instance.testcases:
160                    tc.status = "failed"
161
162        self.record(harness)
163
164
165class BinaryHandler(Handler):
166    def __init__(self, instance, type_str):
167        """Constructor
168
169        @param instance Test Instance
170        """
171        super().__init__(instance, type_str)
172
173        self.call_west_flash = False
174        self.seed = None
175        self.extra_test_args = None
176        self.line = b""
177
178    def try_kill_process_by_pid(self):
179        if self.pid_fn:
180            pid = int(open(self.pid_fn).read())
181            os.unlink(self.pid_fn)
182            self.pid_fn = None  # clear so we don't try to kill the binary twice
183            try:
184                os.kill(pid, signal.SIGKILL)
185            except ProcessLookupError:
186                pass
187
188    def _output_reader(self, proc):
189        self.line = proc.stdout.readline()
190
    def _output_handler(self, proc, harness):
        """Pump the process output into the handler log and the harness.

        Lines are read one at a time, each by a short-lived daemon thread
        running _output_reader(), so that a read can be abandoned when the
        remaining timeout expires. The loop ends on timeout, on an empty
        read, or when the reader thread is still blocked after the join.
        Afterwards the process gets 100 ms to exit on its own before it is
        terminated.

        @param proc Process whose stdout is consumed
        @param harness Harness receiving every stripped line
        """
        # NOTE(review): this literal is the four characters backslash, r,
        # backslash, n - not a CR/LF pair. Decoded lines ending in a real
        # CR/LF are handled by the rstrip() in the else branch below.
        suffix = '\\r\\n'

        with open(self.log, "wt") as log_out_fp:
            timeout_extended = False
            timeout_time = time.time() + self.get_test_timeout()
            while True:
                this_timeout = timeout_time - time.time()
                if this_timeout < 0:
                    break
                # One reader thread per line; join() bounds how long we wait.
                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
                reader_t.start()
                reader_t.join(this_timeout)
                # The line is only used when the reader finished in time and
                # returned data (an empty bytes object is taken as the end).
                if not reader_t.is_alive() and self.line != b"":
                    line_decoded = self.line.decode('utf-8', "replace")
                    if line_decoded.endswith(suffix):
                        stripped_line = line_decoded[:-len(suffix)].rstrip()
                    else:
                        stripped_line = line_decoded.rstrip()
                    logger.debug("OUTPUT: %s", stripped_line)
                    log_out_fp.write(line_decoded)
                    log_out_fp.flush()
                    harness.handle(stripped_line)
                    if harness.state:
                        # Once the harness has a state the deadline is moved:
                        # a single time to 2 s from now, or on every further
                        # line to 30 s from now while coverage is captured.
                        if not timeout_extended or harness.capture_coverage:
                            timeout_extended = True
                            if harness.capture_coverage:
                                timeout_time = time.time() + 30
                            else:
                                timeout_time = time.time() + 2
                else:
                    # NOTE(review): a reader thread that is still blocked is
                    # left behind here (it is a daemon thread) and may still
                    # assign self.line later.
                    reader_t.join(0)
                    break
            try:
                # POSIX arch based ztests end on their own,
                # so let's give it up to 100ms to do so
                proc.wait(0.1)
            except subprocess.TimeoutExpired:
                self.terminate(proc)
230
231    def _create_command(self, robot_test):
232        if robot_test:
233            command = [self.generator_cmd, "run_renode_test"]
234        elif self.call_make_run:
235            command = [self.generator_cmd, "run"]
236        elif self.call_west_flash:
237            command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
238        else:
239            command = [self.binary]
240
241        if self.options.enable_valgrind:
242            command = ["valgrind", "--error-exitcode=2",
243                       "--leak-check=full",
244                       "--suppressions=" + ZEPHYR_BASE + "/scripts/valgrind.supp",
245                       "--log-file=" + self.build_dir + "/valgrind.log",
246                       "--track-origins=yes",
247                       ] + command
248
249        # Only valid for native_posix
250        if self.seed is not None:
251            command.append(f"--seed={self.seed}")
252        if self.extra_test_args is not None:
253            command.extend(self.extra_test_args)
254
255        return command
256
257    def _create_env(self):
258        env = os.environ.copy()
259        if self.options.enable_asan:
260            env["ASAN_OPTIONS"] = "log_path=stdout:" + \
261                                  env.get("ASAN_OPTIONS", "")
262            if not self.options.enable_lsan:
263                env["ASAN_OPTIONS"] += "detect_leaks=0"
264
265        if self.options.enable_ubsan:
266            env["UBSAN_OPTIONS"] = "log_path=stdout:halt_on_error=1:" + \
267                                  env.get("UBSAN_OPTIONS", "")
268
269        return env
270
271    def _update_instance_info(self, harness_state, handler_time):
272        self.instance.execution_time = handler_time
273        if not self.terminated and self.returncode != 0:
274            self.instance.status = "failed"
275            if self.options.enable_valgrind and self.returncode == 2:
276                self.instance.reason = "Valgrind error"
277            else:
278                # When a process is killed, the default handler returns 128 + SIGTERM
279                # so in that case the return code itself is not meaningful
280                self.instance.reason = "Failed"
281        elif harness_state:
282            self.instance.status = harness_state
283            if harness_state == "failed":
284                self.instance.reason = "Failed"
285        else:
286            self.instance.status = "failed"
287            self.instance.reason = "Timeout"
288            self.instance.add_missing_case_status("blocked", "Timeout")
289
290    def handle(self, harness):
291        robot_test = getattr(harness, "is_robot_test", False)
292
293        command = self._create_command(robot_test)
294
295        logger.debug("Spawning process: " +
296                     " ".join(shlex.quote(word) for word in command) + os.linesep +
297                     "in directory: " + self.build_dir)
298
299        start_time = time.time()
300
301        env = self._create_env()
302
303        if robot_test:
304            harness.run_robot_test(command, self)
305            return
306
307        with subprocess.Popen(command, stdout=subprocess.PIPE,
308                              stderr=subprocess.PIPE, cwd=self.build_dir, env=env) as proc:
309            logger.debug("Spawning BinaryHandler Thread for %s" % self.name)
310            t = threading.Thread(target=self._output_handler, args=(proc, harness,), daemon=True)
311            t.start()
312            t.join()
313            if t.is_alive():
314                self.terminate(proc)
315                t.join()
316            proc.wait()
317            self.returncode = proc.returncode
318            self.try_kill_process_by_pid()
319
320        handler_time = time.time() - start_time
321
322        if self.options.coverage:
323            subprocess.call(["GCOV_PREFIX=" + self.build_dir,
324                             "gcov", self.sourcedir, "-b", "-s", self.build_dir], shell=True)
325
326        # FIXME: This is needed when killing the simulator, the console is
327        # garbled and needs to be reset. Did not find a better way to do that.
328        if sys.stdout.isatty():
329            subprocess.call(["stty", "sane"], stdin=sys.stdout)
330
331        self._update_instance_info(harness.state, handler_time)
332
333        self._final_handle_actions(harness, handler_time)
334
335
class SimulationHandler(BinaryHandler):
    """Binary handler with per-simulator defaults applied at construction."""

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        @param type_str Simulator type; 'native' and 'renode' get extra setup
        """
        super().__init__(instance, type_str)

        build_dir = instance.build_dir
        if type_str == 'native':
            # The native executable is launched directly, not through the
            # generator's run target.
            self.binary = os.path.join(build_dir, "zephyr", "zephyr.exe")
            self.call_make_run = False
            self.ready = True
        elif type_str == 'renode':
            # File from which try_kill_process_by_pid() reads the PID.
            self.pid_fn = os.path.join(build_dir, "renode.pid")
350
351
352class DeviceHandler(Handler):
353
354    def __init__(self, instance, type_str):
355        """Constructor
356
357        @param instance Test Instance
358        """
359        super().__init__(instance, type_str)
360
361    def monitor_serial(self, ser, halt_event, harness):
362        log_out_fp = open(self.log, "wb")
363
364        if self.options.coverage:
365            # Set capture_coverage to True to indicate that right after
366            # test results we should get coverage data, otherwise we exit
367            # from the test.
368            harness.capture_coverage = True
369
370        # Clear serial leftover.
371        ser.reset_input_buffer()
372
373        while ser.isOpen():
374            if halt_event.is_set():
375                logger.debug('halted')
376                ser.close()
377                break
378
379            try:
380                if not ser.in_waiting:
381                    # no incoming bytes are waiting to be read from
382                    # the serial input buffer, let other threads run
383                    time.sleep(0.001)
384                    continue
385            # maybe the serial port is still in reset
386            # check status may cause error
387            # wait for more time
388            except OSError:
389                time.sleep(0.001)
390                continue
391            except TypeError:
392                # This exception happens if the serial port was closed and
393                # its file descriptor cleared in between of ser.isOpen()
394                # and ser.in_waiting checks.
395                logger.debug("Serial port is already closed, stop reading.")
396                break
397
398            serial_line = None
399            try:
400                serial_line = ser.readline()
401            except TypeError:
402                pass
403            # ignore SerialException which may happen during the serial device
404            # power off/on process.
405            except serial.SerialException:
406                pass
407
408            # Just because ser_fileno has data doesn't mean an entire line
409            # is available yet.
410            if serial_line:
411                sl = serial_line.decode('utf-8', 'ignore').lstrip()
412                logger.debug("DEVICE: {0}".format(sl.rstrip()))
413
414                log_out_fp.write(sl.encode('utf-8'))
415                log_out_fp.flush()
416                harness.handle(sl.rstrip())
417
418            if harness.state:
419                if not harness.capture_coverage:
420                    ser.close()
421                    break
422
423        log_out_fp.close()
424
425    def device_is_available(self, instance):
426        device = instance.platform.name
427        fixture = instance.testsuite.harness_config.get("fixture")
428        dut_found = False
429
430        for d in self.duts:
431            if fixture and fixture not in d.fixtures:
432                continue
433            if d.platform != device or (d.serial is None and d.serial_pty is None):
434                continue
435            dut_found = True
436            d.lock.acquire()
437            avail = False
438            if d.available:
439                d.available = 0
440                d.counter += 1
441                avail = True
442            d.lock.release()
443            if avail:
444                return d
445
446        if not dut_found:
447            raise TwisterException(f"No device to serve as {device} platform.")
448
449        return None
450
451    def make_device_available(self, serial):
452        for d in self.duts:
453            if serial in [d.serial_pty, d.serial]:
454                d.available = 1
455
456    @staticmethod
457    def run_custom_script(script, timeout):
458        with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
459            try:
460                stdout, stderr = proc.communicate(timeout=timeout)
461                logger.debug(stdout.decode())
462                if proc.returncode != 0:
463                    logger.error(f"Custom script failure: {stderr.decode(errors='ignore')}")
464
465            except subprocess.TimeoutExpired:
466                proc.kill()
467                proc.communicate()
468                logger.error("{} timed out".format(script))
469
470    def _create_command(self, runner, hardware):
471        if (self.options.west_flash is not None) or runner:
472            command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
473            command_extra_args = []
474
475            # There are three ways this option is used.
476            # 1) bare: --west-flash
477            #    This results in options.west_flash == []
478            # 2) with a value: --west-flash="--board-id=42"
479            #    This results in options.west_flash == "--board-id=42"
480            # 3) Multiple values: --west-flash="--board-id=42,--erase"
481            #    This results in options.west_flash == "--board-id=42 --erase"
482            if self.options.west_flash and self.options.west_flash != []:
483                command_extra_args.extend(self.options.west_flash.split(','))
484
485            if runner:
486                command.append("--runner")
487                command.append(runner)
488
489                board_id = hardware.probe_id or hardware.id
490                product = hardware.product
491                if board_id is not None:
492                    if runner in ("pyocd", "nrfjprog"):
493                        command_extra_args.append("--dev-id")
494                        command_extra_args.append(board_id)
495                    elif runner == "openocd" and product == "STM32 STLink":
496                        command_extra_args.append("--cmd-pre-init")
497                        command_extra_args.append("hla_serial %s" % board_id)
498                    elif runner == "openocd" and product == "STLINK-V3":
499                        command_extra_args.append("--cmd-pre-init")
500                        command_extra_args.append("hla_serial %s" % board_id)
501                    elif runner == "openocd" and product == "EDBG CMSIS-DAP":
502                        command_extra_args.append("--cmd-pre-init")
503                        command_extra_args.append("cmsis_dap_serial %s" % board_id)
504                    elif runner == "jlink":
505                        command.append("--tool-opt=-SelectEmuBySN  %s" % board_id)
506                    elif runner == "stm32cubeprogrammer":
507                        command.append("--tool-opt=sn=%s" % board_id)
508
509                    # Receive parameters from runner_params field.
510                    if hardware.runner_params:
511                        for param in hardware.runner_params:
512                            command.append(param)
513
514            if command_extra_args:
515                command.append('--')
516                command.extend(command_extra_args)
517        else:
518            command = [self.generator_cmd, "-C", self.build_dir, "flash"]
519
520        return command
521
522    def _update_instance_info(self, harness_state, handler_time, flash_error):
523        self.instance.execution_time = handler_time
524        if harness_state:
525            self.instance.status = harness_state
526            if harness_state == "failed":
527                self.instance.reason = "Failed"
528        elif not flash_error:
529            self.instance.status = "failed"
530            self.instance.reason = "Timeout"
531
532        if self.instance.status in ["error", "failed"]:
533            self.instance.add_missing_case_status("blocked", self.instance.reason)
534
535    def _create_serial_connection(self, serial_device, hardware_baud,
536                                  flash_timeout, serial_pty, ser_pty_process):
537        try:
538            ser = serial.Serial(
539                serial_device,
540                baudrate=hardware_baud,
541                parity=serial.PARITY_NONE,
542                stopbits=serial.STOPBITS_ONE,
543                bytesize=serial.EIGHTBITS,
544                # the worst case of no serial input
545                timeout=max(flash_timeout, self.get_test_timeout())
546            )
547        except serial.SerialException as e:
548            self.instance.status = "failed"
549            self.instance.reason = "Serial Device Error"
550            logger.error("Serial device error: %s" % (str(e)))
551
552            self.instance.add_missing_case_status("blocked", "Serial Device Error")
553            if serial_pty and ser_pty_process:
554                ser_pty_process.terminate()
555                outs, errs = ser_pty_process.communicate()
556                logger.debug("Process {} terminated outs: {} errs {}".format(serial_pty, outs, errs))
557
558            if serial_pty:
559                self.make_device_available(serial_pty)
560            else:
561                self.make_device_available(serial_device)
562            raise
563
564        return ser
565
566    def get_hardware(self):
567        hardware = None
568        try:
569            hardware = self.device_is_available(self.instance)
570            while not hardware:
571                time.sleep(1)
572                hardware = self.device_is_available(self.instance)
573        except TwisterException as error:
574            self.instance.status = "failed"
575            self.instance.reason = str(error)
576            logger.error(self.instance.reason)
577        return hardware
578
579    def _get_serial_device(self, serial_pty, hardware_serial):
580        ser_pty_process = None
581        if serial_pty:
582            master, slave = pty.openpty()
583            try:
584                ser_pty_process = subprocess.Popen(
585                    re.split('[, ]', serial_pty),
586                    stdout=master,
587                    stdin=master,
588                    stderr=master
589                )
590            except subprocess.CalledProcessError as error:
591                logger.error(
592                    "Failed to run subprocess {}, error {}".format(
593                        serial_pty,
594                        error.output
595                    )
596                )
597                return
598
599            serial_device = os.ttyname(slave)
600        else:
601            serial_device = hardware_serial
602
603        return serial_device, ser_pty_process
604
605    def handle(self, harness):
606        runner = None
607        hardware = self.get_hardware()
608        if hardware:
609            self.instance.dut = hardware.id
610        if not hardware:
611            return
612
613        runner = hardware.runner or self.options.west_runner
614        serial_pty = hardware.serial_pty
615
616        serial_device, ser_pty_process = self._get_serial_device(serial_pty, hardware.serial)
617
618        logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud")
619
620        command = self._create_command(runner, hardware)
621
622        pre_script = hardware.pre_script
623        post_flash_script = hardware.post_flash_script
624        post_script = hardware.post_script
625
626        if pre_script:
627            self.run_custom_script(pre_script, 30)
628
629        flash_timeout = hardware.flash_timeout
630        if hardware.flash_with_test:
631            flash_timeout += self.get_test_timeout()
632
633        try:
634            ser = self._create_serial_connection(
635                serial_device,
636                hardware.baud,
637                flash_timeout,
638                serial_pty,
639                ser_pty_process
640            )
641        except serial.SerialException:
642            return
643
644        halt_monitor_evt = threading.Event()
645
646        t = threading.Thread(target=self.monitor_serial, daemon=True,
647                             args=(ser, halt_monitor_evt, harness))
648        start_time = time.time()
649        t.start()
650
651        d_log = "{}/device.log".format(self.instance.build_dir)
652        logger.debug('Flash command: %s', command)
653        flash_error = False
654        try:
655            stdout = stderr = None
656            with subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
657                try:
658                    (stdout, stderr) = proc.communicate(timeout=flash_timeout)
659                    # ignore unencodable unicode chars
660                    logger.debug(stdout.decode(errors="ignore"))
661
662                    if proc.returncode != 0:
663                        self.instance.status = "error"
664                        self.instance.reason = "Device issue (Flash error?)"
665                        flash_error = True
666                        with open(d_log, "w") as dlog_fp:
667                            dlog_fp.write(stderr.decode())
668                        halt_monitor_evt.set()
669                except subprocess.TimeoutExpired:
670                    logger.warning("Flash operation timed out.")
671                    self.terminate(proc)
672                    (stdout, stderr) = proc.communicate()
673                    self.instance.status = "error"
674                    self.instance.reason = "Device issue (Timeout)"
675                    flash_error = True
676
677            with open(d_log, "w") as dlog_fp:
678                dlog_fp.write(stderr.decode())
679
680        except subprocess.CalledProcessError:
681            halt_monitor_evt.set()
682            self.instance.status = "error"
683            self.instance.reason = "Device issue (Flash error)"
684            flash_error = True
685
686        if post_flash_script:
687            self.run_custom_script(post_flash_script, 30)
688
689        if not flash_error:
690            # Always wait at most the test timeout here after flashing.
691            t.join(self.get_test_timeout())
692        else:
693            # When the flash error is due exceptions,
694            # twister tell the monitor serial thread
695            # to close the serial. But it is necessary
696            # for this thread being run first and close
697            # have the change to close the serial.
698            t.join(0.1)
699
700        if t.is_alive():
701            logger.debug("Timed out while monitoring serial output on {}".format(self.instance.platform.name))
702
703        if ser.isOpen():
704            ser.close()
705
706        if serial_pty:
707            ser_pty_process.terminate()
708            outs, errs = ser_pty_process.communicate()
709            logger.debug("Process {} terminated outs: {} errs {}".format(serial_pty, outs, errs))
710
711        handler_time = time.time() - start_time
712
713        self._update_instance_info(harness.state, handler_time, flash_error)
714
715        self._final_handle_actions(harness, handler_time)
716
717        if post_script:
718            self.run_custom_script(post_script, 30)
719
720        if serial_pty:
721            self.make_device_available(serial_pty)
722        else:
723            self.make_device_available(serial_device)
724
725
726class QEMUHandler(Handler):
727    """Spawns a thread to monitor QEMU output from pipes
728
729    We pass QEMU_PIPE to 'make run' and monitor the pipes for output.
730    We need to do this as once qemu starts, it runs forever until killed.
731    Test cases emit special messages to the console as they run, we check
732    for these to collect whether the test passed or failed.
733    """
734
735    def __init__(self, instance, type_str):
736        """Constructor
737
738        @param instance Test instance
739        """
740
741        super().__init__(instance, type_str)
742        self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo")
743
744        self.pid_fn = os.path.join(instance.build_dir, "qemu.pid")
745
746        if instance.testsuite.ignore_qemu_crash:
747            self.ignore_qemu_crash = True
748            self.ignore_unexpected_eof = True
749        else:
750            self.ignore_qemu_crash = False
751            self.ignore_unexpected_eof = False
752
753    @staticmethod
754    def _get_cpu_time(pid):
755        """get process CPU time.
756
757        The guest virtual time in QEMU icount mode isn't host time and
758        it's maintained by counting guest instructions, so we use QEMU
759        process execution time to mostly simulate the time of guest OS.
760        """
761        proc = psutil.Process(pid)
762        cpu_time = proc.cpu_times()
763        return cpu_time.user + cpu_time.system
764
765    @staticmethod
766    def _thread_get_fifo_names(fifo_fn):
767        fifo_in = fifo_fn + ".in"
768        fifo_out = fifo_fn + ".out"
769
770        return fifo_in, fifo_out
771
772    @staticmethod
773    def _thread_open_files(fifo_in, fifo_out, logfile):
774        # These in/out nodes are named from QEMU's perspective, not ours
775        if os.path.exists(fifo_in):
776            os.unlink(fifo_in)
777        os.mkfifo(fifo_in)
778        if os.path.exists(fifo_out):
779            os.unlink(fifo_out)
780        os.mkfifo(fifo_out)
781
782        # We don't do anything with out_fp but we need to open it for
783        # writing so that QEMU doesn't block, due to the way pipes work
784        out_fp = open(fifo_in, "wb")
785        # Disable internal buffering, we don't
786        # want read() or poll() to ever block if there is data in there
787        in_fp = open(fifo_out, "rb", buffering=0)
788        log_out_fp = open(logfile, "wt")
789
790        return out_fp, in_fp, log_out_fp
791
792    @staticmethod
793    def _thread_close_files(fifo_in, fifo_out, pid, out_fp, in_fp, log_out_fp):
794        log_out_fp.close()
795        out_fp.close()
796        in_fp.close()
797        if pid:
798            try:
799                if pid:
800                    os.kill(pid, signal.SIGTERM)
801            except ProcessLookupError:
802                # Oh well, as long as it's dead! User probably sent Ctrl-C
803                pass
804
805        os.unlink(fifo_in)
806        os.unlink(fifo_out)
807
808    @staticmethod
809    def _thread_update_instance_info(handler, handler_time, out_state):
810        handler.instance.execution_time = handler_time
811        if out_state == "timeout":
812            handler.instance.status = "failed"
813            handler.instance.reason = "Timeout"
814        elif out_state == "failed":
815            handler.instance.status = "failed"
816            handler.instance.reason = "Failed"
817        elif out_state in ['unexpected eof', 'unexpected byte']:
818            handler.instance.status = "failed"
819            handler.instance.reason = out_state
820        else:
821            handler.instance.status = out_state
822            handler.instance.reason = "Unknown"
823
824    @staticmethod
825    def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn, results,
826                harness, ignore_unexpected_eof=False):
827        fifo_in, fifo_out = QEMUHandler._thread_get_fifo_names(fifo_fn)
828
829        out_fp, in_fp, log_out_fp = QEMUHandler._thread_open_files(
830            fifo_in,
831            fifo_out,
832            logfile
833        )
834
835        start_time = time.time()
836        timeout_time = start_time + timeout
837        p = select.poll()
838        p.register(in_fp, select.POLLIN)
839        out_state = None
840
841        line = ""
842        timeout_extended = False
843
844        pid = 0
845        if os.path.exists(pid_fn):
846            pid = int(open(pid_fn).read())
847
848        while True:
849            this_timeout = int((timeout_time - time.time()) * 1000)
850            if this_timeout < 0 or not p.poll(this_timeout):
851                try:
852                    if pid and this_timeout > 0:
853                        # there's possibility we polled nothing because
854                        # of not enough CPU time scheduled by host for
855                        # QEMU process during p.poll(this_timeout)
856                        cpu_time = QEMUHandler._get_cpu_time(pid)
857                        if cpu_time < timeout and not out_state:
858                            timeout_time = time.time() + (timeout - cpu_time)
859                            continue
860                except ProcessLookupError:
861                    out_state = "failed"
862                    break
863
864                if not out_state:
865                    out_state = "timeout"
866                break
867
868            if pid == 0 and os.path.exists(pid_fn):
869                pid = int(open(pid_fn).read())
870
871            try:
872                c = in_fp.read(1).decode("utf-8")
873            except UnicodeDecodeError:
874                # Test is writing something weird, fail
875                out_state = "unexpected byte"
876                break
877
878            if c == "":
879                # EOF, this shouldn't happen unless QEMU crashes
880                if not ignore_unexpected_eof:
881                    out_state = "unexpected eof"
882                break
883            line = line + c
884            if c != "\n":
885                continue
886
887            # line contains a full line of data output from QEMU
888            log_out_fp.write(line)
889            log_out_fp.flush()
890            line = line.rstrip()
891            logger.debug(f"QEMU ({pid}): {line}")
892
893            harness.handle(line)
894            if harness.state:
895                # if we have registered a fail make sure the state is not
896                # overridden by a false success message coming from the
897                # testsuite
898                if out_state not in ['failed', 'unexpected eof', 'unexpected byte']:
899                    out_state = harness.state
900
901                # if we get some state, that means test is doing well, we reset
902                # the timeout and wait for 2 more seconds to catch anything
903                # printed late. We wait much longer if code
904                # coverage is enabled since dumping this information can
905                # take some time.
906                if not timeout_extended or harness.capture_coverage:
907                    timeout_extended = True
908                    if harness.capture_coverage:
909                        timeout_time = time.time() + 30
910                    else:
911                        timeout_time = time.time() + 2
912            line = ""
913
914        handler_time = time.time() - start_time
915        logger.debug(f"QEMU ({pid}) complete ({out_state}) after {handler_time} seconds")
916
917        QEMUHandler._thread_update_instance_info(handler, handler_time, out_state)
918
919        QEMUHandler._thread_close_files(fifo_in, fifo_out, pid, out_fp, in_fp, log_out_fp)
920
921    def _get_sysbuild_build_dir(self):
922        if self.instance.testsuite.sysbuild:
923            # Load domain yaml to get default domain build directory
924            # Note: for targets using QEMU, we assume that the target will
925            # have added any additional images to the run target manually
926            domain_path = os.path.join(self.build_dir, "domains.yaml")
927            domains = Domains.from_file(domain_path)
928            logger.debug("Loaded sysbuild domain data from %s" % domain_path)
929            build_dir = domains.get_default_domain().build_dir
930        else:
931            build_dir = self.build_dir
932
933        return build_dir
934
935    def _set_qemu_filenames(self, sysbuild_build_dir):
936        # We pass this to QEMU which looks for fifos with .in and .out suffixes.
937        # QEMU fifo will use main build dir
938        self.fifo_fn = os.path.join(self.instance.build_dir, "qemu-fifo")
939        # PID file will be created in the main sysbuild app's build dir
940        self.pid_fn = os.path.join(sysbuild_build_dir, "qemu.pid")
941
942        if os.path.exists(self.pid_fn):
943            os.unlink(self.pid_fn)
944
945        self.log_fn = self.log
946
947    def _create_command(self, sysbuild_build_dir):
948        command = [self.generator_cmd]
949        command += ["-C", sysbuild_build_dir, "run"]
950
951        return command
952
953    def _update_instance_info(self, harness_state, is_timeout):
954        if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness_state:
955            self.instance.status = "failed"
956            if is_timeout:
957                self.instance.reason = "Timeout"
958            else:
959                if not self.instance.reason:
960                    self.instance.reason = "Exited with {}".format(self.returncode)
961            self.instance.add_missing_case_status("blocked")
962
963    def handle(self, harness):
964        self.results = {}
965        self.run = True
966
967        sysbuild_build_dir = self._get_sysbuild_build_dir()
968
969        command = self._create_command(sysbuild_build_dir)
970
971        self._set_qemu_filenames(sysbuild_build_dir)
972
973        self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread,
974                                       args=(self, self.get_test_timeout(), self.build_dir,
975                                             self.log_fn, self.fifo_fn,
976                                             self.pid_fn, self.results, harness,
977                                             self.ignore_unexpected_eof))
978
979        self.thread.daemon = True
980        logger.debug("Spawning QEMUHandler Thread for %s" % self.name)
981        self.thread.start()
982        if sys.stdout.isatty():
983            subprocess.call(["stty", "sane"], stdin=sys.stdout)
984
985        logger.debug("Running %s (%s)" % (self.name, self.type_str))
986
987        is_timeout = False
988        qemu_pid = None
989
990        with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.build_dir) as proc:
991            logger.debug("Spawning QEMUHandler Thread for %s" % self.name)
992
993            try:
994                proc.wait(self.get_test_timeout())
995            except subprocess.TimeoutExpired:
996                # sometimes QEMU can't handle SIGTERM signal correctly
997                # in that case kill -9 QEMU process directly and leave
998                # twister to judge testing result by console output
999
1000                is_timeout = True
1001                self.terminate(proc)
1002                if harness.state == "passed":
1003                    self.returncode = 0
1004                else:
1005                    self.returncode = proc.returncode
1006            else:
1007                if os.path.exists(self.pid_fn):
1008                    qemu_pid = int(open(self.pid_fn).read())
1009                logger.debug(f"No timeout, return code from QEMU ({qemu_pid}): {proc.returncode}")
1010                self.returncode = proc.returncode
1011            # Need to wait for harness to finish processing
1012            # output from QEMU. Otherwise it might miss some
1013            # error messages.
1014            self.thread.join(0)
1015            if self.thread.is_alive():
1016                logger.debug("Timed out while monitoring QEMU output")
1017
1018            if os.path.exists(self.pid_fn):
1019                qemu_pid = int(open(self.pid_fn).read())
1020                os.unlink(self.pid_fn)
1021
1022        logger.debug(f"return code from QEMU ({qemu_pid}): {self.returncode}")
1023
1024        self._update_instance_info(harness.state, is_timeout)
1025
1026        self._final_handle_actions(harness, 0)
1027
1028    def get_fifo(self):
1029        return self.fifo_fn
1030