1#!/usr/bin/env python3
2# vim: set syntax=python ts=4 :
3#
4# Copyright (c) 2018-2022 Intel Corporation
5# Copyright 2022 NXP
6# SPDX-License-Identifier: Apache-2.0
7
8import logging
9import math
10import os
11import psutil
12import re
13import select
14import shlex
15import signal
16import subprocess
17import sys
18import threading
19import time
20
21from pathlib import Path
22from queue import Queue, Empty
23from twisterlib.environment import ZEPHYR_BASE, strip_ansi_sequences
24from twisterlib.error import TwisterException
25from twisterlib.platform import Platform
26sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/build_helpers"))
27from domains import Domains
28
29try:
30    import serial
31except ImportError:
32    print("Install pyserial python module with pip to use --device-testing option.")
33
34try:
35    import pty
36except ImportError as capture_error:
37    if os.name == "nt":  # "nt" means that program is running on Windows OS
38        pass  # "--device-serial-pty" option is not supported on Windows OS
39    else:
40        raise capture_error
41
# Module-wide logger; obtained under the name 'twister' and set so that
# records down to DEBUG level pass this logger's own level check.
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)

# Simulator type names.
# NOTE(review): neither list is referenced in the visible part of this file;
# presumably they are consumed by other twister modules - confirm.
SUPPORTED_SIMS = ["mdb-nsim", "nsim", "renode", "qemu", "tsim", "armfvp", "xt-sim", "native", "custom"]
SUPPORTED_SIMS_IN_PYTEST = ['native', 'qemu']
47
48
def terminate_process(proc):
    """Terminate ``proc`` together with all of its descendant processes.

    Encapsulates the terminate functionality so it is done consistently
    wherever a process has to be stopped. Descendants are signalled
    explicitly because newer ninja (1.6.0 or greater) and .NET / renode
    do not seem to pass SIGTERM down to their children.

    @param proc Process object exposing ``pid``, ``terminate()`` and
                ``kill()`` (e.g. a ``subprocess.Popen`` instance)
    """
    try:
        children = psutil.Process(proc.pid).children(recursive=True)
    except psutil.NoSuchProcess:
        # The process is already gone, so there is no process tree left to
        # walk. Still fall through to terminate()/kill() below so the
        # caller's process object is handled the same way in every case.
        children = []

    for child in children:
        try:
            os.kill(child.pid, signal.SIGTERM)
        except (ProcessLookupError, psutil.NoSuchProcess):
            # The child exited between enumeration and signalling.
            pass

    proc.terminate()
    # sleep for a while before attempting to kill
    time.sleep(0.5)
    proc.kill()
67
68
class Handler:
    """Common state and result post-processing shared by all handlers."""

    def __init__(self, instance, type_str="build"):
        """Constructor

        @param instance Test instance; its name, testsuite source directory
                        and build directory are copied onto the handler
        @param type_str Handler type string, "build" by default
        """
        # Data taken from the test instance
        self.instance = instance
        self.name = instance.name
        self.sourcedir = instance.testsuite.source_dir
        self.build_dir = instance.build_dir
        self.log = os.path.join(self.build_dir, "handler.log")

        # Configuration; 'options' is None until assigned from outside
        self.options = None
        self.type_str = type_str
        self.generator_cmd = None
        self.pid_fn = None
        self.args = []

        # Behaviour flags
        self.run = False
        self.call_make_run = True
        self.suite_name_check = True
        self.ready = False

        # Execution results
        self.returncode = 0
        self.terminated = False

    def get_test_timeout(self):
        """Return the testsuite timeout scaled by the platform and the
        command line timeout multipliers, rounded up to an integer.
        """
        scaled_timeout = (self.instance.testsuite.timeout *
                          self.instance.platform.timeout_multiplier *
                          self.options.timeout_multiplier)
        return math.ceil(scaled_timeout)

    def terminate(self, proc):
        """Terminate proc (and its children) and remember that we did so."""
        terminate_process(proc)
        self.terminated = True

    def _verify_ztest_suite_name(self, harness_state, detected_suite_names, handler_time):
        """
        When suite names were found in the test's C source code and the
        harness reported "passed", check that the suite names detected in
        the output belong to the expected ones (not the other way round).
        The instance is marked as failed otherwise.
        """
        expected_suite_names = self.instance.testsuite.ztest_suite_names
        logger.debug(f"Expected suite names:{expected_suite_names}")
        logger.debug(f"Detected suite names:{detected_suite_names}")

        # Nothing to verify without expectations or for a non-passing run
        if not expected_suite_names or harness_state != "passed":
            return

        if not detected_suite_names:
            self._missing_suite_name(expected_suite_names, handler_time)
            return

        # Only the last len(expected) detected names are considered and
        # their order is irrelevant; each of them has to be an expected one.
        trailing_detected = set(detected_suite_names[-len(expected_suite_names):])
        if not trailing_detected <= set(expected_suite_names):
            self._missing_suite_name(expected_suite_names, handler_time)

    def _missing_suite_name(self, expected_suite_names, handler_time):
        """
        Mark the instance and all of its test cases as failed because suite
        names were missing from the output or did not match the expected ones.
        """
        self.instance.status = "failed"
        self.instance.execution_time = handler_time
        self.instance.reason = "Testsuite mismatch"
        for testcase in self.instance.testcases:
            testcase.status = "failed"
        logger.debug("Test suite names were not printed or some of them in "
                     "output do not correspond with expected: %s",
                     str(expected_suite_names))

    def _final_handle_actions(self, harness, handler_time):
        """Run the final checks on a finished test and record harness data.

        The suite name and run id checks are only made when
        suite_name_check is set and the harness class is named "Test"
        (only for Ztest tests).
        """
        if self.suite_name_check and type(harness).__name__ == "Test":
            self._verify_ztest_suite_name(harness.state, harness.detected_suite_names, handler_time)
            if self.instance.status == 'failed':
                # Suite name verification failed; skip run id check and recording
                return
            if not harness.matched_run_id and harness.run_id_exists:
                self.instance.status = "failed"
                self.instance.execution_time = handler_time
                self.instance.reason = "RunID mismatch"
                for testcase in self.instance.testcases:
                    testcase.status = "failed"

        self.instance.record(harness.recording)

    def get_default_domain_build_dir(self):
        """Return the build directory of the default sysbuild domain, or the
        handler's own build directory when sysbuild is not used.
        """
        if not self.instance.sysbuild:
            return self.build_dir

        # Load domain yaml to get default domain build directory
        # Note: for targets using QEMU, we assume that the target will
        # have added any additional images to the run target manually
        domain_path = os.path.join(self.build_dir, "domains.yaml")
        domains = Domains.from_file(domain_path)
        logger.debug("Loaded sysbuild domain data from %s" % domain_path)
        return domains.get_default_domain().build_dir
168
169
class BinaryHandler(Handler):
    """Handler that runs the test as a local process and feeds every line
    the process prints on stdout to the harness.
    """

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        @param type_str Handler type string, forwarded to Handler
        """
        super().__init__(instance, type_str)

        self.seed = None
        self.extra_test_args = None
        # Most recent raw line read from the process by _output_reader()
        self.line = b""

    def try_kill_process_by_pid(self):
        """SIGKILL the process whose pid is stored in the pid file, if any.

        Does nothing when no pid file is configured. The pid file is removed
        and pid_fn is cleared, so a second call is a no-op.
        """
        if not self.pid_fn:
            return

        # Read through a context manager so the file handle is closed
        # before the file is unlinked.
        with open(self.pid_fn) as pid_fp:
            pid = int(pid_fp.read())
        os.unlink(self.pid_fn)
        self.pid_fn = None  # clear so we don't try to kill the binary twice
        try:
            os.kill(pid, signal.SIGKILL)
        except (ProcessLookupError, psutil.NoSuchProcess):
            # The process has already exited
            pass

    def _output_reader(self, proc):
        """Read one line from proc's stdout into self.line (thread target)."""
        self.line = proc.stdout.readline()

    def _output_handler(self, proc, harness):
        """Pump the process output into the handler log and the harness.

        Lines are read one at a time in a helper thread so that the read can
        be abandoned once the test timeout expires. The loop ends on timeout
        or when an empty read is returned. The process is then given a short
        grace period and terminated if it is still running.

        @param proc Running process whose stdout is a pipe
        @param harness Harness that parses the output via handle()
        """
        suffix = '\\r\\n'

        with open(self.log, "wt") as log_out_fp:
            timeout_extended = False
            timeout_time = time.time() + self.get_test_timeout()
            while True:
                this_timeout = timeout_time - time.time()
                if this_timeout < 0:
                    break
                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
                reader_t.start()
                reader_t.join(this_timeout)
                if not reader_t.is_alive() and self.line != b"":
                    line_decoded = self.line.decode('utf-8', "replace")
                    if line_decoded.endswith(suffix):
                        stripped_line = line_decoded[:-len(suffix)].rstrip()
                    else:
                        stripped_line = line_decoded.rstrip()
                    logger.debug("OUTPUT: %s", stripped_line)
                    log_out_fp.write(strip_ansi_sequences(line_decoded))
                    log_out_fp.flush()
                    harness.handle(stripped_line)
                    if harness.state:
                        # The harness has reached a verdict: replace the
                        # test timeout with a short deadline for remaining
                        # output. It is set once, or refreshed on every line
                        # while coverage data is being captured.
                        if not timeout_extended or harness.capture_coverage:
                            timeout_extended = True
                            if harness.capture_coverage:
                                timeout_time = time.time() + 30
                            else:
                                timeout_time = time.time() + 2
                else:
                    # Reader timed out or returned an empty read
                    reader_t.join(0)
                    break
            try:
                # POSIX arch based ztests end on their own,
                # so let's give it up to 100ms to do so
                proc.wait(0.1)
            except subprocess.TimeoutExpired:
                self.terminate(proc)

    def _create_command(self, robot_test):
        """Build the command line (list of strings) used to run the test.

        @param robot_test True when the harness runs a robot test
        @return Command as a list of arguments
        """
        if robot_test:
            keywords = os.path.join(self.options.coverage_basedir, 'tests/robot/common.robot')
            elf = os.path.join(self.build_dir, "zephyr/zephyr.elf")
            command = [self.generator_cmd]
            resc = ""
            uart = ""
            # os.path.join cannot be used on a Mock object, so we are
            # explicitly checking the type
            if isinstance(self.instance.platform, Platform):
                for board_dir in self.options.board_root:
                    path = os.path.join(Path(board_dir).parent, self.instance.platform.resc)
                    if os.path.exists(path):
                        resc = path
                        break
                uart = self.instance.platform.uart
                command = ["renode-test",
                            "--variable", "KEYWORDS:" + keywords,
                            "--variable", "ELF:@" + elf,
                            "--variable", "RESC:@" + resc,
                            "--variable", "UART:" + uart]
        elif self.call_make_run:
            command = [self.generator_cmd, "run"]
        elif self.instance.testsuite.type == "unit":
            # NOTE(review): self.binary is not set in this class; it has to
            # be assigned from outside before a unit test is handled.
            command = [self.binary]
        else:
            binary = os.path.join(self.get_default_domain_build_dir(), "zephyr", "zephyr.exe")
            command = [binary]

        if self.options.enable_valgrind:
            command = ["valgrind", "--error-exitcode=2",
                       "--leak-check=full",
                       "--suppressions=" + ZEPHYR_BASE + "/scripts/valgrind.supp",
                       "--log-file=" + self.build_dir + "/valgrind.log",
                       "--track-origins=yes",
                       ] + command

        # Only valid for native_sim
        if self.seed is not None:
            command.append(f"--seed={self.seed}")
        if self.extra_test_args is not None:
            command.extend(self.extra_test_args)

        return command

    def _create_env(self):
        """Return a copy of the environment with sanitizer options added.

        Options already present in ASAN_OPTIONS / UBSAN_OPTIONS are kept and
        placed after the ones added here.
        """
        env = os.environ.copy()
        if self.options.enable_asan:
            asan_options = "log_path=stdout:" + env.get("ASAN_OPTIONS", "")
            if not self.options.enable_lsan:
                # Options are ':'-separated; add the separator when the
                # inherited value does not already end with one, otherwise
                # "a=1" would be fused into "a=1detect_leaks=0".
                if not asan_options.endswith(":"):
                    asan_options += ":"
                asan_options += "detect_leaks=0"
            env["ASAN_OPTIONS"] = asan_options

        if self.options.enable_ubsan:
            env["UBSAN_OPTIONS"] = "log_path=stdout:halt_on_error=1:" + \
                                  env.get("UBSAN_OPTIONS", "")

        return env

    def _update_instance_info(self, harness_state, handler_time):
        """Set the instance status, reason and execution time.

        A non-zero return code of a process that was not terminated by the
        handler wins over the harness state. No harness state at all is
        reported as a timeout.
        """
        self.instance.execution_time = handler_time
        if not self.terminated and self.returncode != 0:
            self.instance.status = "failed"
            if self.options.enable_valgrind and self.returncode == 2:
                self.instance.reason = "Valgrind error"
            else:
                # When a process is killed, the default handler returns 128 + SIGTERM
                # so in that case the return code itself is not meaningful
                self.instance.reason = "Failed"
        elif harness_state:
            self.instance.status = harness_state
            if harness_state == "failed":
                self.instance.reason = "Failed"
        else:
            self.instance.status = "failed"
            self.instance.reason = "Timeout"
            self.instance.add_missing_case_status("blocked", "Timeout")

    def handle(self, harness):
        """Run the test binary and update the instance with the outcome.

        Robot tests are delegated to harness.run_robot_test(). Otherwise the
        command is spawned with stdout piped to _output_handler() and stderr
        redirected to handler_stderr.log in the build directory.

        @param harness Harness that parses the test output
        """
        robot_test = getattr(harness, "is_robot_test", False)

        command = self._create_command(robot_test)

        logger.debug("Spawning process: " +
                     " ".join(shlex.quote(word) for word in command) + os.linesep +
                     "in directory: " + self.build_dir)

        start_time = time.time()

        env = self._create_env()

        if robot_test:
            harness.run_robot_test(command, self)
            return

        stderr_log = "{}/handler_stderr.log".format(self.instance.build_dir)
        with open(stderr_log, "w+") as stderr_log_fp, subprocess.Popen(command, stdout=subprocess.PIPE,
                              stderr=stderr_log_fp, cwd=self.build_dir, env=env) as proc:
            logger.debug("Spawning BinaryHandler Thread for %s" % self.name)
            t = threading.Thread(target=self._output_handler, args=(proc, harness,), daemon=True)
            t.start()
            t.join()
            # join() without a timeout only returns once the thread has
            # finished, so this branch is merely a safety net.
            if t.is_alive():
                self.terminate(proc)
                t.join()
            proc.wait()
            self.returncode = proc.returncode
            if proc.returncode != 0:
                self.instance.status = "error"
                self.instance.reason = "BinaryHandler returned {}".format(proc.returncode)
            self.try_kill_process_by_pid()

        handler_time = time.time() - start_time

        # FIXME: This is needed when killing the simulator, the console is
        # garbled and needs to be reset. Did not find a better way to do that.
        if sys.stdout.isatty():
            subprocess.call(["stty", "sane"], stdin=sys.stdout)

        self._update_instance_info(harness.state, handler_time)

        self._final_handle_actions(harness, handler_time)
357
358
class SimulationHandler(BinaryHandler):
    """BinaryHandler variant configured according to the simulator type."""

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        @param type_str Simulator type; 'native' and 'renode' get extra setup
        """
        super().__init__(instance, type_str)

        if type_str == 'native':
            # The binary is executed directly instead of through the
            # generator's "run" target, and the handler is ready right away.
            self.call_make_run = False
            self.ready = True
        elif type_str == 'renode':
            # Pid file read by try_kill_process_by_pid()
            self.pid_fn = os.path.join(instance.build_dir, "renode.pid")
372
373
class DeviceHandler(Handler):
    """Handler that flashes the test image to a device and reads the test
    output from the device's serial port or from a serial-pty helper process.

    NOTE(review): self.duts is used but not set in this class; it has to be
    assigned from outside before handle() is called.
    """

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        @param type_str Handler type string, forwarded to Handler
        """
        super().__init__(instance, type_str)

    def get_test_timeout(self):
        """Return the test timeout, extended by 120 s when coverage is on."""
        timeout = super().get_test_timeout()
        if self.options.enable_coverage:
            # wait more for gcov data to be dumped on console
            timeout += 120
        return timeout

    def monitor_serial(self, ser, halt_event, harness):
        """Read lines from the serial port until the test is over.

        Every line is written to the handler log (ANSI sequences stripped)
        and passed to the harness. The loop ends when the port is closed,
        when halt_event is set, or when the harness has a state and no
        coverage data is expected.

        @param ser Serial port object; may still be closed on entry
        @param halt_event Event used by handle() to stop the monitoring
        @param harness Harness that parses the device output
        """
        # The context manager closes the log on every exit path,
        # including exceptions raised by the harness.
        with open(self.log, "wb") as log_out_fp:
            if self.options.enable_coverage:
                # Set capture_coverage to True to indicate that right after
                # test results we should get coverage data, otherwise we exit
                # from the test.
                harness.capture_coverage = True

            # Wait for serial connection
            while not ser.isOpen():
                if halt_event.is_set():
                    # The port will never be handed over (e.g. it failed to
                    # open), so do not keep this thread spinning forever.
                    logger.debug('halted')
                    return
                time.sleep(0.1)

            # Clear serial leftover.
            ser.reset_input_buffer()

            while ser.isOpen():
                if halt_event.is_set():
                    logger.debug('halted')
                    ser.close()
                    break

                try:
                    if not ser.in_waiting:
                        # no incoming bytes are waiting to be read from
                        # the serial input buffer, let other threads run
                        time.sleep(0.001)
                        continue
                # maybe the serial port is still in reset
                # check status may cause error
                # wait for more time
                except OSError:
                    time.sleep(0.001)
                    continue
                except TypeError:
                    # This exception happens if the serial port was closed and
                    # its file descriptor cleared in between of ser.isOpen()
                    # and ser.in_waiting checks.
                    logger.debug("Serial port is already closed, stop reading.")
                    break

                serial_line = None
                try:
                    serial_line = ser.readline()
                except TypeError:
                    pass
                # ignore SerialException which may happen during the serial device
                # power off/on process.
                except serial.SerialException:
                    pass

                # Just because ser_fileno has data doesn't mean an entire line
                # is available yet.
                if serial_line:
                    sl = serial_line.decode('utf-8', 'ignore').lstrip()
                    logger.debug("DEVICE: {0}".format(sl.rstrip()))

                    log_out_fp.write(strip_ansi_sequences(sl).encode('utf-8'))
                    log_out_fp.flush()
                    harness.handle(sl.rstrip())

                if harness.state:
                    if not harness.capture_coverage:
                        ser.close()
                        break

    def device_is_available(self, instance):
        """Reserve and return a free device matching the instance.

        A device matches when its platform equals the instance's platform
        name, it has a serial or serial_pty, and it provides the fixture
        requested by the testsuite (if any).

        @param instance Test instance looking for a device
        @return The reserved device, or None when all matching devices are
                currently in use
        @raise TwisterException when no device matches at all
        """
        device = instance.platform.name
        fixture = instance.testsuite.harness_config.get("fixture")
        dut_found = False

        for d in self.duts:
            # Fixture entries may carry a ':'-separated suffix; only the
            # part before the first ':' is compared.
            if fixture and fixture not in [f.split(sep=':')[0] for f in d.fixtures]:
                continue
            if d.platform != device or (d.serial is None and d.serial_pty is None):
                continue
            dut_found = True
            avail = False
            d.lock.acquire()
            try:
                if d.available:
                    d.available = 0
                    d.counter += 1
                    avail = True
            finally:
                # Never leave the device locked, even if updating it fails
                d.lock.release()
            if avail:
                return d

        if not dut_found:
            raise TwisterException(f"No device to serve as {device} platform.")

        return None

    def make_device_available(self, serial):
        """Mark every device using the given serial or serial_pty as free."""
        for d in self.duts:
            if serial in [d.serial_pty, d.serial]:
                d.available = 1

    @staticmethod
    def run_custom_script(script, timeout):
        """Run a user supplied script and log its outcome.

        @param script Command passed to subprocess.Popen
        @param timeout Seconds to wait before the script is killed
        """
        with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            try:
                stdout, stderr = proc.communicate(timeout=timeout)
                logger.debug(stdout.decode())
                if proc.returncode != 0:
                    logger.error(f"Custom script failure: {stderr.decode(errors='ignore')}")

            except subprocess.TimeoutExpired:
                proc.kill()
                proc.communicate()
                logger.error("{} timed out".format(script))

    def _create_command(self, runner, hardware):
        """Build the command line used to flash the device.

        "west flash" is used when --west-flash was given or a runner is
        known; the generator's "flash" target is used otherwise.

        @param runner West runner name, or a false value for none
        @param hardware Device description (ids, product, runner_params)
        @return Command as a list of arguments
        """
        if (self.options.west_flash is not None) or runner:
            command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
            command_extra_args = []

            # There are three ways this option is used.
            # 1) bare: --west-flash
            #    This results in options.west_flash == []
            # 2) with a value: --west-flash="--board-id=42"
            #    This results in options.west_flash == "--board-id=42"
            # 3) Multiple values: --west-flash="--board-id=42,--erase"
            #    This results in options.west_flash == "--board-id=42,--erase",
            #    which is split on ',' below.
            if self.options.west_flash:
                command_extra_args.extend(self.options.west_flash.split(','))

            if runner:
                command.append("--runner")
                command.append(runner)

                board_id = hardware.probe_id or hardware.id
                product = hardware.product
                if board_id is not None:
                    if runner in ("pyocd", "nrfjprog", "nrfutil"):
                        command_extra_args.append("--dev-id")
                        command_extra_args.append(board_id)
                    elif runner == "openocd" and product in ("STM32 STLink", "STLINK-V3"):
                        command_extra_args.append("--cmd-pre-init")
                        command_extra_args.append("hla_serial %s" % board_id)
                    elif runner == "openocd" and product == "EDBG CMSIS-DAP":
                        command_extra_args.append("--cmd-pre-init")
                        command_extra_args.append("cmsis_dap_serial %s" % board_id)
                    elif runner == "openocd" and product == "LPC-LINK2 CMSIS-DAP":
                        command_extra_args.append("--cmd-pre-init")
                        command_extra_args.append("adapter serial %s" % board_id)
                    elif runner == "jlink":
                        command.append("--tool-opt=-SelectEmuBySN  %s" % board_id)
                    elif runner == "linkserver":
                        # for linkserver
                        # --probe=#<number> select by probe index
                        # --probe=<serial number> select by probe serial number
                        command.append("--probe=%s" % board_id)
                    elif runner == "stm32cubeprogrammer":
                        command.append("--tool-opt=sn=%s" % board_id)

                    # Receive parameters from runner_params field.
                    if hardware.runner_params:
                        for param in hardware.runner_params:
                            command.append(param)

            if command_extra_args:
                # Everything after '--' is appended as extra arguments
                command.append('--')
                command.extend(command_extra_args)
        else:
            command = [self.generator_cmd, "-C", self.build_dir, "flash"]

        return command

    def _update_instance_info(self, harness_state, handler_time, flash_error):
        """Set the instance status, reason and execution time.

        Without a harness state the instance is reported as a timeout,
        unless a flash error has already been recorded.
        """
        self.instance.execution_time = handler_time
        if harness_state:
            self.instance.status = harness_state
            if harness_state == "failed":
                self.instance.reason = "Failed"
            self.instance.add_missing_case_status("blocked", harness_state)
        elif not flash_error:
            self.instance.status = "failed"
            self.instance.reason = "Timeout"

        if self.instance.status in ["error", "failed"]:
            self.instance.add_missing_case_status("blocked", self.instance.reason)

    def _terminate_pty(self, ser_pty, ser_pty_process):
        """Terminate the serial-pty helper process and log what it printed."""
        logger.debug(f"Terminating serial-pty:'{ser_pty}'")
        terminate_process(ser_pty_process)
        try:
            (stdout, stderr) = ser_pty_process.communicate(timeout=self.get_test_timeout())
            logger.debug(f"Terminated serial-pty:'{ser_pty}', stdout:'{stdout}', stderr:'{stderr}'")
        except subprocess.TimeoutExpired:
            logger.debug(f"Terminated serial-pty:'{ser_pty}'")

    def _handle_serial_exception(self, exception, serial_device, serial_pty, ser_pty_process):
        """Fail the instance after a serial error and release the device.

        Also stops the serial-pty helper process if one is running.
        """
        self.instance.status = "failed"
        self.instance.reason = "Serial Device Error"
        logger.error("Serial device error: %s" % (str(exception)))

        self.instance.add_missing_case_status("blocked", "Serial Device Error")
        if serial_pty and ser_pty_process:
            self._terminate_pty(serial_pty, ser_pty_process)

        # Devices are looked up by serial_pty when one is used
        self.make_device_available(serial_pty if serial_pty else serial_device)

    def _create_serial_connection(self, serial_device, hardware_baud,
                                  flash_timeout, serial_pty, ser_pty_process):
        """Create the serial port object used to read the device output.

        @raise serial.SerialException when the port cannot be set up; the
               instance is marked as failed and the device is released
               before the exception is re-raised
        """
        try:
            ser = serial.Serial(
                serial_device,
                baudrate=hardware_baud,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                bytesize=serial.EIGHTBITS,
                # the worst case of no serial input
                timeout=max(flash_timeout, self.get_test_timeout())
            )
        except serial.SerialException as e:
            self._handle_serial_exception(e, serial_device, serial_pty, ser_pty_process)
            raise

        return ser

    def get_hardware(self):
        """Wait for a free matching device and return it.

        Polls once per second while all matching devices are busy.

        @return The reserved device, or None when no device matches; the
                instance is marked as failed in that case
        """
        hardware = None
        try:
            hardware = self.device_is_available(self.instance)
            while not hardware:
                time.sleep(1)
                hardware = self.device_is_available(self.instance)
        except TwisterException as error:
            self.instance.status = "failed"
            self.instance.reason = str(error)
            logger.error(self.instance.reason)
        return hardware

    def _get_serial_device(self, serial_pty, hardware_serial):
        """Determine the serial device to read from.

        When serial_pty is set, the helper command is started with its
        standard streams attached to a new pty, whose slave end becomes
        the serial device.

        @return Tuple (serial_device, ser_pty_process), or None when the
                serial-pty helper could not be started
        """
        ser_pty_process = None
        if serial_pty:
            master, slave = pty.openpty()
            try:
                ser_pty_process = subprocess.Popen(
                    re.split('[, ]', serial_pty),
                    stdout=master,
                    stdin=master,
                    stderr=master
                )
            except (subprocess.CalledProcessError, OSError) as error:
                # Popen reports a command that cannot be started with an
                # OSError, so that has to be caught here as well.
                if isinstance(error, subprocess.CalledProcessError):
                    details = error.output
                else:
                    details = error
                logger.error(
                    "Failed to run subprocess {}, error {}".format(
                        serial_pty,
                        details
                    )
                )
                # The pty is of no use without the helper process
                os.close(master)
                os.close(slave)
                return

            serial_device = os.ttyname(slave)
        else:
            serial_device = hardware_serial

        return serial_device, ser_pty_process

    def handle(self, harness):
        """Flash the device, monitor its serial output and update the
        instance with the outcome.

        The reserved device is released again on every exit path taken
        after a device has been reserved.

        @param harness Harness that parses the device output
        """
        hardware = self.get_hardware()
        if not hardware:
            return
        self.instance.dut = hardware.id

        runner = hardware.runner or self.options.west_runner
        serial_pty = hardware.serial_pty

        serial_info = self._get_serial_device(serial_pty, hardware.serial)
        if serial_info is None:
            # The serial-pty helper could not be started (already logged):
            # fail the instance and hand the device back.
            self.instance.status = "failed"
            self.instance.reason = "Serial Device Error"
            self.instance.add_missing_case_status("blocked", "Serial Device Error")
            self.make_device_available(serial_pty)
            return
        serial_device, ser_pty_process = serial_info

        logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud")

        command = self._create_command(runner, hardware)

        pre_script = hardware.pre_script
        post_flash_script = hardware.post_flash_script
        post_script = hardware.post_script

        if pre_script:
            self.run_custom_script(pre_script, 30)

        flash_timeout = hardware.flash_timeout
        if hardware.flash_with_test:
            flash_timeout += self.get_test_timeout()

        # With flash_before the port is only attached after flashing, so
        # the serial object is created without a port for now.
        serial_port = None
        if hardware.flash_before is False:
            serial_port = serial_device

        try:
            ser = self._create_serial_connection(
                serial_port,
                hardware.baud,
                flash_timeout,
                serial_pty,
                ser_pty_process
            )
        except serial.SerialException:
            # Already reported and cleaned up by _create_serial_connection()
            return

        halt_monitor_evt = threading.Event()

        t = threading.Thread(target=self.monitor_serial, daemon=True,
                             args=(ser, halt_monitor_evt, harness))
        start_time = time.time()
        t.start()

        d_log = "{}/device.log".format(self.instance.build_dir)
        logger.debug('Flash command: %s', command)
        flash_error = False
        try:
            stdout = stderr = None
            with subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
                try:
                    (stdout, stderr) = proc.communicate(timeout=flash_timeout)
                    # ignore unencodable unicode chars
                    logger.debug(stdout.decode(errors="ignore"))

                    if proc.returncode != 0:
                        self.instance.status = "error"
                        self.instance.reason = "Device issue (Flash error?)"
                        flash_error = True
                        halt_monitor_evt.set()
                except subprocess.TimeoutExpired:
                    logger.warning("Flash operation timed out.")
                    self.terminate(proc)
                    (stdout, stderr) = proc.communicate()
                    self.instance.status = "error"
                    self.instance.reason = "Device issue (Timeout)"
                    flash_error = True

            # The flash tool's stderr is kept in device.log in every case;
            # undecodable bytes are dropped rather than raising.
            with open(d_log, "w") as dlog_fp:
                dlog_fp.write(stderr.decode(errors="ignore"))

        except subprocess.CalledProcessError:
            halt_monitor_evt.set()
            self.instance.status = "error"
            self.instance.reason = "Device issue (Flash error)"
            flash_error = True

        if post_flash_script:
            self.run_custom_script(post_flash_script, 30)

        # Connect to device after flashing it
        if hardware.flash_before:
            try:
                logger.debug(f"Attach serial device {serial_device} @ {hardware.baud} baud")
                ser.port = serial_device
                ser.open()
            except serial.SerialException as e:
                # Stop the monitor thread, which is still waiting for the
                # port, and release the device so other tests can use it.
                halt_monitor_evt.set()
                self._handle_serial_exception(e, serial_device, serial_pty, ser_pty_process)
                return

        if not flash_error:
            # Always wait at most the test timeout here after flashing.
            t.join(self.get_test_timeout())
        else:
            # When the flash error is due exceptions,
            # twister tell the monitor serial thread
            # to close the serial. But it is necessary
            # for this thread being run first and close
            # have the change to close the serial.
            t.join(0.1)

        if t.is_alive():
            logger.debug("Timed out while monitoring serial output on {}".format(self.instance.platform.name))

        if ser.isOpen():
            ser.close()

        if serial_pty:
            self._terminate_pty(serial_pty, ser_pty_process)

        handler_time = time.time() - start_time

        self._update_instance_info(harness.state, handler_time, flash_error)

        self._final_handle_actions(harness, handler_time)

        if post_script:
            self.run_custom_script(post_script, 30)

        if serial_pty:
            self.make_device_available(serial_pty)
        else:
            self.make_device_available(serial_device)
785
786
787class QEMUHandler(Handler):
788    """Spawns a thread to monitor QEMU output from pipes
789
790    We pass QEMU_PIPE to 'make run' and monitor the pipes for output.
791    We need to do this as once qemu starts, it runs forever until killed.
792    Test cases emit special messages to the console as they run, we check
793    for these to collect whether the test passed or failed.
794    """
795
796    def __init__(self, instance, type_str):
797        """Constructor
798
799        @param instance Test instance
800        """
801
802        super().__init__(instance, type_str)
803        self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo")
804
805        self.pid_fn = os.path.join(instance.build_dir, "qemu.pid")
806
807        self.stdout_fn = os.path.join(instance.build_dir, "qemu.stdout")
808
809        self.stderr_fn = os.path.join(instance.build_dir, "qemu.stderr")
810
811        if instance.testsuite.ignore_qemu_crash:
812            self.ignore_qemu_crash = True
813            self.ignore_unexpected_eof = True
814        else:
815            self.ignore_qemu_crash = False
816            self.ignore_unexpected_eof = False
817
818    @staticmethod
819    def _get_cpu_time(pid):
820        """get process CPU time.
821
822        The guest virtual time in QEMU icount mode isn't host time and
823        it's maintained by counting guest instructions, so we use QEMU
824        process execution time to mostly simulate the time of guest OS.
825        """
826        proc = psutil.Process(pid)
827        cpu_time = proc.cpu_times()
828        return cpu_time.user + cpu_time.system
829
830    @staticmethod
831    def _thread_get_fifo_names(fifo_fn):
832        fifo_in = fifo_fn + ".in"
833        fifo_out = fifo_fn + ".out"
834
835        return fifo_in, fifo_out
836
837    @staticmethod
838    def _thread_open_files(fifo_in, fifo_out, logfile):
839        # These in/out nodes are named from QEMU's perspective, not ours
840        if os.path.exists(fifo_in):
841            os.unlink(fifo_in)
842        os.mkfifo(fifo_in)
843        if os.path.exists(fifo_out):
844            os.unlink(fifo_out)
845        os.mkfifo(fifo_out)
846
847        # We don't do anything with out_fp but we need to open it for
848        # writing so that QEMU doesn't block, due to the way pipes work
849        out_fp = open(fifo_in, "wb")
850        # Disable internal buffering, we don't
851        # want read() or poll() to ever block if there is data in there
852        in_fp = open(fifo_out, "rb", buffering=0)
853        log_out_fp = open(logfile, "wt")
854
855        return out_fp, in_fp, log_out_fp
856
857    @staticmethod
858    def _thread_close_files(fifo_in, fifo_out, pid, out_fp, in_fp, log_out_fp):
859        log_out_fp.close()
860        out_fp.close()
861        in_fp.close()
862        if pid:
863            try:
864                if pid:
865                    os.kill(pid, signal.SIGTERM)
866            except (ProcessLookupError, psutil.NoSuchProcess):
867                # Oh well, as long as it's dead! User probably sent Ctrl-C
868                pass
869
870        os.unlink(fifo_in)
871        os.unlink(fifo_out)
872
873    @staticmethod
874    def _thread_update_instance_info(handler, handler_time, status, reason):
875        handler.instance.execution_time = handler_time
876        handler.instance.status = status
877        if reason:
878            handler.instance.reason = reason
879        else:
880            handler.instance.reason = "Unknown"
881
882    @staticmethod
883    def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn,
884                harness, ignore_unexpected_eof=False):
885        fifo_in, fifo_out = QEMUHandler._thread_get_fifo_names(fifo_fn)
886
887        out_fp, in_fp, log_out_fp = QEMUHandler._thread_open_files(
888            fifo_in,
889            fifo_out,
890            logfile
891        )
892
893        start_time = time.time()
894        timeout_time = start_time + timeout
895        p = select.poll()
896        p.register(in_fp, select.POLLIN)
897        _status = None
898        _reason = None
899
900        line = ""
901        timeout_extended = False
902
903        pid = 0
904        if os.path.exists(pid_fn):
905            pid = int(open(pid_fn).read())
906
907        while True:
908            this_timeout = int((timeout_time - time.time()) * 1000)
909            if timeout_extended:
910                # Quit early after timeout extension if no more data is being received
911                this_timeout = min(this_timeout, 1000)
912            if this_timeout < 0 or not p.poll(this_timeout):
913                try:
914                    if pid and this_timeout > 0:
915                        # there's possibility we polled nothing because
916                        # of not enough CPU time scheduled by host for
917                        # QEMU process during p.poll(this_timeout)
918                        cpu_time = QEMUHandler._get_cpu_time(pid)
919                        if cpu_time < timeout and not _status:
920                            timeout_time = time.time() + (timeout - cpu_time)
921                            continue
922                except psutil.NoSuchProcess:
923                    pass
924                except ProcessLookupError:
925                    _status = "failed"
926                    _reason = "Execution error"
927                    break
928
929                if not _status:
930                    _status = "failed"
931                    _reason = "timeout"
932                break
933
934            if pid == 0 and os.path.exists(pid_fn):
935                pid = int(open(pid_fn).read())
936
937            try:
938                c = in_fp.read(1).decode("utf-8")
939            except UnicodeDecodeError:
940                # Test is writing something weird, fail
941                _status = "failed"
942                _reason = "unexpected byte"
943                break
944
945            if c == "":
946                # EOF, this shouldn't happen unless QEMU crashes
947                if not ignore_unexpected_eof:
948                    _status = "failed"
949                    _reason = "unexpected eof"
950                break
951            line = line + c
952            if c != "\n":
953                continue
954
955            # line contains a full line of data output from QEMU
956            log_out_fp.write(strip_ansi_sequences(line))
957            log_out_fp.flush()
958            line = line.rstrip()
959            logger.debug(f"QEMU ({pid}): {line}")
960
961            harness.handle(line)
962            if harness.state:
963                # if we have registered a fail make sure the status is not
964                # overridden by a false success message coming from the
965                # testsuite
966                if _status != 'failed':
967                    _status = harness.state
968                    _reason = harness.reason
969
970                # if we get some status, that means test is doing well, we reset
971                # the timeout and wait for 2 more seconds to catch anything
972                # printed late. We wait much longer if code
973                # coverage is enabled since dumping this information can
974                # take some time.
975                if not timeout_extended or harness.capture_coverage:
976                    timeout_extended = True
977                    if harness.capture_coverage:
978                        timeout_time = time.time() + 30
979                    else:
980                        timeout_time = time.time() + 2
981            line = ""
982
983        handler_time = time.time() - start_time
984        logger.debug(f"QEMU ({pid}) complete with {_status} ({_reason}) after {handler_time} seconds")
985
986        QEMUHandler._thread_update_instance_info(handler, handler_time, _status, _reason)
987
988        QEMUHandler._thread_close_files(fifo_in, fifo_out, pid, out_fp, in_fp, log_out_fp)
989
990    def _set_qemu_filenames(self, sysbuild_build_dir):
991        # We pass this to QEMU which looks for fifos with .in and .out suffixes.
992        # QEMU fifo will use main build dir
993        self.fifo_fn = os.path.join(self.instance.build_dir, "qemu-fifo")
994        # PID file will be created in the main sysbuild app's build dir
995        self.pid_fn = os.path.join(sysbuild_build_dir, "qemu.pid")
996
997        if os.path.exists(self.pid_fn):
998            os.unlink(self.pid_fn)
999
1000        self.log_fn = self.log
1001
1002    def _create_command(self, sysbuild_build_dir):
1003        command = [self.generator_cmd]
1004        command += ["-C", sysbuild_build_dir, "run"]
1005
1006        return command
1007
1008    def _update_instance_info(self, harness_state, is_timeout):
1009        if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness_state:
1010            self.instance.status = "failed"
1011            if is_timeout:
1012                self.instance.reason = "Timeout"
1013            else:
1014                if not self.instance.reason:
1015                    self.instance.reason = "Exited with {}".format(self.returncode)
1016            self.instance.add_missing_case_status("blocked")
1017
1018    def handle(self, harness):
1019        self.run = True
1020
1021        domain_build_dir = self.get_default_domain_build_dir()
1022
1023        command = self._create_command(domain_build_dir)
1024
1025        self._set_qemu_filenames(domain_build_dir)
1026
1027        self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread,
1028                                       args=(self, self.get_test_timeout(), self.build_dir,
1029                                             self.log_fn, self.fifo_fn,
1030                                             self.pid_fn, harness,
1031                                             self.ignore_unexpected_eof))
1032
1033        self.thread.daemon = True
1034        logger.debug("Spawning QEMUHandler Thread for %s" % self.name)
1035        self.thread.start()
1036        thread_max_time = time.time() + self.get_test_timeout()
1037        if sys.stdout.isatty():
1038            subprocess.call(["stty", "sane"], stdin=sys.stdout)
1039
1040        logger.debug("Running %s (%s)" % (self.name, self.type_str))
1041
1042        is_timeout = False
1043        qemu_pid = None
1044
1045        with subprocess.Popen(command, stdout=open(self.stdout_fn, "wt"), stderr=open(self.stderr_fn, "wt"), cwd=self.build_dir) as proc:
1046            logger.debug("Spawning QEMUHandler Thread for %s" % self.name)
1047
1048            try:
1049                proc.wait(self.get_test_timeout())
1050            except subprocess.TimeoutExpired:
1051                # sometimes QEMU can't handle SIGTERM signal correctly
1052                # in that case kill -9 QEMU process directly and leave
1053                # twister to judge testing result by console output
1054
1055                is_timeout = True
1056                self.terminate(proc)
1057                if harness.state == "passed":
1058                    self.returncode = 0
1059                else:
1060                    self.returncode = proc.returncode
1061            else:
1062                if os.path.exists(self.pid_fn):
1063                    qemu_pid = int(open(self.pid_fn).read())
1064                logger.debug(f"No timeout, return code from QEMU ({qemu_pid}): {proc.returncode}")
1065                self.returncode = proc.returncode
1066            # Need to wait for harness to finish processing
1067            # output from QEMU. Otherwise it might miss some
1068            # messages.
1069            self.thread.join(max(thread_max_time - time.time(), 0))
1070            if self.thread.is_alive():
1071                logger.debug("Timed out while monitoring QEMU output")
1072
1073            if os.path.exists(self.pid_fn):
1074                qemu_pid = int(open(self.pid_fn).read())
1075                os.unlink(self.pid_fn)
1076
1077        logger.debug(f"return code from QEMU ({qemu_pid}): {self.returncode}")
1078
1079        self._update_instance_info(harness.state, is_timeout)
1080
1081        self._final_handle_actions(harness, 0)
1082
    def get_fifo(self):
        """Return the FIFO base path (the name without the .in/.out suffix)."""
        return self.fifo_fn
1085
1086
1087class QEMUWinHandler(Handler):
1088    """Spawns a thread to monitor QEMU output from pipes on Windows OS
1089
1090     QEMU creates single duplex pipe at //.pipe/path, where path is fifo_fn.
1091     We need to do this as once qemu starts, it runs forever until killed.
1092     Test cases emit special messages to the console as they run, we check
1093     for these to collect whether the test passed or failed.
1094     """
1095
1096    def __init__(self, instance, type_str):
1097        """Constructor
1098
1099        @param instance Test instance
1100        """
1101
1102        super().__init__(instance, type_str)
1103        self.pid_fn = os.path.join(instance.build_dir, "qemu.pid")
1104        self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo")
1105        self.pipe_handle = None
1106        self.pid = 0
1107        self.thread = None
1108        self.stop_thread = False
1109
1110        if instance.testsuite.ignore_qemu_crash:
1111            self.ignore_qemu_crash = True
1112            self.ignore_unexpected_eof = True
1113        else:
1114            self.ignore_qemu_crash = False
1115            self.ignore_unexpected_eof = False
1116
1117    @staticmethod
1118    def _get_cpu_time(pid):
1119        """get process CPU time.
1120
1121        The guest virtual time in QEMU icount mode isn't host time and
1122        it's maintained by counting guest instructions, so we use QEMU
1123        process execution time to mostly simulate the time of guest OS.
1124        """
1125        proc = psutil.Process(pid)
1126        cpu_time = proc.cpu_times()
1127        return cpu_time.user + cpu_time.system
1128
1129    @staticmethod
1130    def _open_log_file(logfile):
1131        return open(logfile, "wt")
1132
    @staticmethod
    def _close_log_file(log_file):
        """Close the given log file object."""
        log_file.close()
1136
1137    @staticmethod
1138    def _stop_qemu_process(pid):
1139        if pid:
1140            try:
1141                if pid:
1142                    os.kill(pid, signal.SIGTERM)
1143            except (ProcessLookupError, psutil.NoSuchProcess):
1144                # Oh well, as long as it's dead! User probably sent Ctrl-C
1145                pass
1146            except OSError:
1147                pass
1148
1149    @staticmethod
1150    def _monitor_update_instance_info(handler, handler_time, status, reason):
1151        handler.instance.execution_time = handler_time
1152        handler.instance.status = status
1153        if reason:
1154            handler.instance.reason = reason
1155        else:
1156            handler.instance.reason = "Unknown"
1157
1158    def _set_qemu_filenames(self, sysbuild_build_dir):
1159        # PID file will be created in the main sysbuild app's build dir
1160        self.pid_fn = os.path.join(sysbuild_build_dir, "qemu.pid")
1161
1162        if os.path.exists(self.pid_fn):
1163            os.unlink(self.pid_fn)
1164
1165        self.log_fn = self.log
1166
1167    def _create_command(self, sysbuild_build_dir):
1168        command = [self.generator_cmd]
1169        command += ["-C", sysbuild_build_dir, "run"]
1170
1171        return command
1172
1173    def _update_instance_info(self, harness_state, is_timeout):
1174        if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness_state:
1175            self.instance.status = "failed"
1176            if is_timeout:
1177                self.instance.reason = "Timeout"
1178            else:
1179                if not self.instance.reason:
1180                    self.instance.reason = "Exited with {}".format(self.returncode)
1181            self.instance.add_missing_case_status("blocked")
1182
1183    def _enqueue_char(self, queue):
1184        while not self.stop_thread:
1185            if not self.pipe_handle:
1186                try:
1187                    self.pipe_handle = os.open(r"\\.\pipe\\" + self.fifo_fn, os.O_RDONLY)
1188                except FileNotFoundError as e:
1189                    if e.args[0] == 2:
1190                        # Pipe is not opened yet, try again after a delay.
1191                        time.sleep(1)
1192                continue
1193
1194            c = ""
1195            try:
1196                c = os.read(self.pipe_handle, 1)
1197            finally:
1198                queue.put(c)
1199
1200    def _monitor_output(self, queue, timeout, logfile, pid_fn, harness, ignore_unexpected_eof=False):
1201        start_time = time.time()
1202        timeout_time = start_time + timeout
1203        _status = None
1204        _reason = None
1205        line = ""
1206        timeout_extended = False
1207        self.pid = 0
1208
1209        log_out_fp = self._open_log_file(logfile)
1210
1211        while True:
1212            this_timeout = int((timeout_time - time.time()) * 1000)
1213            if this_timeout < 0:
1214                try:
1215                    if self.pid and this_timeout > 0:
1216                        # there's possibility we polled nothing because
1217                        # of not enough CPU time scheduled by host for
1218                        # QEMU process during p.poll(this_timeout)
1219                        cpu_time = self._get_cpu_time(self.pid)
1220                        if cpu_time < timeout and not _status:
1221                            timeout_time = time.time() + (timeout - cpu_time)
1222                            continue
1223                except psutil.NoSuchProcess:
1224                    pass
1225                except ProcessLookupError:
1226                    _status = "failed"
1227                    _reason = "Execution error"
1228                    break
1229
1230                if not _status:
1231                    _status = "failed"
1232                    _reason = "timeout"
1233                break
1234
1235            if self.pid == 0 and os.path.exists(pid_fn):
1236                try:
1237                    self.pid = int(open(pid_fn).read())
1238                except ValueError:
1239                    # pid file probably not contains pid yet, continue
1240                    pass
1241
1242            try:
1243                c = queue.get_nowait()
1244            except Empty:
1245                continue
1246            try:
1247                c = c.decode("utf-8")
1248            except UnicodeDecodeError:
1249                # Test is writing something weird, fail
1250                _status = "failed"
1251                _reason = "unexpected byte"
1252                break
1253
1254            if c == "":
1255                # EOF, this shouldn't happen unless QEMU crashes
1256                if not ignore_unexpected_eof:
1257                    _status = "failed"
1258                    _reason = "unexpected eof"
1259                break
1260            line = line + c
1261            if c != "\n":
1262                continue
1263
1264            # line contains a full line of data output from QEMU
1265            log_out_fp.write(line)
1266            log_out_fp.flush()
1267            line = line.rstrip()
1268            logger.debug(f"QEMU ({self.pid}): {line}")
1269
1270            harness.handle(line)
1271            if harness.state:
1272                # if we have registered a fail make sure the state is not
1273                # overridden by a false success message coming from the
1274                # testsuite
1275                if _status != 'failed':
1276                    _status = harness.state
1277                    _reason = harness.reason
1278
1279                # if we get some state, that means test is doing well, we reset
1280                # the timeout and wait for 2 more seconds to catch anything
1281                # printed late. We wait much longer if code
1282                # coverage is enabled since dumping this information can
1283                # take some time.
1284                if not timeout_extended or harness.capture_coverage:
1285                    timeout_extended = True
1286                    if harness.capture_coverage:
1287                        timeout_time = time.time() + 30
1288                    else:
1289                        timeout_time = time.time() + 2
1290            line = ""
1291
1292        self.stop_thread = True
1293
1294        handler_time = time.time() - start_time
1295        logger.debug(f"QEMU ({self.pid}) complete with {_status} ({_reason}) after {handler_time} seconds")
1296        self._monitor_update_instance_info(self, handler_time, _status, _reason)
1297        self._close_log_file(log_out_fp)
1298        self._stop_qemu_process(self.pid)
1299
1300    def handle(self, harness):
1301        self.run = True
1302
1303        domain_build_dir = self.get_default_domain_build_dir()
1304        command = self._create_command(domain_build_dir)
1305        self._set_qemu_filenames(domain_build_dir)
1306
1307        logger.debug("Running %s (%s)" % (self.name, self.type_str))
1308        is_timeout = False
1309        self.stop_thread = False
1310        queue = Queue()
1311
1312        with subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT,
1313                              cwd=self.build_dir) as proc:
1314            logger.debug("Spawning QEMUHandler Thread for %s" % self.name)
1315
1316            self.thread = threading.Thread(target=self._enqueue_char, args=(queue,))
1317            self.thread.daemon = True
1318            self.thread.start()
1319
1320            thread_max_time = time.time() + self.get_test_timeout()
1321
1322            self._monitor_output(queue, self.get_test_timeout(), self.log_fn, self.pid_fn, harness,
1323                                 self.ignore_unexpected_eof)
1324
1325            if (thread_max_time - time.time()) < 0:
1326                logger.debug("Timed out while monitoring QEMU output")
1327                proc.terminate()
1328                # sleep for a while before attempting to kill
1329                time.sleep(0.5)
1330                proc.kill()
1331
1332            if harness.state == "passed":
1333                self.returncode = 0
1334            else:
1335                self.returncode = proc.returncode
1336
1337            if os.path.exists(self.pid_fn):
1338                os.unlink(self.pid_fn)
1339
1340        logger.debug(f"return code from QEMU ({self.pid}): {self.returncode}")
1341
1342        os.close(self.pipe_handle)
1343        self.pipe_handle = None
1344
1345        self._update_instance_info(harness.state, is_timeout)
1346
1347        self._final_handle_actions(harness, 0)
1348
    def get_fifo(self):
        """Return the path stored in fifo_fn, from which the pipe name is built."""
        return self.fifo_fn
1351