1#!/usr/bin/env python3
2# vim: set syntax=python ts=4 :
3#
4# Copyright (c) 2018 Intel Corporation
5# SPDX-License-Identifier: Apache-2.0
6
7import os
8import contextlib
9import string
10import mmap
11import sys
12import re
13import subprocess
14import select
15import shutil
16import shlex
17import signal
18import threading
19import concurrent.futures
20from collections import OrderedDict
21import queue
22import time
23import csv
24import glob
25import concurrent
26import xml.etree.ElementTree as ET
27import logging
28import pty
29from pathlib import Path
30from distutils.spawn import find_executable
31from colorama import Fore
32import pickle
33import platform
34import yaml
35import json
36from multiprocessing import Lock, Process, Value
37
38try:
39    # Use the C LibYAML parser if available, rather than the Python parser.
40    # It's much faster.
41    from yaml import CSafeLoader as SafeLoader
42    from yaml import CDumper as Dumper
43except ImportError:
44    from yaml import SafeLoader, Dumper
45
46try:
47    import serial
48except ImportError:
49    print("Install pyserial python module with pip to use --device-testing option.")
50
51try:
52    from tabulate import tabulate
53except ImportError:
54    print("Install tabulate python module with pip to use --device-testing option.")
55
56try:
57    import psutil
58except ImportError:
59    print("Install psutil python module with pip to run in Qemu.")
60
# Root of the Zephyr tree, taken from the environment. The module search
# path entries added below are built from it, so a missing value is fatal.
ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
if not ZEPHYR_BASE:
    sys.exit("$ZEPHYR_BASE environment variable undefined")

# This is needed to load edt.pickle files.
# (The path must be inserted before the import that follows it.)
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts", "dts",
                                "python-devicetree", "src"))
from devicetree import edtlib  # pylint: disable=unused-import

# Use this for internal comparisons; that's what canonicalization is
# for. Don't use it when invoking other components of the build system
# to avoid confusing and hard to trace inconsistencies in error messages
# and logs, generated Makefiles, etc. compared to when users invoke these
# components directly.
# Note "normalization" is different from canonicalization, see os.path.
canonical_zephyr_base = os.path.realpath(ZEPHYR_BASE)

# Put $ZEPHYR_BASE/scripts first on the module search path before importing
# the helper modules below (presumably that is where they live).
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/"))

import scl
import expr_parser

# Logger shared by everything in this module. The logger itself lets DEBUG
# records through; no handlers are configured here.
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
85
86
class ExecutionCounter(object):
    """Shared tallies describing the progress of a test run.

    Each tally lives in its own ``multiprocessing.Value('i')`` and is exposed
    as a property that holds that value's lock for the duration of a single
    read or a single write. ``total`` is set at construction time and is
    read-only; every other tally starts at zero and can be assigned.

    Note that an update such as ``counter.done += 1`` is a locked read
    followed by a separate locked write. The ``lock`` attribute is created
    here but is not used by the properties themselves.
    """

    def __init__(self, total=0):
        """Create the shared values.

        @param total Initial (and only) value of the read-only ``total`` tally
        """
        initial_values = (
            ('_done', 0),
            ('_passed', 0),
            ('_skipped_configs', 0),
            ('_skipped_runtime', 0),
            ('_skipped_cases', 0),
            ('_error', 0),
            ('_failed', 0),
            ('_total', total),
            ('_cases', 0),
        )
        for attr, start in initial_values:
            setattr(self, attr, Value('i', start))

        self.lock = Lock()

    def _tally(attr, writable=True):
        """Build the property guarding the shared value stored in *attr*."""

        def read(self):
            shared = getattr(self, attr)
            with shared.get_lock():
                return shared.value

        def write(self, value):
            shared = getattr(self, attr)
            with shared.get_lock():
                shared.value = value

        return property(read, write if writable else None)

    cases = _tally('_cases')
    skipped_cases = _tally('_skipped_cases')
    error = _tally('_error')
    done = _tally('_done')
    passed = _tally('_passed')
    skipped_configs = _tally('_skipped_configs')
    skipped_runtime = _tally('_skipped_runtime')
    failed = _tally('_failed')
    total = _tally('_total', writable=False)

    # The factory is only needed while the class body executes.
    del _tally
186
class CMakeCacheEntry:
    '''Represents a CMake cache entry.

    This class understands the type system in a CMakeCache.txt, and
    converts the following cache types to Python types:

    Cache Type    Python type
    ----------    -------------------------------------------
    FILEPATH      str
    PATH          str
    STRING        str OR list of str (if ';' is in the value)
    BOOL          bool
    INTERNAL      str OR list of str (if ';' is in the value)
    ----------    -------------------------------------------
    '''

    # Regular expression for a cache entry.
    #
    # CMake variable names can include escape characters, allowing a
    # wider set of names than is easy to match with a regular
    # expression. To be permissive here, use a non-greedy match up to
    # the first colon (':'). This breaks if the variable name has a
    # colon inside, but it's good enough.
    CACHE_ENTRY = re.compile(
        r'''(?P<name>.*?)                               # name
         :(?P<type>FILEPATH|PATH|STRING|BOOL|INTERNAL)  # type
         =(?P<value>.*)                                 # value
        ''', re.X)

    # Named boolean constants, compared after upper-casing the value.
    _TRUE_CONSTANTS = ('ON', 'YES', 'TRUE', 'Y')
    _FALSE_CONSTANTS = ('OFF', 'NO', 'FALSE', 'N', 'IGNORE', 'NOTFOUND', '')

    @classmethod
    def _to_bool(cls, val):
        '''Convert a CMake BOOL string into a Python bool.

        Always returns a real ``bool`` (the named constants used to yield
        the ints 1/0 while numeric strings yielded bools).

        Raises ValueError if val is neither a named constant nor an integer.
        '''
        #   "True if the constant is 1, ON, YES, TRUE, Y, or a
        #   non-zero number. False if the constant is 0, OFF, NO,
        #   FALSE, N, IGNORE, NOTFOUND, the empty string, or ends in
        #   the suffix -NOTFOUND. Named boolean constants are
        #   case-insensitive. If the argument is not one of these
        #   constants, it is treated as a variable."
        #
        # https://cmake.org/cmake/help/v3.0/command/if.html
        val = val.upper()
        if val in cls._TRUE_CONSTANTS:
            return True
        if val in cls._FALSE_CONSTANTS or val.endswith('-NOTFOUND'):
            return False
        try:
            return int(val) != 0
        except ValueError as exc:
            raise ValueError('invalid bool {}'.format(val)) from exc

    @classmethod
    def from_line(cls, line, line_no):
        '''Parse one line of a CMakeCache.txt.

        Returns a CMakeCacheEntry, or None for comments, blank lines and
        lines that do not look like a cache entry.

        Raises ValueError for a BOOL entry whose value cannot be converted;
        the exception args are the conversion message followed by an
        "on line N: <line>" string.
        '''
        # Comments can only occur at the beginning of a line.
        # (The value of an entry could contain a comment character).
        if line.startswith(('//', '#')):
            return None

        # Whitespace-only lines do not contain cache entries.
        if not line.strip():
            return None

        m = cls.CACHE_ENTRY.match(line)
        if not m:
            return None

        name, type_, value = (m.group(g) for g in ('name', 'type', 'value'))
        if type_ == 'BOOL':
            try:
                value = cls._to_bool(value)
            except ValueError as exc:
                args = exc.args + ('on line {}: {}'.format(line_no, line),)
                # Unpack so the new exception carries the extended args
                # tuple itself, not a tuple wrapped inside a tuple.
                raise ValueError(*args) from exc
        elif type_ in ('STRING', 'INTERNAL'):
            # If the value is a CMake list (i.e. is a string which
            # contains a ';'), convert to a Python list.
            if ';' in value:
                value = value.split(';')

        return CMakeCacheEntry(name, value)

    def __init__(self, name, value):
        self.name = name
        self.value = value

    def __str__(self):
        fmt = 'CMakeCacheEntry(name={}, value={})'
        return fmt.format(self.name, self.value)
279
280
class CMakeCache:
    '''Parses and represents a CMake cache file.

    Entries are kept in file order, keyed by name. Indexing and get()
    return entry values; iteration yields the entry objects themselves.
    '''

    @staticmethod
    def from_file(cache_file):
        '''Alternate constructor; same as CMakeCache(cache_file).'''
        return CMakeCache(cache_file)

    def __init__(self, cache_file):
        self.cache_file = cache_file
        self.load(cache_file)

    def load(self, cache_file):
        '''(Re)read cache_file, replacing any previously loaded entries.'''
        entries = []
        with open(cache_file, 'r') as cache:
            # Number lines from 1 so that the "on line N" text in parse
            # errors matches what an editor shows.
            for line_no, line in enumerate(cache, 1):
                entry = CMakeCacheEntry.from_line(line, line_no)
                if entry:
                    entries.append(entry)
        self._entries = OrderedDict((e.name, e) for e in entries)

    def get(self, name, default=None):
        '''Return the value of entry name, or default if it is absent.'''
        entry = self._entries.get(name)
        if entry is None:
            return default
        return entry.value

    def get_list(self, name, default=None):
        '''Return the value of entry name as a list.

        A string value becomes a one-element list (or an empty list if the
        string is empty). A missing entry yields default, or [] when no
        default is given.

        Raises RuntimeError if the value is neither a list nor a string.
        '''
        if default is None:
            default = []
        entry = self._entries.get(name)
        if entry is None:
            return default

        value = entry.value
        if isinstance(value, list):
            return value
        if isinstance(value, str):
            return [value] if value else []
        msg = 'invalid value {} type {}'
        raise RuntimeError(msg.format(value, type(value)))

    def __contains__(self, name):
        return name in self._entries

    def __getitem__(self, name):
        return self._entries[name].value

    def __setitem__(self, name, entry):
        if not isinstance(entry, CMakeCacheEntry):
            msg = 'improper type {} for value {}, expecting CMakeCacheEntry'
            raise TypeError(msg.format(type(entry), entry))
        self._entries[name] = entry

    def __delitem__(self, name):
        del self._entries[name]

    def __iter__(self):
        return iter(self._entries.values())
341
342
class TwisterException(Exception):
    """Common base class of the exceptions defined in this module."""
345
346
class TwisterRuntimeError(TwisterException):
    """TwisterException subclass; adds no behaviour of its own."""
349
350
class ConfigurationError(TwisterException):
    """Error tied to a configuration file.

    The exception message is "<cfile>: <message>".
    """

    def __init__(self, cfile, message):
        text = cfile + ": " + message
        super().__init__(text)
354
355
class BuildError(TwisterException):
    """TwisterException subclass; adds no behaviour of its own."""
358
359
class ExecutionError(TwisterException):
    """TwisterException subclass; adds no behaviour of its own."""
362
363
class HarnessImporter:
    """Imports the ``harness`` module and instantiates one of its classes.

    The created object is available as ``self.instance``.
    """

    def __init__(self, name):
        """Constructor

        @param name Name of the class to instantiate from the ``harness``
                    module; a falsy name selects the class called "Test"
        """
        harness_dir = os.path.join(ZEPHYR_BASE, "scripts/pylib/twister")
        # An importer is created for every handled test; only add the
        # directory once so sys.path does not grow with each one.
        if harness_dir not in sys.path:
            sys.path.insert(0, harness_dir)
        module = __import__("harness")
        my_class = getattr(module, name or "Test")

        self.instance = my_class()
375
376
class Handler:
    """Base class of the objects that run a test instance.

    Holds the bookkeeping shared by the concrete handlers: the current
    state and duration, paths derived from the instance, and helpers for
    recording harness data and terminating a spawned process.
    """

    def __init__(self, instance, type_str="build"):
        """Constructor

        @param instance Test instance; its name, build_dir and
                        testcase.timeout / testcase.source_dir are read here
        @param type_str Stored as self.type_str
        """
        self.state = "waiting"
        self.run = False
        self.duration = 0
        self.type_str = type_str

        self.binary = None
        self.pid_fn = None
        self.call_make_run = False

        self.name = instance.name
        self.instance = instance
        self.timeout = instance.testcase.timeout
        self.sourcedir = instance.testcase.source_dir
        self.build_dir = instance.build_dir
        self.log = os.path.join(self.build_dir, "handler.log")
        self.returncode = 0
        self.set_state("running", self.duration)
        self.generator = None
        self.generator_cmd = None

        self.args = []
        self.terminated = False

    def set_state(self, state, duration):
        """Store the handler state together with the time it took."""
        self.state = state
        self.duration = duration

    def get_state(self):
        """Return the current (state, duration) tuple."""
        return (self.state, self.duration)

    def record(self, harness):
        """Append the harness recording, if any, to <build_dir>/recording.csv.

        The header row (harness.fieldnames) is written on every call,
        followed by one row per item in harness.recording.
        """
        if not harness.recording:
            return

        filename = os.path.join(self.build_dir, "recording.csv")
        # newline="" is what the csv module asks for, so the explicit
        # lineterminator is written without further translation.
        with open(filename, "at", newline="") as csvfile:
            # The field names are data, not a dialect: they must not be
            # passed as csv.writer's second positional argument.
            cw = csv.writer(csvfile, lineterminator=os.linesep)
            cw.writerow(harness.fieldnames)
            cw.writerows(harness.recording)

    def terminate(self, proc):
        """Terminate proc and all of its descendants, then mark the handler
        as terminated."""
        # encapsulate terminate functionality so we do it consistently where ever
        # we might want to terminate the proc.  We need try_kill_process_by_pid
        # because of both how newer ninja (1.6.0 or greater) and .NET / renode
        # work.  Newer ninja's don't seem to pass SIGTERM down to the children
        # so we need to use try_kill_process_by_pid.
        try:
            children = psutil.Process(proc.pid).children(recursive=True)
        except psutil.NoSuchProcess:
            # The process exited before we got here; nothing to walk.
            children = []
        for child in children:
            try:
                os.kill(child.pid, signal.SIGTERM)
            except ProcessLookupError:
                pass
        proc.terminate()
        # sleep for a while before attempting to kill
        time.sleep(0.5)
        proc.kill()
        self.terminated = True

    def add_missing_testscases(self, harness):
        """
        If the testsuite was broken by some error (e.g. timeout) it is
        necessary to add information about the remaining testcases, which
        were not performed due to this error. They are reported as "BLOCK".
        """
        for c in self.instance.testcase.cases:
            if c not in harness.tests:
                harness.tests[c] = "BLOCK"
448
449
class BinaryHandler(Handler):
    """Runs a test as a local process and feeds its stdout to the harness.

    Depending on the flags set on the handler, the process is the test
    binary itself, "<generator_cmd> run" or "west flash", optionally
    wrapped in valgrind.
    """

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        @param type_str Passed on to Handler
        """
        super().__init__(instance, type_str)

        self.call_west_flash = False

        # Tool options
        self.valgrind = False
        self.lsan = False
        self.asan = False
        self.ubsan = False
        self.coverage = False

    def try_kill_process_by_pid(self):
        """Send SIGTERM to the pid stored in self.pid_fn, if one is set.

        The pid file is removed and self.pid_fn cleared first, so a second
        call does nothing.
        """
        if not self.pid_fn:
            return

        with open(self.pid_fn) as pid_file:
            pid = int(pid_file.read())
        os.unlink(self.pid_fn)
        self.pid_fn = None  # clear so we don't try to kill the binary twice
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            pass

    def _output_reader(self, proc):
        """Read one line of the process' stdout into self.line.

        Runs in a short-lived thread so the caller can bound the wait.
        """
        self.line = proc.stdout.readline()

    def _output_handler(self, proc, harness):
        """Forward the process output to the log file and the harness.

        Stops when the timeout expires, when a read does not complete in
        the remaining time, or when stdout reaches end-of-file. Once the
        harness has reached a state the deadline is moved to 2 seconds from
        then (30 seconds, renewed on every line, when coverage data is being
        captured). Afterwards the process is terminated unless it exits by
        itself.
        """
        if harness.is_pytest:
            harness.handle(None)
            return

        with open(self.log, "wt") as log_out_fp:
            timeout_extended = False
            timeout_time = time.time() + self.timeout
            while True:
                this_timeout = timeout_time - time.time()
                if this_timeout < 0:
                    break
                reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
                reader_t.start()
                reader_t.join(this_timeout)
                # An empty read means end-of-file: no further output can
                # arrive, so stop instead of spinning until the timeout.
                if reader_t.is_alive() or self.line == b"":
                    reader_t.join(0)
                    break

                # Decode once; replace undecodable bytes so stray binary
                # output cannot abort the handler thread.
                text = self.line.decode('utf-8', 'replace')
                stripped = text.rstrip()
                logger.debug("OUTPUT: {0}".format(stripped))
                log_out_fp.write(text)
                log_out_fp.flush()
                harness.handle(stripped)
                if harness.state:
                    if not timeout_extended or harness.capture_coverage:
                        timeout_extended = True
                        if harness.capture_coverage:
                            timeout_time = time.time() + 30
                        else:
                            timeout_time = time.time() + 2

            try:
                # POSIX arch based ztests end on their own,
                # so let's give it up to 100ms to do so
                proc.wait(0.1)
            except subprocess.TimeoutExpired:
                self.terminate(proc)

    def handle(self):
        """Run the test process and derive the handler state from it.

        The state becomes "failed" for a non-zero exit code of a process
        we did not terminate ourselves or for a valgrind error, otherwise
        the harness state, and "timeout" if the harness never reached one.
        """
        harness_name = self.instance.testcase.harness.capitalize()
        harness_import = HarnessImporter(harness_name)
        harness = harness_import.instance
        harness.configure(self.instance)

        if self.call_make_run:
            command = [self.generator_cmd, "run"]
        elif self.call_west_flash:
            command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
        else:
            command = [self.binary]

        run_valgrind = False
        if self.valgrind and shutil.which("valgrind"):
            command = ["valgrind", "--error-exitcode=2",
                       "--leak-check=full",
                       "--suppressions=" + ZEPHYR_BASE + "/scripts/valgrind.supp",
                       "--log-file=" + self.build_dir + "/valgrind.log"
                       ] + command
            run_valgrind = True

        logger.debug("Spawning process: " +
                     " ".join(shlex.quote(word) for word in command) + os.linesep +
                     "in directory: " + self.build_dir)

        start_time = time.time()

        env = os.environ.copy()
        if self.asan:
            # Build the option string from its parts so that inherited
            # options and detect_leaks=0 are always separated by ':'.
            asan_options = ["log_path=stdout"]
            inherited = env.get("ASAN_OPTIONS", "")
            if inherited:
                asan_options.append(inherited)
            if not self.lsan:
                asan_options.append("detect_leaks=0")
            env["ASAN_OPTIONS"] = ":".join(asan_options)

        if self.ubsan:
            env["UBSAN_OPTIONS"] = "log_path=stdout:halt_on_error=1:" + \
                                  env.get("UBSAN_OPTIONS", "")

        # NOTE(review): stderr is piped but never read here; a process that
        # writes a lot to stderr may block until the timeout hits.
        with subprocess.Popen(command, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, cwd=self.build_dir, env=env) as proc:
            logger.debug("Spawning BinaryHandler Thread for %s" % self.name)
            t = threading.Thread(target=self._output_handler, args=(proc, harness,), daemon=True)
            t.start()
            # The output handler enforces the timeout itself, so an
            # unbounded join is fine and the thread is finished afterwards.
            t.join()
            proc.wait()
            self.returncode = proc.returncode
            self.try_kill_process_by_pid()

        handler_time = time.time() - start_time

        if self.coverage:
            # NOTE(review): with shell=True and a list, only the first item
            # is the shell command; the rest become arguments of the shell
            # itself. As written this only performs the GCOV_PREFIX
            # assignment and does not run gcov - confirm the intent.
            subprocess.call(["GCOV_PREFIX=" + self.build_dir,
                             "gcov", self.sourcedir, "-b", "-s", self.build_dir], shell=True)

        # FIXME: This is needed when killing the simulator, the console is
        # garbled and needs to be reset. Did not find a better way to do that.
        if sys.stdout.isatty():
            subprocess.call(["stty", "sane"])

        if harness.is_pytest:
            harness.pytest_run(self.log)
        self.instance.results = harness.tests

        if not self.terminated and self.returncode != 0:
            # When a process is killed, the default handler returns 128 + SIGTERM
            # so in that case the return code itself is not meaningful
            self.set_state("failed", handler_time)
            self.instance.reason = "Failed"
        elif run_valgrind and self.returncode == 2:
            self.set_state("failed", handler_time)
            self.instance.reason = "Valgrind error"
        elif harness.state:
            self.set_state(harness.state, handler_time)
            if harness.state == "failed":
                self.instance.reason = "Failed"
        else:
            self.set_state("timeout", handler_time)
            self.instance.reason = "Timeout"
            self.add_missing_testscases(harness)

        self.record(harness)
606
607
class DeviceHandler(Handler):
    """Runs a test on real hardware.

    A free device (DUT) is reserved from self.suite.duts, the image is
    flashed, and the serial output of the device is fed to the harness by
    a monitor thread.
    """

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        @param type_str Passed on to Handler
        """
        super().__init__(instance, type_str)

        self.suite = None
        # Read by monitor_serial(); give it a default so the monitor thread
        # does not depend on the creator assigning it.
        self.coverage = False

    def monitor_serial(self, ser, halt_fileno, harness):
        """Feed lines read from ser to the harness until told to stop.

        @param ser Open serial port object
        @param halt_fileno File descriptor that becomes readable when the
                           monitor must stop
        @param harness Harness consuming the output
        """
        if harness.is_pytest:
            harness.handle(None)
            return

        with open(self.log, "wt") as log_out_fp:
            ser_fileno = ser.fileno()
            readlist = [halt_fileno, ser_fileno]

            if self.coverage:
                # Set capture_coverage to True to indicate that right after
                # test results we should get coverage data, otherwise we exit
                # from the test.
                harness.capture_coverage = True

            ser.flush()

            while ser.isOpen():
                readable, _, _ = select.select(readlist, [], [], self.timeout)

                if halt_fileno in readable:
                    logger.debug('halted')
                    ser.close()
                    break
                if ser_fileno not in readable:
                    continue  # Timeout.

                serial_line = None
                try:
                    serial_line = ser.readline()
                except TypeError:
                    pass
                except serial.SerialException:
                    ser.close()
                    break

                # Just because ser_fileno has data doesn't mean an entire line
                # is available yet.
                if serial_line:
                    sl = serial_line.decode('utf-8', 'ignore').lstrip()
                    logger.debug("DEVICE: {0}".format(sl.rstrip()))

                    log_out_fp.write(sl)
                    log_out_fp.flush()
                    harness.handle(sl.rstrip())

                if harness.state and not harness.capture_coverage:
                    ser.close()
                    break

    def device_is_available(self, instance):
        """Reserve and return a free DUT suitable for instance, else None.

        A DUT qualifies when its platform matches, it has a serial port or
        serial_pty, and it provides the fixture requested by the testcase
        (if any). Reserving marks it unavailable and bumps its counter.
        """
        device = instance.platform.name
        fixture = instance.testcase.harness_config.get("fixture")
        for d in self.suite.duts:
            if fixture and fixture not in d.fixtures:
                continue
            if d.platform != device or not (d.serial or d.serial_pty):
                continue
            d.lock.acquire()
            try:
                avail = False
                if d.available:
                    d.available = 0
                    d.counter += 1
                    avail = True
            finally:
                d.lock.release()
            if avail:
                return d

        return None

    def make_device_available(self, serial):
        """Mark the DUT(s) identified by serial as available again.

        @param serial The serial port or the serial_pty command the DUT is
                      configured with
        """
        for d in self.suite.duts:
            # Compare against both identifiers. Testing the truthiness of
            # d.serial_pty instead would release every pty-backed DUT,
            # including ones still in use by other tests.
            if serial in (d.serial, d.serial_pty):
                d.available = 1

    @staticmethod
    def run_custom_script(script, timeout):
        """Run script, logging its stdout; kill it after timeout seconds."""
        with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            try:
                stdout, _ = proc.communicate(timeout=timeout)
                logger.debug(stdout.decode())

            except subprocess.TimeoutExpired:
                proc.kill()
                proc.communicate()
                logger.error("{} timed out".format(script))

    @staticmethod
    def _stop_serial_pty(serial_pty, ser_pty_process, pty_fds):
        """Terminate the serial_pty helper process and close our pty ends."""
        if ser_pty_process:
            ser_pty_process.terminate()
            outs, errs = ser_pty_process.communicate()
            logger.debug("Process {} terminated outs: {} errs {}".format(serial_pty, outs, errs))
        for fd in pty_fds:
            try:
                os.close(fd)
            except OSError:
                pass

    def _build_flash_command(self, hardware, runner):
        """Return the argument list used to flash the image onto hardware."""
        if self.suite.west_flash is None and not runner:
            return [self.generator_cmd, "-C", self.build_dir, "flash"]

        command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
        command_extra_args = []

        # There are three ways this option is used.
        # 1) bare: --west-flash
        #    This results in options.west_flash == []
        # 2) with a value: --west-flash="--board-id=42"
        #    This results in options.west_flash == "--board-id=42"
        # 3) Multiple values: --west-flash="--board-id=42,--erase"
        #    This results in options.west_flash == "--board-id=42 --erase"
        if self.suite.west_flash:
            command_extra_args.extend(self.suite.west_flash.split(','))

        if runner:
            command.append("--runner")
            command.append(runner)

            board_id = hardware.probe_id or hardware.id
            product = hardware.product
            if board_id is not None:
                if runner == "pyocd":
                    command_extra_args.append("--board-id")
                    command_extra_args.append(board_id)
                elif runner == "nrfjprog":
                    command_extra_args.append("--snr")
                    command_extra_args.append(board_id)
                elif runner == "openocd" and product in ("STM32 STLink", "STLINK-V3"):
                    command_extra_args.append("--cmd-pre-init")
                    command_extra_args.append("hla_serial %s" % (board_id))
                elif runner == "openocd" and product == "EDBG CMSIS-DAP":
                    command_extra_args.append("--cmd-pre-init")
                    command_extra_args.append("cmsis_dap_serial %s" % (board_id))
                elif runner == "jlink":
                    command.append("--tool-opt=-SelectEmuBySN  %s" % (board_id))
                elif runner == "stm32cubeprogrammer":
                    command.append("--tool-opt=sn=%s" % (board_id))

        if command_extra_args:
            command.append('--')
            command.extend(command_extra_args)
        return command

    def handle(self):
        """Reserve a device, flash it, monitor its output and set the state.

        Blocks until a matching device is free. The device is released
        again on every path that leaves this method normally.
        """
        out_state = "failed"

        hardware = self.device_is_available(self.instance)
        while not hardware:
            logger.debug("Waiting for device {} to become available".format(self.instance.platform.name))
            time.sleep(1)
            hardware = self.device_is_available(self.instance)

        runner = hardware.runner or self.suite.west_runner
        serial_pty = hardware.serial_pty
        # Identifier the DUT is configured with; used to release it again.
        # (The tty name of a pty is not known to the DUT list.)
        device_key = serial_pty or hardware.serial

        ser_pty_process = None
        pty_fds = ()
        if serial_pty:
            master, slave = pty.openpty()
            pty_fds = (master, slave)
            try:
                ser_pty_process = subprocess.Popen(re.split(',| ', serial_pty), stdout=master, stdin=master, stderr=master)
            except (OSError, subprocess.SubprocessError) as error:
                # Popen reports a failed launch as OSError; without this
                # the DUT would stay reserved forever.
                logger.error("Failed to run subprocess {}, error {}".format(serial_pty, error))
                self.set_state("failed", 0)
                self.instance.reason = "Failed"
                self._stop_serial_pty(serial_pty, None, pty_fds)
                self.make_device_available(device_key)
                return

            serial_device = os.ttyname(slave)
        else:
            serial_device = hardware.serial

        logger.debug("Using serial device {}".format(serial_device))

        command = self._build_flash_command(hardware, runner)

        pre_script = hardware.pre_script
        post_flash_script = hardware.post_flash_script
        post_script = hardware.post_script

        if pre_script:
            self.run_custom_script(pre_script, 30)

        try:
            ser = serial.Serial(
                serial_device,
                baudrate=115200,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                bytesize=serial.EIGHTBITS,
                timeout=self.timeout
            )
        except serial.SerialException as e:
            self.set_state("failed", 0)
            self.instance.reason = "Failed"
            logger.error("Serial device error: %s" % (str(e)))

            self._stop_serial_pty(serial_pty, ser_pty_process, pty_fds)
            self.make_device_available(device_key)
            return

        ser.flush()

        harness_name = self.instance.testcase.harness.capitalize()
        harness_import = HarnessImporter(harness_name)
        harness = harness_import.instance
        harness.configure(self.instance)
        # Writing to write_pipe tells the monitor thread to stop.
        read_pipe, write_pipe = os.pipe()
        start_time = time.time()

        t = threading.Thread(target=self.monitor_serial, daemon=True,
                             args=(ser, read_pipe, harness))
        t.start()

        d_log = "{}/device.log".format(self.instance.build_dir)
        logger.debug('Flash command: %s', command)
        try:
            stdout = stderr = None
            with subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
                try:
                    (stdout, stderr) = proc.communicate(timeout=30)
                    logger.debug(stdout.decode())

                    if proc.returncode != 0:
                        self.instance.reason = "Device issue (Flash?)"
                        os.write(write_pipe, b'x')  # halt the thread
                        out_state = "flash_error"
                except subprocess.TimeoutExpired:
                    proc.kill()
                    (stdout, stderr) = proc.communicate()
                    self.instance.reason = "Device issue (Timeout)"

            # Written once for every outcome of the flash command.
            with open(d_log, "w") as dlog_fp:
                dlog_fp.write(stderr.decode())

        except (OSError, subprocess.SubprocessError) as error:
            # The flash tool could not be run at all (Popen raises OSError
            # for that); stop monitoring and report it as a flash error.
            logger.error("Failed to run flash command {}: {}".format(command, error))
            self.instance.reason = "Device issue (Flash?)"
            os.write(write_pipe, b'x')  # halt the thread
            out_state = "flash_error"

        if post_flash_script:
            self.run_custom_script(post_flash_script, 30)

        t.join(self.timeout)
        if t.is_alive():
            logger.debug("Timed out while monitoring serial output on {}".format(self.instance.platform.name))
            out_state = "timeout"

        if ser.isOpen():
            ser.close()

        self._stop_serial_pty(serial_pty, ser_pty_process, pty_fds)

        os.close(write_pipe)
        os.close(read_pipe)

        handler_time = time.time() - start_time

        if out_state in ["timeout", "flash_error"]:
            self.add_missing_testscases(harness)

            if out_state == "timeout":
                self.instance.reason = "Timeout"
            elif out_state == "flash_error":
                self.instance.reason = "Flash error"

        if harness.is_pytest:
            harness.pytest_run(self.log)
        self.instance.results = harness.tests

        # sometimes a test instance hasn't been executed successfully with an
        # empty dictionary results, in order to include it into final report,
        # so fill the results as BLOCK
        if self.instance.results == {}:
            for k in self.instance.testcase.cases:
                self.instance.results[k] = 'BLOCK'

        if harness.state:
            self.set_state(harness.state, handler_time)
            if harness.state == "failed":
                self.instance.reason = "Failed"
        else:
            self.set_state(out_state, handler_time)

        if post_script:
            self.run_custom_script(post_script, 30)

        self.make_device_available(device_key)
        self.record(harness)
905
906
class QEMUHandler(Handler):
    """Spawns a thread to monitor QEMU output from pipes

    We pass QEMU_PIPE to 'make run' and monitor the pipes for output.
    We need to do this as once qemu starts, it runs forever until killed.
    Test cases emit special messages to the console as they run, we check
    for these to collect whether the test passed or failed.
    """

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test instance
        @param type_str Handler type string, forwarded to the base class
        """

        super().__init__(instance, type_str)
        self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo")

        self.pid_fn = os.path.join(instance.build_dir, "qemu.pid")

        # The "ignore_qemu_crash" tag relaxes both the exit-code check and
        # the unexpected-EOF check.
        ignore_crash = "ignore_qemu_crash" in instance.testcase.tags
        self.ignore_qemu_crash = ignore_crash
        self.ignore_unexpected_eof = ignore_crash

    @staticmethod
    def _get_cpu_time(pid):
        """get process CPU time.

        The guest virtual time in QEMU icount mode isn't host time and
        it's maintained by counting guest instructions, so we use QEMU
        process execution time to mostly simulate the time of guest OS.

        @param pid Process id of the QEMU process
        @return user + system CPU time of the process, as reported by psutil
        """
        proc = psutil.Process(pid)
        cpu_time = proc.cpu_times()
        return cpu_time.user + cpu_time.system

    @staticmethod
    def _read_pid(pid_fn):
        """Read the process id stored in a pid file.

        @param pid_fn Path of the pid file
        @return The file content converted to int
        """
        # Use a context manager so the file descriptor is released right away
        with open(pid_fn) as pid_fp:
            return int(pid_fp.read())

    @staticmethod
    def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn, results, harness,
                ignore_unexpected_eof=False):
        """Monitor the QEMU console through a pair of FIFOs.

        Creates <fifo_fn>.in and <fifo_fn>.out, reads the console output
        one byte at a time, writes every complete line to the log file and
        feeds it to the harness, then records the final state on the handler.

        @param handler Handler on which the result is recorded
            (record(), set_state(), instance.reason)
        @param timeout Time budget in seconds
        @param outdir Not used by this function
        @param logfile Path of the file that receives the console output
        @param fifo_fn Base path of the FIFOs (".in"/".out" get appended)
        @param pid_fn Path of the file holding the QEMU process id
        @param results Not used by this function
        @param harness Harness that parses each output line
        @param ignore_unexpected_eof If true, EOF on the console does not
            mark the run as "unexpected eof"
        """
        fifo_in = fifo_fn + ".in"
        fifo_out = fifo_fn + ".out"

        # These in/out nodes are named from QEMU's perspective, not ours
        if os.path.exists(fifo_in):
            os.unlink(fifo_in)
        os.mkfifo(fifo_in)
        if os.path.exists(fifo_out):
            os.unlink(fifo_out)
        os.mkfifo(fifo_out)

        # We don't do anything with out_fp but we need to open it for
        # writing so that QEMU doesn't block, due to the way pipes work
        out_fp = open(fifo_in, "wb")
        # Disable internal buffering, we don't
        # want read() or poll() to ever block if there is data in there
        in_fp = open(fifo_out, "rb", buffering=0)
        log_out_fp = open(logfile, "wt")

        try:
            start_time = time.time()
            timeout_time = start_time + timeout
            p = select.poll()
            p.register(in_fp, select.POLLIN)
            out_state = None

            line = ""
            timeout_extended = False

            pid = 0
            if os.path.exists(pid_fn):
                pid = QEMUHandler._read_pid(pid_fn)

            while True:
                this_timeout = int((timeout_time - time.time()) * 1000)
                if this_timeout < 0 or not p.poll(this_timeout):
                    try:
                        if pid and this_timeout > 0:
                            # there's possibility we polled nothing because
                            # of not enough CPU time scheduled by host for
                            # QEMU process during p.poll(this_timeout)
                            cpu_time = QEMUHandler._get_cpu_time(pid)
                            if cpu_time < timeout and not out_state:
                                timeout_time = time.time() + (timeout - cpu_time)
                                continue
                    except (ProcessLookupError, psutil.NoSuchProcess):
                        # psutil reports a vanished process with its own
                        # NoSuchProcess exception, which is not a
                        # ProcessLookupError, so both must be caught here.
                        out_state = "failed"
                        break

                    if not out_state:
                        out_state = "timeout"
                    break

                if pid == 0 and os.path.exists(pid_fn):
                    pid = QEMUHandler._read_pid(pid_fn)

                if harness.is_pytest:
                    harness.handle(None)
                    out_state = harness.state
                    break

                try:
                    # NOTE: a single byte is decoded at a time, so the first
                    # byte of any multi-byte UTF-8 sequence also ends up in
                    # the "unexpected byte" branch below.
                    c = in_fp.read(1).decode("utf-8")
                except UnicodeDecodeError:
                    # Test is writing something weird, fail
                    out_state = "unexpected byte"
                    break

                if c == "":
                    # EOF, this shouldn't happen unless QEMU crashes
                    if not ignore_unexpected_eof:
                        out_state = "unexpected eof"
                    break
                line = line + c
                if c != "\n":
                    continue

                # line contains a full line of data output from QEMU
                log_out_fp.write(line)
                log_out_fp.flush()
                line = line.strip()
                logger.debug(f"QEMU ({pid}): {line}")

                harness.handle(line)
                if harness.state:
                    # if we have registered a fail make sure the state is not
                    # overridden by a false success message coming from the
                    # testsuite
                    if out_state not in ['failed', 'unexpected eof', 'unexpected byte']:
                        out_state = harness.state

                    # if we get some state, that means test is doing well, we reset
                    # the timeout and wait for 2 more seconds to catch anything
                    # printed late. We wait much longer if code
                    # coverage is enabled since dumping this information can
                    # take some time.
                    if not timeout_extended or harness.capture_coverage:
                        timeout_extended = True
                        if harness.capture_coverage:
                            timeout_time = time.time() + 30
                        else:
                            timeout_time = time.time() + 2
                line = ""

            if harness.is_pytest:
                harness.pytest_run(logfile)
                out_state = harness.state

            handler.record(harness)

            handler_time = time.time() - start_time
            logger.debug(f"QEMU ({pid}) complete ({out_state}) after {handler_time} seconds")

            if out_state == "timeout":
                handler.instance.reason = "Timeout"
                handler.set_state("failed", handler_time)
            elif out_state == "failed":
                handler.instance.reason = "Failed"
                handler.set_state("failed", handler_time)
            elif out_state in ['unexpected eof', 'unexpected byte']:
                handler.instance.reason = out_state
                handler.set_state("failed", handler_time)
            else:
                handler.set_state(out_state, handler_time)
        finally:
            # Release the log file and both FIFO ends even when the
            # monitoring loop or the harness raised.
            log_out_fp.close()
            out_fp.close()
            in_fp.close()

        if pid:
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                # Oh well, as long as it's dead! User probably sent Ctrl-C
                pass

        os.unlink(fifo_in)
        os.unlink(fifo_out)

    def handle(self):
        """Run the test under QEMU and record the outcome.

        Starts the monitor thread (see _thread()), launches the build
        system's "run" target in the build directory, waits for it to
        finish or time out, and marks the instance as failed when the exit
        code is non-zero (unless crashes are ignored) or the harness
        reported no state.
        """
        self.results = {}
        self.run = True

        # We pass this to QEMU which looks for fifos with .in and .out
        # suffixes.

        self.fifo_fn = os.path.join(self.instance.build_dir, "qemu-fifo")
        self.pid_fn = os.path.join(self.instance.build_dir, "qemu.pid")

        # Drop a stale pid file so the monitor thread does not pick it up
        if os.path.exists(self.pid_fn):
            os.unlink(self.pid_fn)

        self.log_fn = self.log

        harness_import = HarnessImporter(self.instance.testcase.harness.capitalize())
        harness = harness_import.instance
        harness.configure(self.instance)

        self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread,
                                       args=(self, self.timeout, self.build_dir,
                                             self.log_fn, self.fifo_fn,
                                             self.pid_fn, self.results, harness,
                                             self.ignore_unexpected_eof))

        self.instance.results = harness.tests
        self.thread.daemon = True
        logger.debug("Spawning QEMUHandler Thread for %s" % self.name)
        self.thread.start()
        if sys.stdout.isatty():
            subprocess.call(["stty", "sane"])

        logger.debug("Running %s (%s)" % (self.name, self.type_str))
        command = [self.generator_cmd]
        command += ["-C", self.build_dir, "run"]

        is_timeout = False
        qemu_pid = None

        with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.build_dir) as proc:
            logger.debug("Spawning QEMUHandler Thread for %s" % self.name)

            try:
                proc.wait(self.timeout)
            except subprocess.TimeoutExpired:
                # sometimes QEMU can't handle SIGTERM signal correctly
                # in that case kill -9 QEMU process directly and leave
                # twister to judge testing result by console output

                is_timeout = True
                self.terminate(proc)
                if harness.state == "passed":
                    self.returncode = 0
                else:
                    self.returncode = proc.returncode
            else:
                if os.path.exists(self.pid_fn):
                    qemu_pid = QEMUHandler._read_pid(self.pid_fn)
                logger.debug(f"No timeout, return code from QEMU ({qemu_pid}): {proc.returncode}")
                self.returncode = proc.returncode
            # Need to wait for harness to finish processing
            # output from QEMU. Otherwise it might miss some
            # error messages.
            # NOTE(review): join(0) returns immediately instead of waiting
            # for the monitor thread - confirm the non-blocking check is
            # what is intended here.
            self.thread.join(0)
            if self.thread.is_alive():
                logger.debug("Timed out while monitoring QEMU output")

            if os.path.exists(self.pid_fn):
                qemu_pid = QEMUHandler._read_pid(self.pid_fn)
                os.unlink(self.pid_fn)

        logger.debug(f"return code from QEMU ({qemu_pid}): {self.returncode}")

        if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness.state:
            self.set_state("failed", 0)
            if is_timeout:
                self.instance.reason = "Timeout"
            else:
                self.instance.reason = "Exited with {}".format(self.returncode)
            self.add_missing_testscases(harness)

    def get_fifo(self):
        """Return the base path of the QEMU FIFOs (without .in/.out suffix)."""
        return self.fifo_fn
1170
1171
class SizeCalculator:
    """Compute approximate RAM/ROM usage of an ELF binary from its sections.

    Section sizes are read from "objdump -h"; each section name is looked
    up in the lists below to decide whether it counts towards RAM, ROM or
    both.
    """

    # Sections that only count towards RAM
    alloc_sections = [
        "bss",
        "noinit",
        "app_bss",
        "app_noinit",
        "ccm_bss",
        "ccm_noinit"
    ]

    # Sections that count towards both RAM and ROM
    rw_sections = [
        "datas",
        "initlevel",
        "exceptions",
        "initshell",
        "_static_thread_data_area",
        "k_timer_area",
        "k_mem_slab_area",
        "k_mem_pool_area",
        "sw_isr_table",
        "k_sem_area",
        "k_mutex_area",
        "app_shmem_regions",
        "_k_fifo_area",
        "_k_lifo_area",
        "k_stack_area",
        "k_msgq_area",
        "k_mbox_area",
        "k_pipe_area",
        "net_if_area",
        "net_if_dev_area",
        "net_l2_area",
        "net_l2_data",
        "k_queue_area",
        "_net_buf_pool_area",
        "app_datas",
        "kobject_data",
        "mmu_tables",
        "app_pad",
        "priv_stacks",
        "ccm_data",
        "usb_descriptor",
        "usb_data", "usb_bos_desc",
        "uart_mux",
        'log_backends_sections',
        'log_dynamic_sections',
        'log_const_sections',
        "app_smem",
        'shell_root_cmds_sections',
        "font_entry_sections",
        "priv_stacks_noinit",
        "_GCOV_BSS_SECTION_NAME",
        "gcov",
        "nocache",
        "devices",
        "k_heap_area",
    ]

    # These get copied into RAM only on non-XIP
    # NOTE: "sw_isr_table" is also listed in rw_sections, which is checked
    # first in _calculate_sizes(), so the entry below never takes effect.
    ro_sections = [
        "rom_start",
        "text",
        "ctors",
        "init_array",
        "reset",
        "z_object_assignment_area",
        "rodata",
        "net_l2",
        "vector",
        "sw_isr_table",
        "settings_handler_static_area",
        "bt_l2cap_fixed_chan_area",
        "bt_l2cap_br_fixed_chan_area",
        "bt_gatt_service_static_area",
        "vectors",
        "net_socket_register_area",
        "net_ppp_proto",
        "shell_area",
        "tracing_backend_area",
        "ppp_protocol_handler_area",
    ]

    def __init__(self, filename, extra_sections):
        """Constructor

        Prints a message and exits the process with status 2 when the file
        is not an ELF binary or carries no symbol information.

        @param filename Path to the output binary
            The <filename> is parsed by objdump to determine section sizes
        @param extra_sections Section names that are treated as recognized
            even though they are in none of the lists above (they are not
            added to the RAM/ROM totals)
        """
        # Make sure this is an ELF binary
        with open(filename, "rb") as f:
            magic = f.read(4)

        try:
            if magic != b'\x7fELF':
                raise TwisterRuntimeError("%s is not an ELF binary" % filename)
        except Exception as e:
            print(str(e))
            sys.exit(2)

        # Search for CONFIG_XIP in the ELF's list of symbols using NM and AWK.
        # GREP can not be used as it returns an error if the symbol is not
        # found.
        # The file name is quoted so that paths containing spaces or shell
        # metacharacters are passed to nm as a single, literal argument.
        is_xip_command = "nm " + shlex.quote(filename) + \
                         " | awk '/CONFIG_XIP/ { print $3 }'"
        is_xip_output = subprocess.check_output(
            is_xip_command, shell=True, stderr=subprocess.STDOUT).decode(
            "utf-8").strip()
        try:
            if is_xip_output.endswith("no symbols"):
                raise TwisterRuntimeError("%s has no symbol information" % filename)
        except Exception as e:
            print(str(e))
            sys.exit(2)

        # Any output at all means a CONFIG_XIP symbol was found
        self.is_xip = (len(is_xip_output) != 0)

        self.filename = filename
        self.sections = []
        self.rom_size = 0
        self.ram_size = 0
        self.extra_sections = extra_sections

        self._calculate_sizes()

    def get_ram_size(self):
        """Get the amount of RAM the application will use up on the device

        @return amount of RAM, in bytes
        """
        return self.ram_size

    def get_rom_size(self):
        """Get the size of the data that this application uses on device's flash

        @return amount of ROM, in bytes
        """
        return self.rom_size

    def unrecognized_sections(self):
        """Get a list of sections inside the binary that weren't recognized

        @return list of unrecognized section names
        """
        return [v["name"] for v in self.sections if not v["recognized"]]

    def _calculate_sizes(self):
        """ Calculate RAM and ROM usage by section """
        # An argument list (no shell) keeps the file name a single argument
        # regardless of the characters it contains.
        objdump_output = subprocess.check_output(
            ["objdump", "-h", self.filename]).decode("utf-8").splitlines()

        for line in objdump_output:
            words = line.split()

            # A section row has at least: index, name, size, VMA, LMA.
            # Skip anything shorter or not starting with a digit.
            if len(words) < 5 or not words[0][0].isdigit():
                continue

            name = words[1]  # Skip lines with section names
            if name[0] == '.':  # starting with '.'
                continue

            # TODO this doesn't actually reflect the size in flash or RAM as
            # it doesn't include linker-imposed padding between sections.
            # It is close though.
            size = int(words[2], 16)
            if size == 0:
                continue

            load_addr = int(words[4], 16)
            virt_addr = int(words[3], 16)

            # Add section to memory use totals (for both non-XIP and XIP scenarios)
            # Unrecognized section names are not included in the calculations.
            recognized = True
            if name in SizeCalculator.alloc_sections:
                self.ram_size += size
                stype = "alloc"
            elif name in SizeCalculator.rw_sections:
                self.ram_size += size
                self.rom_size += size
                stype = "rw"
            elif name in SizeCalculator.ro_sections:
                self.rom_size += size
                if not self.is_xip:
                    self.ram_size += size
                stype = "ro"
            else:
                stype = "unknown"
                if name not in self.extra_sections:
                    recognized = False

            self.sections.append({"name": name, "load_addr": load_addr,
                                  "size": size, "virt_addr": virt_addr,
                                  "type": stype, "recognized": recognized})
1375
1376
1377
class TwisterConfigParser:
    """Class to read test case files with semantic checking
    """

    def __init__(self, filename, schema):
        """Instantiate a new TwisterConfigParser object

        @param filename Source .yaml file to read
        @param schema Schema passed to the YAML loader for verification
        """
        self.data = {}
        self.schema = schema
        self.filename = filename
        self.tests = {}
        self.common = {}

    def load(self):
        """Load and verify the YAML file, then pick up its "tests" and
        "common" sections when they are present."""
        self.data = scl.yaml_load_verify(self.filename, self.schema)

        if 'tests' in self.data:
            self.tests = self.data['tests']
        if 'common' in self.data:
            self.common = self.data['common']

    def _cast_value(self, value, typestr):
        """Convert a raw YAML value to the type named by typestr.

        @param value Raw value from the YAML data
        @param typestr One of "str", "float", "int", "bool", "list",
            "list:<type>", "set", "set:<type>" or "map"
        @return The converted value
        @raise ValueError if the value cannot be converted to the type
        @raise ConfigurationError if the type/value combination is not
            supported
        """
        if typestr == "str":
            # Only strings can be stripped; anything else is a bad value
            # rather than an internal error.
            if not isinstance(value, str):
                raise ValueError(
                    f"expected a string, got {type(value).__name__}")
            return value.strip()

        elif typestr == "float":
            return float(value)

        elif typestr == "int":
            return int(value)

        elif typestr == "bool":
            return value

        elif typestr.startswith("list") and isinstance(value, list):
            return value
        elif typestr.startswith("list") and isinstance(value, str):
            vs = value.strip().split()
            if len(typestr) > 4 and typestr[4] == ":":
                return [self._cast_value(vsi, typestr[5:]) for vsi in vs]
            else:
                return vs

        elif typestr.startswith("set"):
            if isinstance(value, str):
                vs = value.strip().split()
            elif isinstance(value, (list, tuple, set)):
                # Already a collection, nothing to split
                vs = list(value)
            else:
                raise ValueError(
                    f"expected a string or a list, got {type(value).__name__}")
            if len(typestr) > 3 and typestr[3] == ":":
                return {self._cast_value(vsi, typestr[4:]) for vsi in vs}
            else:
                return set(vs)

        elif typestr.startswith("map"):
            return value
        else:
            # Report the offending type name, not the value
            raise ConfigurationError(
                self.filename, "unknown type '%s'" % typestr)

    def get_test(self, name, valid_keys):
        """Get a dictionary representing the keys/values within a test

        @param name The test in the .yaml file to retrieve data from
        @param valid_keys A dictionary representing the intended semantics
            for this test. Each key in this dictionary is a key that could
            be specified. Keys found in the .yaml file that are not listed
            here are passed through unchanged by this method. Each value in
            this dictionary is another dictionary containing metadata:

                "default" - Default value if not given
                "type" - Data type to convert the text value to. Simple types
                    supported are "str", "float", "int", "bool" which will get
                    converted to respective Python data types. "set" and "list"
                    may also be specified which will split the value by
                    whitespace (but keep the elements as strings). finally,
                    "list:<type>" and "set:<type>" may be given which will
                    perform a type conversion after splitting the value up.
                "required" - If true, raise an error if not defined. If false
                    and "default" isn't specified, a type conversion will be
                    done on an empty string
        @return A dictionary containing the test key-value pairs with
            type conversion and default values filled in per valid_keys
        @raise ConfigurationError if a required key is missing or a value
            cannot be converted to its declared type
        """

        # Start from the "common" section, then merge the per-test values
        d = dict(self.common)

        for k, v in self.tests[name].items():
            if k in d:
                # NOTE(review): when the common value is not a string, the
                # per-test value is dropped here - confirm this is intended.
                if isinstance(d[k], str):
                    # By default, we just concatenate string values of keys
                    # which appear both in "common" and per-test sections,
                    # but some keys are handled in adhoc way based on their
                    # semantics.
                    if k == "filter":
                        d[k] = "(%s) and (%s)" % (d[k], v)
                    else:
                        d[k] += " " + v
            else:
                d[k] = v

        for k, kinfo in valid_keys.items():
            if k not in d:
                if kinfo.get("required", False):
                    raise ConfigurationError(
                        self.filename,
                        "missing required value for '%s' in test '%s'" %
                        (k, name))

                if "default" in kinfo:
                    d[k] = kinfo["default"]
                else:
                    d[k] = self._cast_value("", kinfo["type"])
            else:
                try:
                    d[k] = self._cast_value(d[k], kinfo["type"])
                except ValueError as err:
                    raise ConfigurationError(
                        self.filename, "bad %s value '%s' for key '%s' in name '%s'" %
                                       (kinfo["type"], d[k], k, name)) from err

        return d
1508
1509
class Platform:
    """Metadata describing a single platform.

    Maps directly to BOARD when building"""

    platform_schema = scl.yaml_load(os.path.join(ZEPHYR_BASE,
                                                 "scripts", "schemas", "twister", "platform-schema.yaml"))

    def __init__(self):
        """Create a platform populated with default values only."""

        # Identity
        self.name = ""
        self.arch = ""
        self.type = "na"
        self.simulation = "na"

        # Memory sizes used when the board does not specify them:
        # 128K of RAM and 512K of flash
        self.ram = 128
        self.flash = 512

        # Test selection
        self.twister = True
        self.default = False
        self.ignore_tags = []
        self.only_tags = []
        self.supported = set()
        self.supported_toolchains = []

        # Environment requirements
        self.env = []
        self.env_satisfied = True
        self.filter_data = {}

    def load(self, platform_file):
        """Populate this platform from a platform YAML file.

        @param platform_file Path of the YAML file, verified against
            platform_schema while loading
        """
        parser = TwisterConfigParser(platform_file, self.platform_schema)
        parser.load()
        cfg = parser.data

        self.name = cfg['identifier']
        self.twister = cfg.get("twister", True)
        # if no RAM size is specified by the board, take a default of 128K
        self.ram = cfg.get("ram", 128)

        testing_cfg = cfg.get("testing", {})
        self.ignore_tags = testing_cfg.get("ignore_tags", [])
        self.only_tags = testing_cfg.get("only_tags", [])
        self.default = testing_cfg.get("default", False)

        # if no flash size is specified by the board, take a default of 512K
        self.flash = cfg.get("flash", 512)

        # Every ":"-separated component of a feature is recorded on its own
        self.supported = {
            part
            for feature in cfg.get("supported", [])
            for part in feature.split(":")
        }

        self.arch = cfg['arch']
        self.type = cfg.get('type', "na")
        self.simulation = cfg.get('simulation', "na")
        self.supported_toolchains = cfg.get("toolchain", [])

        # All listed environment variables must be set to a non-empty value
        self.env = cfg.get("env", [])
        self.env_satisfied = all(
            os.environ.get(variable, None) for variable in self.env)

    def __repr__(self):
        return "<%s on %s>" % (self.name, self.arch)
1575
1576
class DisablePyTestCollectionMixin:
    # Marker attribute set to False on every class deriving from this mixin
    __test__ = False
1579
1580
1581class TestCase(DisablePyTestCollectionMixin):
1582    """Class representing a test application
1583    """
1584
1585    def __init__(self, testcase_root, workdir, name):
1586        """TestCase constructor.
1587
1588        This gets called by TestSuite as it finds and reads test yaml files.
1589        Multiple TestCase instances may be generated from a single testcase.yaml,
1590        each one corresponds to an entry within that file.
1591
1592        We need to have a unique name for every single test case. Since
1593        a testcase.yaml can define multiple tests, the canonical name for
1594        the test case is <workdir>/<name>.
1595
1596        @param testcase_root os.path.abspath() of one of the --testcase-root
1597        @param workdir Sub-directory of testcase_root where the
1598            .yaml test configuration file was found
1599        @param name Name of this test case, corresponding to the entry name
1600            in the test case configuration file. For many test cases that just
1601            define one test, can be anything and is usually "test". This is
1602            really only used to distinguish between different cases when
1603            the testcase.yaml defines multiple tests
1604        """
1605
1606
1607        self.source_dir = ""
1608        self.yamlfile = ""
1609        self.cases = []
1610        self.name = self.get_unique(testcase_root, workdir, name)
1611        self.id = name
1612
1613        self.type = None
1614        self.tags = set()
1615        self.extra_args = None
1616        self.extra_configs = None
1617        self.arch_allow = None
1618        self.arch_exclude = None
1619        self.skip = False
1620        self.platform_exclude = None
1621        self.platform_allow = None
1622        self.toolchain_exclude = None
1623        self.toolchain_allow = None
1624        self.tc_filter = None
1625        self.timeout = 60
1626        self.harness = ""
1627        self.harness_config = {}
1628        self.build_only = True
1629        self.build_on_all = False
1630        self.slow = False
1631        self.min_ram = -1
1632        self.depends_on = None
1633        self.min_flash = -1
1634        self.extra_sections = None
1635        self.integration_platforms = []
1636
1637    @staticmethod
1638    def get_unique(testcase_root, workdir, name):
1639
1640        canonical_testcase_root = os.path.realpath(testcase_root)
1641        if Path(canonical_zephyr_base) in Path(canonical_testcase_root).parents:
1642            # This is in ZEPHYR_BASE, so include path in name for uniqueness
1643            # FIXME: We should not depend on path of test for unique names.
1644            relative_tc_root = os.path.relpath(canonical_testcase_root,
1645                                               start=canonical_zephyr_base)
1646        else:
1647            relative_tc_root = ""
1648
1649        # workdir can be "."
1650        unique = os.path.normpath(os.path.join(relative_tc_root, workdir, name))
1651        check = name.split(".")
1652        if len(check) < 2:
1653            raise TwisterException(f"""bad test name '{name}' in {testcase_root}/{workdir}. \
1654Tests should reference the category and subsystem with a dot as a separator.
1655                    """
1656                    )
1657        return unique
1658
1659    @staticmethod
1660    def scan_file(inf_name):
1661        suite_regex = re.compile(
1662            # do not match until end-of-line, otherwise we won't allow
1663            # stc_regex below to catch the ones that are declared in the same
1664            # line--as we only search starting the end of this match
1665            br"^\s*ztest_test_suite\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,",
1666            re.MULTILINE)
1667        stc_regex = re.compile(
1668            br"^\s*"  # empy space at the beginning is ok
1669            # catch the case where it is declared in the same sentence, e.g:
1670            #
1671            # ztest_test_suite(mutex_complex, ztest_user_unit_test(TESTNAME));
1672            br"(?:ztest_test_suite\([a-zA-Z0-9_]+,\s*)?"
1673            # Catch ztest[_user]_unit_test-[_setup_teardown](TESTNAME)
1674            br"ztest_(?:1cpu_)?(?:user_)?unit_test(?:_setup_teardown)?"
1675            # Consume the argument that becomes the extra testcse
1676            br"\(\s*"
1677            br"(?P<stc_name>[a-zA-Z0-9_]+)"
1678            # _setup_teardown() variant has two extra arguments that we ignore
1679            br"(?:\s*,\s*[a-zA-Z0-9_]+\s*,\s*[a-zA-Z0-9_]+)?"
1680            br"\s*\)",
1681            # We don't check how it finishes; we don't care
1682            re.MULTILINE)
1683        suite_run_regex = re.compile(
1684            br"^\s*ztest_run_test_suite\((?P<suite_name>[a-zA-Z0-9_]+)\)",
1685            re.MULTILINE)
1686        achtung_regex = re.compile(
1687            br"(#ifdef|#endif)",
1688            re.MULTILINE)
1689        warnings = None
1690
1691        with open(inf_name) as inf:
1692            if os.name == 'nt':
1693                mmap_args = {'fileno': inf.fileno(), 'length': 0, 'access': mmap.ACCESS_READ}
1694            else:
1695                mmap_args = {'fileno': inf.fileno(), 'length': 0, 'flags': mmap.MAP_PRIVATE, 'prot': mmap.PROT_READ,
1696                             'offset': 0}
1697
1698            with contextlib.closing(mmap.mmap(**mmap_args)) as main_c:
1699                suite_regex_match = suite_regex.search(main_c)
1700                if not suite_regex_match:
1701                    # can't find ztest_test_suite, maybe a client, because
1702                    # it includes ztest.h
1703                    return None, None
1704
1705                suite_run_match = suite_run_regex.search(main_c)
1706                if not suite_run_match:
1707                    raise ValueError("can't find ztest_run_test_suite")
1708
1709                achtung_matches = re.findall(
1710                    achtung_regex,
1711                    main_c[suite_regex_match.end():suite_run_match.start()])
1712                if achtung_matches:
1713                    warnings = "found invalid %s in ztest_test_suite()" \
1714                               % ", ".join(sorted({match.decode() for match in achtung_matches},reverse = True))
1715                _matches = re.findall(
1716                    stc_regex,
1717                    main_c[suite_regex_match.end():suite_run_match.start()])
1718                for match in _matches:
1719                    if not match.decode().startswith("test_"):
1720                        warnings = "Found a test that does not start with test_"
1721                matches = [match.decode().replace("test_", "", 1) for match in _matches]
1722                return matches, warnings
1723
1724    def scan_path(self, path):
1725        subcases = []
1726        for filename in glob.glob(os.path.join(path, "src", "*.c*")):
1727            try:
1728                _subcases, warnings = self.scan_file(filename)
1729                if warnings:
1730                    logger.error("%s: %s" % (filename, warnings))
1731                    raise TwisterRuntimeError("%s: %s" % (filename, warnings))
1732                if _subcases:
1733                    subcases += _subcases
1734            except ValueError as e:
1735                logger.error("%s: can't find: %s" % (filename, e))
1736
1737        for filename in glob.glob(os.path.join(path, "*.c")):
1738            try:
1739                _subcases, warnings = self.scan_file(filename)
1740                if warnings:
1741                    logger.error("%s: %s" % (filename, warnings))
1742                if _subcases:
1743                    subcases += _subcases
1744            except ValueError as e:
1745                logger.error("%s: can't find: %s" % (filename, e))
1746        return subcases
1747
1748    def parse_subcases(self, test_path):
1749        results = self.scan_path(test_path)
1750        for sub in results:
1751            name = "{}.{}".format(self.id, sub)
1752            self.cases.append(name)
1753
1754        if not results:
1755            self.cases.append(self.id)
1756
    def __str__(self):
        """Return the name of this object as its string representation."""
        return self.name
1759
1760
class TestInstance(DisablePyTestCollectionMixin):
    """Class representing the execution of a particular TestCase on a platform

    @param testcase The TestCase object we want to build/execute
    @param platform Platform object that we want to build and run against
    @param outdir Base directory for all test results. The actual
        out directory used is <outdir>/<platform>/<test case name>
    """

    def __init__(self, testcase, platform, outdir):

        self.testcase = testcase
        self.platform = platform

        # Overall outcome of this instance; None until something sets it.
        self.status = None
        self.reason = "Unknown"
        self.metrics = dict()
        self.handler = None
        self.outdir = outdir

        # Both the name and the build directory are <platform>/<test case>.
        self.name = os.path.join(platform.name, testcase.name)
        self.build_dir = os.path.join(outdir, platform.name, testcase.name)

        # Whether the instance is to be executed after the build.
        self.run = False

        # Per test case verdicts, keyed by test case name.
        self.results = {}

    def __getstate__(self):
        """Return a shallow copy of the instance dictionary."""
        d = self.__dict__.copy()
        return d

    def __setstate__(self, d):
        """Restore the instance attributes from the dictionary d."""
        self.__dict__.update(d)

    def __lt__(self, other):
        """Order instances by their <platform>/<test case> name."""
        return self.name < other.name


    @staticmethod
    def testcase_runnable(testcase, fixtures):
        """Tell whether the harness of a test case allows executing it.

        @param testcase Test case whose harness settings are inspected
        @param fixtures Collection of fixture names that are available
        @return True when the test case has no harness, or when its harness
            is 'console', 'ztest' or 'pytest' and the fixture it asks for
            (if any) is part of fixtures; False otherwise
        """
        # console harness allows us to run the test and capture data.
        if testcase.harness in [ 'console', 'ztest', 'pytest']:
            # if we have a fixture that is also being supplied on the
            # command-line, then we need to run the test, not just build it.
            fixture = testcase.harness_config.get('fixture')
            if fixture:
                return fixture in fixtures
            return True

        # Any other harness cannot be run; no harness at all can.
        return not testcase.harness


    # Global testsuite parameters
    def check_runnable(self, enable_slow=False, filter='buildable', fixtures=[]):
        """Tell whether this instance can be executed, not only built.

        @param enable_slow Allow test cases marked as slow to run
        @param filter When 'runnable', the target is considered ready even
            if it is neither native nor one of the known simulators
        @param fixtures Fixture names forwarded to testcase_runnable()
        @return True if both the test case and the target can be run
        """

        # right now we only support building on windows. running is still work
        # in progress.
        if os.name == 'nt':
            return False

        # we asked for build-only on the command line
        if self.testcase.build_only:
            return False

        # Do not run slow tests:
        skip_slow = self.testcase.slow and not enable_slow
        if skip_slow:
            return False

        target_ready = bool(
            self.testcase.type == "unit"
            or self.platform.type == "native"
            or self.platform.simulation in ["mdb-nsim", "nsim", "renode", "qemu", "tsim", "armfvp"]
            or filter == 'runnable')

        # Simulators that need an external executable to be installed.
        required_tool = {
            "nsim": "nsimdrv",
            "mdb-nsim": "mdb",
            "renode": "renode",
            "tsim": "tsim-leon3",
        }
        tool = required_tool.get(self.platform.simulation)
        if tool and not find_executable(tool):
            target_ready = False

        testcase_runnable = self.testcase_runnable(self.testcase, fixtures)

        return testcase_runnable and target_ready

    def create_overlay(self, platform, enable_asan=False, enable_ubsan=False, enable_coverage=False, coverage_platform=[]):
        """Write the extra Kconfig overlay for this instance, if any.

        The overlay is made of the extra_configs of the test case plus the
        coverage/ASAN/UBSAN options that apply to the platform. The file is
        only written when there is something to put in it.

        @param platform Platform the overlay is generated for
        @param enable_asan Add CONFIG_ASAN=y on native platforms
        @param enable_ubsan Add CONFIG_UBSAN=y on native platforms
        @param enable_coverage Add the coverage options when the platform
            name is listed in coverage_platform
        @param coverage_platform Names of the platforms coverage applies to
        @return The overlay content (an empty string if nothing was written)
        """
        # Create this in a "twister/" subdirectory otherwise this
        # will pass this overlay to kconfig.py *twice* and kconfig.cmake
        # will silently give that second time precedence over any
        # --extra-args=CONFIG_*
        subdir = os.path.join(self.build_dir, "twister")

        content = ""

        if self.testcase.extra_configs:
            content = "\n".join(self.testcase.extra_configs)

        if enable_coverage:
            if platform.name in coverage_platform:
                content = content + "\nCONFIG_COVERAGE=y"
                content = content + "\nCONFIG_COVERAGE_DUMP=y"

        if enable_asan:
            if platform.type == "native":
                content = content + "\nCONFIG_ASAN=y"

        if enable_ubsan:
            if platform.type == "native":
                content = content + "\nCONFIG_UBSAN=y"

        if content:
            os.makedirs(subdir, exist_ok=True)
            overlay_path = os.path.join(subdir, "testcase_extra.conf")
            with open(overlay_path, "w") as f:
                f.write(content)

        return content

    def calculate_sizes(self):
        """Get the RAM/ROM sizes of a test case.

        This can only be run after the instance has been executed by
        MakeGenerator, otherwise there won't be any binaries to measure.

        @return A SizeCalculator object
        @raise BuildError unless exactly one output binary is found
        """
        fns = glob.glob(os.path.join(self.build_dir, "zephyr", "*.elf"))
        fns.extend(glob.glob(os.path.join(self.build_dir, "zephyr", "*.exe")))
        # The intermediate *_prebuilt.elf is not the final image.
        fns = [x for x in fns if not x.endswith('_prebuilt.elf')]
        if len(fns) != 1:
            raise BuildError("Missing/multiple output ELF binary")

        return SizeCalculator(fns[0], self.testcase.extra_sections)

    def fill_results_by_status(self):
        """Fills results according to self.status

        The method is used to propagate the instance level status
        to the test cases inside. Useful when the whole instance is skipped
        and the info is required also at the test cases level for reporting.
        Should be used with caution, e.g. should not be used
        to fill all results with passes

        @raise KeyError if there are results to fill and self.status has no
            verdict associated with it
        """
        status_to_verdict = {
            'skipped': 'SKIP',
            'error': 'BLOCK',
            'failure': 'FAILED',
            # Instances that fail carry the status "failed"; 'failure' is
            # kept above for compatibility.
            'failed': 'FAILED'
        }

        for k in self.results:
            self.results[k] = status_to_verdict[self.status]

    def __repr__(self):
        return "<TestCase %s on %s>" % (self.testcase.name, self.platform.name)
1930
1931
class CMake():
    """Runs cmake to configure and build one test case for one platform.

    Output of the cmake invocations is appended to the file named by
    self.log inside the build directory. run_build() and run_cmake() record
    their outcome on self.instance, which is None until a subclass or the
    caller sets it.

    @param testcase Test case being built
    @param platform Platform object the test case is built for
    @param source_dir Directory holding the sources of the test case
    @param build_dir Directory the build is performed in
    """
    config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
    dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')

    def __init__(self, testcase, platform, source_dir, build_dir):

        self.cwd = None
        self.capture_output = True

        self.defconfig = {}
        self.cmake_cache = {}

        self.instance = None
        self.testcase = testcase
        self.platform = platform
        self.source_dir = source_dir
        self.build_dir = build_dir
        self.log = "build.log"
        self.generator = None
        self.generator_cmd = None

        # Read by run_cmake() and run_build(); subclasses may override them
        # after calling this constructor.
        self.warnings_as_errors = True
        self.overflow_as_errors = False

    @staticmethod
    def _find_cmake():
        """Return the path of the cmake executable or raise if it is missing."""
        cmake = shutil.which('cmake')
        if not cmake:
            msg = "Unable to find `cmake` in path"
            logger.error(msg)
            raise Exception(msg)
        return cmake

    def _popen_kwargs(self):
        """Return the Popen keyword arguments shared by the cmake invocations."""
        kwargs = dict()

        if self.capture_output:
            kwargs['stdout'] = subprocess.PIPE
            # CMake sends the output of message() to stderr unless it's STATUS
            kwargs['stderr'] = subprocess.STDOUT

        if self.cwd:
            kwargs['cwd'] = self.cwd

        return kwargs

    def _append_to_log(self, out):
        """Decode the captured output, append it to the log and return it."""
        log_msg = out.decode(sys.getdefaultencoding())
        with open(os.path.join(self.build_dir, self.log), "a") as log:
            log.write(log_msg)
        return log_msg

    def parse_generated(self):
        """Reset the parsed configuration; nothing is filtered at this level.

        @return An empty dictionary
        """
        self.defconfig = {}
        return {}

    def run_build(self, args=[]):
        """Run cmake with the given arguments to build the test case.

        The outcome is recorded on self.instance: "passed" on success,
        "skipped" when the build failed because a memory region overflowed
        (unless overflow_as_errors is set), "error" for any other failure.

        @param args Arguments passed verbatim to cmake
        @return A dictionary with at least "returncode" and "instance", or
            None when the build succeeded without producing any output
        @raise Exception if cmake cannot be found in the path
        """

        logger.debug("Building %s for %s" % (self.source_dir, self.platform.name))

        cmd = [self._find_cmake()] + list(args)

        p = subprocess.Popen(cmd, **self._popen_kwargs())
        out, _ = p.communicate()

        if p.returncode == 0:
            msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)

            self.instance.status = "passed"
            if not out:
                # NOTE(review): a successful build without captured output
                # is reported as None, which callers in this file treat as a
                # build failure -- kept as is, confirm this is intended.
                return None

            self._append_to_log(out)
            return {'msg': msg, "returncode": p.returncode, "instance": self.instance}

        # A real error occurred
        log_msg = self._append_to_log(out) if out else ""

        res = re.findall("region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM)' overflowed by", log_msg)
        if res and not self.overflow_as_errors:
            logger.debug("Test skipped due to {} Overflow".format(res[0]))
            self.instance.status = "skipped"
            self.instance.reason = "{} overflow".format(res[0])
        else:
            # Also reached when nothing was captured, so that a failed
            # build never keeps the status it had before.
            self.instance.status = "error"
            self.instance.reason = "Build failure"

        return {
            "returncode": p.returncode,
            "instance": self.instance,
        }

    def run_cmake(self, args=[]):
        """Run the cmake configuration step for the test case.

        @param args Extra cache definitions; each one is passed as -D<arg>
            with any double quote removed
        @return On success a dictionary with 'msg' and 'filter' (the result
            of parse_generated()); on failure a dictionary with only
            "returncode", the instance being marked as "error"
        @raise Exception if cmake cannot be found in the path
        """

        if self.warnings_as_errors:
            warnings_as_errors = 'y'
            gen_defines_args = "--edtlib-Werror"
        else:
            warnings_as_errors = 'n'
            gen_defines_args = ""

        logger.debug("Running cmake on %s for %s" % (self.source_dir, self.platform.name))
        cmake_args = [
            f'-B{self.build_dir}',
            f'-S{self.source_dir}',
            f'-DCONFIG_COMPILER_WARNINGS_AS_ERRORS={warnings_as_errors}',
            f'-DEXTRA_GEN_DEFINES_ARGS={gen_defines_args}',
            f'-G{self.generator}'
        ]

        cmake_args.extend("-D{}".format(a.replace('"', '')) for a in args)
        cmake_args.append('-DBOARD={}'.format(self.platform.name))

        logger.debug("Calling cmake with arguments: {}".format(cmake_args))
        cmd = [self._find_cmake()] + cmake_args

        p = subprocess.Popen(cmd, **self._popen_kwargs())
        out, _ = p.communicate()

        if p.returncode == 0:
            filter_results = self.parse_generated()
            msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)
            logger.debug(msg)
            results = {'msg': msg, 'filter': filter_results}

        else:
            self.instance.status = "error"
            self.instance.reason = "Cmake build failure"
            self.instance.fill_results_by_status()
            logger.error("Cmake build failure: %s for %s" % (self.source_dir, self.platform.name))
            results = {"returncode": p.returncode}

        if out:
            self._append_to_log(out)

        return results

    @staticmethod
    def run_cmake_script(args=[]):
        """Run a cmake script with ``cmake -P``.

        @param args The script path followed by the definitions to pass to
            it; each definition is passed as -D<arg> with any double quote
            removed
        @return A dictionary with "returncode" plus "msg" and "stdout" on
            success, or "returnmsg" (the captured output) on failure
        @raise Exception if cmake cannot be found in the path
        """

        logger.debug("Running cmake script %s" % (args[0]))

        cmake_args = ["-D{}".format(a.replace('"', '')) for a in args[1:]]
        cmake_args.extend(['-P', args[0]])

        logger.debug("Calling cmake with arguments: {}".format(cmake_args))
        cmd = [CMake._find_cmake()] + cmake_args

        kwargs = dict()
        kwargs['stdout'] = subprocess.PIPE
        # CMake sends the output of message() to stderr unless it's STATUS
        kwargs['stderr'] = subprocess.STDOUT

        p = subprocess.Popen(cmd, **kwargs)
        out, _ = p.communicate()

        # It might happen that the environment adds ANSI escape codes like \x1b[0m,
        # for instance if twister is executed from inside a makefile. In such a
        # scenario it is then necessary to remove them, as otherwise the JSON decoding
        # will fail.
        ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
        out = ansi_escape.sub('', out.decode())

        if p.returncode == 0:
            msg = "Finished running  %s" % (args[0])
            logger.debug(msg)
            results = {"returncode": p.returncode, "msg": msg, "stdout": out}

        else:
            logger.error("Cmake script failure: %s" % (args[0]))
            results = {"returncode": p.returncode, "returnmsg": out}

        return results
2119
2120
class FilterBuilder(CMake):
    """CMake runner that evaluates the filter of a test case.

    After a cmake run, parse_generated() gathers the generated Kconfig
    symbols and the CMake cache and evaluates the tc_filter expression of
    the test case against them.
    """

    def __init__(self, testcase, platform, source_dir, build_dir):
        super().__init__(testcase, platform, source_dir, build_dir)

        self.log = "config-twister.log"

    def parse_generated(self):
        """Parse the files generated by cmake and evaluate the filter.

        @return An empty dictionary for the "unit_testing" platform. When
            the test case has a tc_filter, a dictionary with the single key
            <platform>/<test case> whose value is True if the filter
            evaluated to false. Otherwise the collected filter data, which
            is also stored on the platform.
        """

        if self.platform.name == "unit_testing":
            return {}

        zephyr_dir = os.path.join(self.build_dir, "zephyr")

        # Kconfig symbols of the generated .config
        symbols = {}
        with open(os.path.join(zephyr_dir, ".config"), "r") as fp:
            for raw in fp:
                parsed = self.config_re.match(raw)
                if parsed:
                    symbols[parsed.group(1)] = parsed.group(2).strip()
                elif raw.strip() and not raw.startswith("#"):
                    sys.stderr.write("Unrecognized line %s\n" % raw)

        self.defconfig = symbols

        # CMake cache entries; a missing cache is not an error
        try:
            cache = CMakeCache.from_file(os.path.join(self.build_dir, "CMakeCache.txt"))
        except FileNotFoundError:
            cache = {}

        self.cmake_cache = {entry.name: entry.value for entry in cache}

        # Later layers take precedence over earlier ones
        filter_data = {
            "ARCH": self.platform.arch,
            "PLATFORM": self.platform.name
        }
        for layer in (os.environ, self.defconfig, self.cmake_cache):
            filter_data.update(layer)

        if not (self.testcase and self.testcase.tc_filter):
            self.platform.filter_data = filter_data
            return filter_data

        edt_pickle = os.path.join(zephyr_dir, "edt.pickle")
        try:
            edt = None
            if os.path.exists(edt_pickle):
                with open(edt_pickle, 'rb') as f:
                    edt = pickle.load(f)
            res = expr_parser.parse(self.testcase.tc_filter, filter_data, edt)

        except (ValueError, SyntaxError):
            sys.stderr.write(
                "Failed processing %s\n" % self.testcase.yamlfile)
            raise

        instance_name = os.path.join(self.platform.name, self.testcase.name)
        return {instance_name: not res}
2189
2190
2191class ProjectBuilder(FilterBuilder):
2192
2193    def __init__(self, suite, instance, **kwargs):
2194        super().__init__(instance.testcase, instance.platform, instance.testcase.source_dir, instance.build_dir)
2195
2196        self.log = "build.log"
2197        self.instance = instance
2198        self.suite = suite
2199        self.filtered_tests = 0
2200
2201        self.lsan = kwargs.get('lsan', False)
2202        self.asan = kwargs.get('asan', False)
2203        self.ubsan = kwargs.get('ubsan', False)
2204        self.valgrind = kwargs.get('valgrind', False)
2205        self.extra_args = kwargs.get('extra_args', [])
2206        self.device_testing = kwargs.get('device_testing', False)
2207        self.cmake_only = kwargs.get('cmake_only', False)
2208        self.cleanup = kwargs.get('cleanup', False)
2209        self.coverage = kwargs.get('coverage', False)
2210        self.inline_logs = kwargs.get('inline_logs', False)
2211        self.generator = kwargs.get('generator', None)
2212        self.generator_cmd = kwargs.get('generator_cmd', None)
2213        self.verbose = kwargs.get('verbose', None)
2214        self.warnings_as_errors = kwargs.get('warnings_as_errors', True)
2215        self.overflow_as_errors = kwargs.get('overflow_as_errors', False)
2216
2217    @staticmethod
2218    def log_info(filename, inline_logs):
2219        filename = os.path.abspath(os.path.realpath(filename))
2220        if inline_logs:
2221            logger.info("{:-^100}".format(filename))
2222
2223            try:
2224                with open(filename) as fp:
2225                    data = fp.read()
2226            except Exception as e:
2227                data = "Unable to read log data (%s)\n" % (str(e))
2228
2229            logger.error(data)
2230
2231            logger.info("{:-^100}".format(filename))
2232        else:
2233            logger.error("see: " + Fore.YELLOW + filename + Fore.RESET)
2234
2235    def log_info_file(self, inline_logs):
2236        build_dir = self.instance.build_dir
2237        h_log = "{}/handler.log".format(build_dir)
2238        b_log = "{}/build.log".format(build_dir)
2239        v_log = "{}/valgrind.log".format(build_dir)
2240        d_log = "{}/device.log".format(build_dir)
2241
2242        if os.path.exists(v_log) and "Valgrind" in self.instance.reason:
2243            self.log_info("{}".format(v_log), inline_logs)
2244        elif os.path.exists(h_log) and os.path.getsize(h_log) > 0:
2245            self.log_info("{}".format(h_log), inline_logs)
2246        elif os.path.exists(d_log) and os.path.getsize(d_log) > 0:
2247            self.log_info("{}".format(d_log), inline_logs)
2248        else:
2249            self.log_info("{}".format(b_log), inline_logs)
2250
2251    def setup_handler(self):
2252
2253        instance = self.instance
2254        args = []
2255
2256        # FIXME: Needs simplification
2257        if instance.platform.simulation == "qemu":
2258            instance.handler = QEMUHandler(instance, "qemu")
2259            args.append("QEMU_PIPE=%s" % instance.handler.get_fifo())
2260            instance.handler.call_make_run = True
2261        elif instance.testcase.type == "unit":
2262            instance.handler = BinaryHandler(instance, "unit")
2263            instance.handler.binary = os.path.join(instance.build_dir, "testbinary")
2264            if self.coverage:
2265                args.append("COVERAGE=1")
2266        elif instance.platform.type == "native":
2267            handler = BinaryHandler(instance, "native")
2268
2269            handler.asan = self.asan
2270            handler.valgrind = self.valgrind
2271            handler.lsan = self.lsan
2272            handler.ubsan = self.ubsan
2273            handler.coverage = self.coverage
2274
2275            handler.binary = os.path.join(instance.build_dir, "zephyr", "zephyr.exe")
2276            instance.handler = handler
2277        elif instance.platform.simulation == "renode":
2278            if find_executable("renode"):
2279                instance.handler = BinaryHandler(instance, "renode")
2280                instance.handler.pid_fn = os.path.join(instance.build_dir, "renode.pid")
2281                instance.handler.call_make_run = True
2282        elif instance.platform.simulation == "tsim":
2283            instance.handler = BinaryHandler(instance, "tsim")
2284            instance.handler.call_make_run = True
2285        elif self.device_testing:
2286            instance.handler = DeviceHandler(instance, "device")
2287            instance.handler.coverage = self.coverage
2288        elif instance.platform.simulation == "nsim":
2289            if find_executable("nsimdrv"):
2290                instance.handler = BinaryHandler(instance, "nsim")
2291                instance.handler.call_make_run = True
2292        elif instance.platform.simulation == "mdb-nsim":
2293            if find_executable("mdb"):
2294                instance.handler = BinaryHandler(instance, "nsim")
2295                instance.handler.call_make_run = True
2296        elif instance.platform.simulation == "armfvp":
2297            instance.handler = BinaryHandler(instance, "armfvp")
2298            instance.handler.call_make_run = True
2299
2300        if instance.handler:
2301            instance.handler.args = args
2302            instance.handler.generator_cmd = self.generator_cmd
2303            instance.handler.generator = self.generator
2304
2305    def process(self, pipeline, done, message, lock, results):
2306        op = message.get('op')
2307
2308        if not self.instance.handler:
2309            self.setup_handler()
2310
2311        # The build process, call cmake and build with configured generator
2312        if op == "cmake":
2313            res = self.cmake()
2314            if self.instance.status in ["failed", "error"]:
2315                pipeline.put({"op": "report", "test": self.instance})
2316            elif self.cmake_only:
2317                if self.instance.status is None:
2318                    self.instance.status = "passed"
2319                pipeline.put({"op": "report", "test": self.instance})
2320            else:
2321                if self.instance.name in res['filter'] and res['filter'][self.instance.name]:
2322                    logger.debug("filtering %s" % self.instance.name)
2323                    self.instance.status = "skipped"
2324                    self.instance.reason = "filter"
2325                    results.skipped_runtime += 1
2326                    for case in self.instance.testcase.cases:
2327                        self.instance.results.update({case: 'SKIP'})
2328                    pipeline.put({"op": "report", "test": self.instance})
2329                else:
2330                    pipeline.put({"op": "build", "test": self.instance})
2331
2332        elif op == "build":
2333            logger.debug("build test: %s" % self.instance.name)
2334            res = self.build()
2335
2336            if not res:
2337                self.instance.status = "error"
2338                self.instance.reason = "Build Failure"
2339                pipeline.put({"op": "report", "test": self.instance})
2340            else:
2341                # Count skipped cases during build, for example
2342                # due to ram/rom overflow.
2343                inst = res.get("instance", None)
2344                if inst and inst.status == "skipped":
2345                    results.skipped_runtime += 1
2346
2347                if res.get('returncode', 1) > 0:
2348                    pipeline.put({"op": "report", "test": self.instance})
2349                else:
2350                    if self.instance.run and self.instance.handler:
2351                        pipeline.put({"op": "run", "test": self.instance})
2352                    else:
2353                        pipeline.put({"op": "report", "test": self.instance})
2354        # Run the generated binary using one of the supported handlers
2355        elif op == "run":
2356            logger.debug("run test: %s" % self.instance.name)
2357            self.run()
2358            self.instance.status, _ = self.instance.handler.get_state()
2359            logger.debug(f"run status: {self.instance.name} {self.instance.status}")
2360
2361            # to make it work with pickle
2362            self.instance.handler.thread = None
2363            self.instance.handler.suite = None
2364            pipeline.put({
2365                "op": "report",
2366                "test": self.instance,
2367                "status": self.instance.status,
2368                "reason": self.instance.reason
2369                }
2370            )
2371
2372        # Report results and output progress to screen
2373        elif op == "report":
2374            with lock:
2375                done.put(self.instance)
2376                self.report_out(results)
2377
2378            if self.cleanup and not self.coverage and self.instance.status == "passed":
2379                pipeline.put({
2380                    "op": "cleanup",
2381                    "test": self.instance
2382                })
2383
2384        elif op == "cleanup":
2385            if self.device_testing:
2386                self.cleanup_device_testing_artifacts()
2387            else:
2388                self.cleanup_artifacts()
2389
2390    def cleanup_artifacts(self, additional_keep=[]):
2391        logger.debug("Cleaning up {}".format(self.instance.build_dir))
2392        allow = [
2393            'zephyr/.config',
2394            'handler.log',
2395            'build.log',
2396            'device.log',
2397            'recording.csv',
2398            ]
2399
2400        allow += additional_keep
2401
2402        allow = [os.path.join(self.instance.build_dir, file) for file in allow]
2403
2404        for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False):
2405            for name in filenames:
2406                path = os.path.join(dirpath, name)
2407                if path not in allow:
2408                    os.remove(path)
2409            # Remove empty directories and symbolic links to directories
2410            for dir in dirnames:
2411                path = os.path.join(dirpath, dir)
2412                if os.path.islink(path):
2413                    os.remove(path)
2414                elif not os.listdir(path):
2415                    os.rmdir(path)
2416
2417    def cleanup_device_testing_artifacts(self):
2418        logger.debug("Cleaning up for Device Testing {}".format(self.instance.build_dir))
2419
2420        sanitizelist = [
2421            'CMakeCache.txt',
2422            'zephyr/runners.yaml',
2423        ]
2424        keep = [
2425            'zephyr/zephyr.hex',
2426            'zephyr/zephyr.bin',
2427            'zephyr/zephyr.elf',
2428            ]
2429
2430        keep += sanitizelist
2431
2432        self.cleanup_artifacts(keep)
2433
2434        # sanitize paths so files are relocatable
2435        for file in sanitizelist:
2436            file = os.path.join(self.instance.build_dir, file)
2437
2438            with open(file, "rt") as fin:
2439                data = fin.read()
2440                data = data.replace(canonical_zephyr_base+"/", "")
2441
2442            with open(file, "wt") as fin:
2443                fin.write(data)
2444
2445    def report_out(self, results):
2446        total_to_do = results.total - results.skipped_configs
2447        total_tests_width = len(str(total_to_do))
2448        results.done += 1
2449        instance = self.instance
2450
2451        if instance.status in ["error", "failed", "timeout", "flash_error"]:
2452            if instance.status == "error":
2453                results.error += 1
2454            results.failed += 1
2455            if self.verbose:
2456                status = Fore.RED + "FAILED " + Fore.RESET + instance.reason
2457            else:
2458                print("")
2459                logger.error(
2460                    "{:<25} {:<50} {}FAILED{}: {}".format(
2461                        instance.platform.name,
2462                        instance.testcase.name,
2463                        Fore.RED,
2464                        Fore.RESET,
2465                        instance.reason))
2466            if not self.verbose:
2467                self.log_info_file(self.inline_logs)
2468        elif instance.status == "skipped":
2469            status = Fore.YELLOW + "SKIPPED" + Fore.RESET
2470        elif instance.status == "passed":
2471            status = Fore.GREEN + "PASSED" + Fore.RESET
2472        else:
2473            logger.debug(f"Unknown status = {instance.status}")
2474            status = Fore.YELLOW + "UNKNOWN" + Fore.RESET
2475
2476        if self.verbose:
2477            if self.cmake_only:
2478                more_info = "cmake"
2479            elif instance.status == "skipped":
2480                more_info = instance.reason
2481            else:
2482                if instance.handler and instance.run:
2483                    more_info = instance.handler.type_str
2484                    htime = instance.handler.duration
2485                    if htime:
2486                        more_info += " {:.3f}s".format(htime)
2487                else:
2488                    more_info = "build"
2489
2490            logger.info("{:>{}}/{} {:<25} {:<50} {} ({})".format(
2491                results.done, total_tests_width, total_to_do, instance.platform.name,
2492                instance.testcase.name, status, more_info))
2493
2494            if instance.status in ["error", "failed", "timeout"]:
2495                self.log_info_file(self.inline_logs)
2496        else:
2497            completed_perc = 0
2498            if total_to_do > 0:
2499                completed_perc = int((float(results.done) / total_to_do) * 100)
2500
2501            skipped = results.skipped_configs + results.skipped_runtime
2502            sys.stdout.write("\rINFO    - Total complete: %s%4d/%4d%s  %2d%%  skipped: %s%4d%s, failed: %s%4d%s" % (
2503                Fore.GREEN,
2504                results.done,
2505                total_to_do,
2506                Fore.RESET,
2507                completed_perc,
2508                Fore.YELLOW if skipped > 0 else Fore.RESET,
2509                skipped,
2510                Fore.RESET,
2511                Fore.RED if results.failed > 0 else Fore.RESET,
2512                results.failed,
2513                Fore.RESET
2514            )
2515                             )
2516        sys.stdout.flush()
2517
2518    def cmake(self):
2519
2520        instance = self.instance
2521        args = self.testcase.extra_args[:]
2522        args += self.extra_args
2523
2524        if instance.handler:
2525            args += instance.handler.args
2526
2527        # merge overlay files into one variable
2528        def extract_overlays(args):
2529            re_overlay = re.compile('OVERLAY_CONFIG=(.*)')
2530            other_args = []
2531            overlays = []
2532            for arg in args:
2533                match = re_overlay.search(arg)
2534                if match:
2535                    overlays.append(match.group(1).strip('\'"'))
2536                else:
2537                    other_args.append(arg)
2538
2539            args[:] = other_args
2540            return overlays
2541
2542        overlays = extract_overlays(args)
2543
2544        if os.path.exists(os.path.join(instance.build_dir,
2545                                       "twister", "testcase_extra.conf")):
2546            overlays.append(os.path.join(instance.build_dir,
2547                                         "twister", "testcase_extra.conf"))
2548
2549        if overlays:
2550            args.append("OVERLAY_CONFIG=\"%s\"" % (" ".join(overlays)))
2551
2552        res = self.run_cmake(args)
2553        return res
2554
2555    def build(self):
2556        res = self.run_build(['--build', self.build_dir])
2557        return res
2558
2559    def run(self):
2560
2561        instance = self.instance
2562
2563        if instance.handler:
2564            if instance.handler.type_str == "device":
2565                instance.handler.suite = self.suite
2566
2567            instance.handler.handle()
2568
2569        sys.stdout.flush()
2570
class TestSuite(DisablePyTestCollectionMixin):
    """Holds the test cases, platforms and test instances of a twister session.

    The class-level attributes below define the regular expressions, YAML
    schemas and per-test option table shared by all instances.
    """

    # Matches a "CONFIG_<NAME>=<value>" assignment at the end of a string:
    # group 1 is the symbol name, group 2 the value without the optional
    # surrounding double quotes.
    config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
    # Same shape as config_re, but for any "<NAME>=<value>" assignment.
    dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')

    # Schema handed to TwisterConfigParser in add_testcases().
    tc_schema = scl.yaml_load(
        os.path.join(ZEPHYR_BASE,
                     "scripts", "schemas", "twister", "testcase-schema.yaml"))
    # Schema the quarantine file is verified against in load_quarantine().
    quarantine_schema = scl.yaml_load(
        os.path.join(ZEPHYR_BASE,
                     "scripts", "schemas", "twister", "quarantine-schema.yaml"))

    # Options accepted for a test entry, each with its type and, where given,
    # a default and a "required" flag. Passed to the parser's get_test() in
    # add_testcases(); every key is then read back from the returned mapping.
    testcase_valid_keys = {"tags": {"type": "set", "required": False},
                       "type": {"type": "str", "default": "integration"},
                       "extra_args": {"type": "list"},
                       "extra_configs": {"type": "list"},
                       "build_only": {"type": "bool", "default": False},
                       "build_on_all": {"type": "bool", "default": False},
                       "skip": {"type": "bool", "default": False},
                       "slow": {"type": "bool", "default": False},
                       "timeout": {"type": "int", "default": 60},
                       "min_ram": {"type": "int", "default": 8},
                       "depends_on": {"type": "set"},
                       "min_flash": {"type": "int", "default": 32},
                       "arch_allow": {"type": "set"},
                       "arch_exclude": {"type": "set"},
                       "extra_sections": {"type": "list", "default": []},
                       "integration_platforms": {"type": "list", "default": []},
                       "platform_exclude": {"type": "set"},
                       "platform_allow": {"type": "set"},
                       "toolchain_exclude": {"type": "set"},
                       "toolchain_allow": {"type": "set"},
                       "filter": {"type": "str"},
                       "harness": {"type": "str"},
                       "harness_config": {"type": "map", "default": {}}
                       }

    # CSV file written by save_reports() when its `release` argument is set.
    RELEASE_DATA = os.path.join(ZEPHYR_BASE, "scripts", "release",
                            "twister_last_release.csv")

    # File names add_testcases() looks for while walking the test roots; when
    # a directory contains both, the sample file is the one that is parsed.
    SAMPLE_FILENAME = 'sample.yaml'
    TESTCASE_FILENAME = 'testcase.yaml'
2612
2613    def __init__(self, board_root_list=[], testcase_roots=[], outdir=None):
2614
2615        self.roots = testcase_roots
2616        if not isinstance(board_root_list, list):
2617            self.board_roots = [board_root_list]
2618        else:
2619            self.board_roots = board_root_list
2620
2621        # Testsuite Options
2622        self.coverage_platform = []
2623        self.build_only = False
2624        self.cmake_only = False
2625        self.cleanup = False
2626        self.enable_slow = False
2627        self.device_testing = False
2628        self.fixtures = []
2629        self.enable_coverage = False
2630        self.enable_ubsan = False
2631        self.enable_lsan = False
2632        self.enable_asan = False
2633        self.enable_valgrind = False
2634        self.extra_args = []
2635        self.inline_logs = False
2636        self.enable_sizes_report = False
2637        self.west_flash = None
2638        self.west_runner = None
2639        self.generator = None
2640        self.generator_cmd = None
2641        self.warnings_as_errors = True
2642        self.overflow_as_errors = False
2643        self.quarantine_verify = False
2644
2645        # Keep track of which test cases we've filtered out and why
2646        self.testcases = {}
2647        self.quarantine = {}
2648        self.platforms = []
2649        self.selected_platforms = []
2650        self.filtered_platforms = []
2651        self.default_platforms = []
2652        self.outdir = os.path.abspath(outdir)
2653        self.discards = {}
2654        self.load_errors = 0
2655        self.instances = dict()
2656
2657        self.total_platforms = 0
2658        self.start_time = 0
2659        self.duration = 0
2660        self.warnings = 0
2661
2662        # hardcoded for now
2663        self.duts = []
2664
2665        # run integration tests only
2666        self.integration = False
2667
2668        self.pipeline = None
2669        self.version = "NA"
2670
2671    def check_zephyr_version(self):
2672        try:
2673            subproc = subprocess.run(["git", "describe", "--abbrev=12"],
2674                                     stdout=subprocess.PIPE,
2675                                     universal_newlines=True,
2676                                     cwd=ZEPHYR_BASE)
2677            if subproc.returncode == 0:
2678                self.version = subproc.stdout.strip()
2679                logger.info(f"Zephyr version: {self.version}")
2680        except OSError:
2681            logger.info("Cannot read zephyr version.")
2682
2683    def get_platform_instances(self, platform):
2684        filtered_dict = {k:v for k,v in self.instances.items() if k.startswith(platform + "/")}
2685        return filtered_dict
2686
2687    def config(self):
2688        logger.info("coverage platform: {}".format(self.coverage_platform))
2689
2690    # Debug Functions
2691    @staticmethod
2692    def info(what):
2693        sys.stdout.write(what + "\n")
2694        sys.stdout.flush()
2695
2696    def update_counting(self, results=None, initial=False):
2697        results.skipped_configs = 0
2698        results.skipped_cases = 0
2699        for instance in self.instances.values():
2700            if initial:
2701                results.cases += len(instance.testcase.cases)
2702            if instance.status == 'skipped':
2703                results.skipped_configs += 1
2704                results.skipped_cases += len(instance.testcase.cases)
2705            elif instance.status == "passed":
2706                results.passed += 1
2707                for res in instance.results.values():
2708                    if res == 'SKIP':
2709                        results.skipped_cases += 1
2710
2711    def compare_metrics(self, filename):
2712        # name, datatype, lower results better
2713        interesting_metrics = [("ram_size", int, True),
2714                               ("rom_size", int, True)]
2715
2716        if not os.path.exists(filename):
2717            logger.error("Cannot compare metrics, %s not found" % filename)
2718            return []
2719
2720        results = []
2721        saved_metrics = {}
2722        with open(filename) as fp:
2723            cr = csv.DictReader(fp)
2724            for row in cr:
2725                d = {}
2726                for m, _, _ in interesting_metrics:
2727                    d[m] = row[m]
2728                saved_metrics[(row["test"], row["platform"])] = d
2729
2730        for instance in self.instances.values():
2731            mkey = (instance.testcase.name, instance.platform.name)
2732            if mkey not in saved_metrics:
2733                continue
2734            sm = saved_metrics[mkey]
2735            for metric, mtype, lower_better in interesting_metrics:
2736                if metric not in instance.metrics:
2737                    continue
2738                if sm[metric] == "":
2739                    continue
2740                delta = instance.metrics.get(metric, 0) - mtype(sm[metric])
2741                if delta == 0:
2742                    continue
2743                results.append((instance, metric, instance.metrics.get(metric, 0), delta,
2744                                lower_better))
2745        return results
2746
2747    def footprint_reports(self, report, show_footprint, all_deltas,
2748                          footprint_threshold, last_metrics):
2749        if not report:
2750            return
2751
2752        logger.debug("running footprint_reports")
2753        deltas = self.compare_metrics(report)
2754        warnings = 0
2755        if deltas and show_footprint:
2756            for i, metric, value, delta, lower_better in deltas:
2757                if not all_deltas and ((delta < 0 and lower_better) or
2758                                       (delta > 0 and not lower_better)):
2759                    continue
2760
2761                percentage = 0
2762                if value > delta:
2763                    percentage = (float(delta) / float(value - delta))
2764
2765                if not all_deltas and (percentage < (footprint_threshold / 100.0)):
2766                    continue
2767
2768                logger.info("{:<25} {:<60} {}{}{}: {} {:<+4}, is now {:6} {:+.2%}".format(
2769                    i.platform.name, i.testcase.name, Fore.YELLOW,
2770                    "INFO" if all_deltas else "WARNING", Fore.RESET,
2771                    metric, delta, value, percentage))
2772                warnings += 1
2773
2774        if warnings:
2775            logger.warning("Deltas based on metrics from last %s" %
2776                           ("release" if not last_metrics else "run"))
2777
2778    def summary(self, results, unrecognized_sections):
2779        failed = 0
2780        run = 0
2781        for instance in self.instances.values():
2782            if instance.status == "failed":
2783                failed += 1
2784            elif instance.metrics.get("unrecognized") and not unrecognized_sections:
2785                logger.error("%sFAILED%s: %s has unrecognized binary sections: %s" %
2786                             (Fore.RED, Fore.RESET, instance.name,
2787                              str(instance.metrics.get("unrecognized", []))))
2788                failed += 1
2789
2790            if instance.metrics.get('handler_time', None):
2791                run += 1
2792
2793        if results.total and results.total != results.skipped_configs:
2794            pass_rate = (float(results.passed) / float(results.total - results.skipped_configs))
2795        else:
2796            pass_rate = 0
2797
2798        logger.info(
2799            "{}{} of {}{} test configurations passed ({:.2%}), {}{}{} failed, {} skipped with {}{}{} warnings in {:.2f} seconds".format(
2800                Fore.RED if failed else Fore.GREEN,
2801                results.passed,
2802                results.total - results.skipped_configs,
2803                Fore.RESET,
2804                pass_rate,
2805                Fore.RED if results.failed else Fore.RESET,
2806                results.failed,
2807                Fore.RESET,
2808                results.skipped_configs,
2809                Fore.YELLOW if self.warnings else Fore.RESET,
2810                self.warnings,
2811                Fore.RESET,
2812                self.duration))
2813
2814        self.total_platforms = len(self.platforms)
2815        # if we are only building, do not report about tests being executed.
2816        if self.platforms and not self.build_only:
2817            logger.info("In total {} test cases were executed, {} skipped on {} out of total {} platforms ({:02.2f}%)".format(
2818                results.cases - results.skipped_cases,
2819                results.skipped_cases,
2820                len(self.filtered_platforms),
2821                self.total_platforms,
2822                (100 * len(self.filtered_platforms) / len(self.platforms))
2823            ))
2824
2825        logger.info(f"{Fore.GREEN}{run}{Fore.RESET} test configurations executed on platforms, \
2826{Fore.RED}{results.total - run - results.skipped_configs}{Fore.RESET} test configurations were only built.")
2827
2828    def save_reports(self, name, suffix, report_dir, no_update, release, only_failed, platform_reports, json_report):
2829        if not self.instances:
2830            return
2831
2832        logger.info("Saving reports...")
2833        if name:
2834            report_name = name
2835        else:
2836            report_name = "twister"
2837
2838        if report_dir:
2839            os.makedirs(report_dir, exist_ok=True)
2840            filename = os.path.join(report_dir, report_name)
2841            outdir = report_dir
2842        else:
2843            filename = os.path.join(self.outdir, report_name)
2844            outdir = self.outdir
2845
2846        if suffix:
2847            filename = "{}_{}".format(filename, suffix)
2848
2849        if not no_update:
2850            self.xunit_report(filename + ".xml", full_report=False,
2851                              append=only_failed, version=self.version)
2852            self.xunit_report(filename + "_report.xml", full_report=True,
2853                              append=only_failed, version=self.version)
2854            self.csv_report(filename + ".csv")
2855
2856            if json_report:
2857                self.json_report(filename + ".json", append=only_failed, version=self.version)
2858
2859            if platform_reports:
2860                self.target_report(outdir, suffix, append=only_failed)
2861            if self.discards:
2862                self.discard_report(filename + "_discard.csv")
2863
2864        if release:
2865            self.csv_report(self.RELEASE_DATA)
2866
2867    def add_configurations(self):
2868
2869        for board_root in self.board_roots:
2870            board_root = os.path.abspath(board_root)
2871
2872            logger.debug("Reading platform configuration files under %s..." %
2873                         board_root)
2874
2875            for file in glob.glob(os.path.join(board_root, "*", "*", "*.yaml")):
2876                try:
2877                    platform = Platform()
2878                    platform.load(file)
2879                    if platform.name in [p.name for p in self.platforms]:
2880                        logger.error(f"Duplicate platform {platform.name} in {file}")
2881                        raise Exception(f"Duplicate platform identifier {platform.name} found")
2882                    if platform.twister:
2883                        self.platforms.append(platform)
2884                        if platform.default:
2885                            self.default_platforms.append(platform.name)
2886
2887                except RuntimeError as e:
2888                    logger.error("E: %s: can't load: %s" % (file, e))
2889                    self.load_errors += 1
2890
2891    def get_all_tests(self):
2892        tests = []
2893        for _, tc in self.testcases.items():
2894            for case in tc.cases:
2895                tests.append(case)
2896
2897        return tests
2898
2899    @staticmethod
2900    def get_toolchain():
2901        toolchain_script = Path(ZEPHYR_BASE) / Path('cmake/verify-toolchain.cmake')
2902        result = CMake.run_cmake_script([toolchain_script, "FORMAT=json"])
2903
2904        try:
2905            if result['returncode']:
2906                raise TwisterRuntimeError(f"E: {result['returnmsg']}")
2907        except Exception as e:
2908            print(str(e))
2909            sys.exit(2)
2910        toolchain = json.loads(result['stdout'])['ZEPHYR_TOOLCHAIN_VARIANT']
2911        logger.info(f"Using '{toolchain}' toolchain.")
2912
2913        return toolchain
2914
2915    def add_testcases(self, testcase_filter=[]):
2916        for root in self.roots:
2917            root = os.path.abspath(root)
2918
2919            logger.debug("Reading test case configuration files under %s..." % root)
2920
2921            for dirpath, _, filenames in os.walk(root, topdown=True):
2922                if self.SAMPLE_FILENAME in filenames:
2923                    filename = self.SAMPLE_FILENAME
2924                elif self.TESTCASE_FILENAME in filenames:
2925                    filename = self.TESTCASE_FILENAME
2926                else:
2927                    continue
2928
2929                logger.debug("Found possible test case in " + dirpath)
2930
2931                tc_path = os.path.join(dirpath, filename)
2932
2933                try:
2934                    parsed_data = TwisterConfigParser(tc_path, self.tc_schema)
2935                    parsed_data.load()
2936
2937                    tc_path = os.path.dirname(tc_path)
2938                    workdir = os.path.relpath(tc_path, root)
2939
2940                    for name in parsed_data.tests.keys():
2941                        tc = TestCase(root, workdir, name)
2942
2943                        tc_dict = parsed_data.get_test(name, self.testcase_valid_keys)
2944
2945                        tc.source_dir = tc_path
2946                        tc.yamlfile = tc_path
2947
2948                        tc.type = tc_dict["type"]
2949                        tc.tags = tc_dict["tags"]
2950                        tc.extra_args = tc_dict["extra_args"]
2951                        tc.extra_configs = tc_dict["extra_configs"]
2952                        tc.arch_allow = tc_dict["arch_allow"]
2953                        tc.arch_exclude = tc_dict["arch_exclude"]
2954                        tc.skip = tc_dict["skip"]
2955                        tc.platform_exclude = tc_dict["platform_exclude"]
2956                        tc.platform_allow = tc_dict["platform_allow"]
2957                        tc.toolchain_exclude = tc_dict["toolchain_exclude"]
2958                        tc.toolchain_allow = tc_dict["toolchain_allow"]
2959                        tc.tc_filter = tc_dict["filter"]
2960                        tc.timeout = tc_dict["timeout"]
2961                        tc.harness = tc_dict["harness"]
2962                        tc.harness_config = tc_dict["harness_config"]
2963                        if tc.harness == 'console' and not tc.harness_config:
2964                            raise Exception('Harness config error: console harness defined without a configuration.')
2965                        tc.build_only = tc_dict["build_only"]
2966                        tc.build_on_all = tc_dict["build_on_all"]
2967                        tc.slow = tc_dict["slow"]
2968                        tc.min_ram = tc_dict["min_ram"]
2969                        tc.depends_on = tc_dict["depends_on"]
2970                        tc.min_flash = tc_dict["min_flash"]
2971                        tc.extra_sections = tc_dict["extra_sections"]
2972                        tc.integration_platforms = tc_dict["integration_platforms"]
2973
2974                        tc.parse_subcases(tc_path)
2975
2976                        if testcase_filter:
2977                            if tc.name and tc.name in testcase_filter:
2978                                self.testcases[tc.name] = tc
2979                        else:
2980                            self.testcases[tc.name] = tc
2981
2982                except Exception as e:
2983                    logger.error("%s: can't load (skipping): %s" % (tc_path, e))
2984                    self.load_errors += 1
2985        return len(self.testcases)
2986
2987    def get_platform(self, name):
2988        selected_platform = None
2989        for platform in self.platforms:
2990            if platform.name == name:
2991                selected_platform = platform
2992                break
2993        return selected_platform
2994
2995    def load_quarantine(self, file):
2996        """
2997        Loads quarantine list from the given yaml file. Creates a dictionary
2998        of all tests configurations (platform + scenario: comment) that shall be
2999        skipped due to quarantine
3000        """
3001
3002        # Load yaml into quarantine_yaml
3003        quarantine_yaml = scl.yaml_load_verify(file, self.quarantine_schema)
3004
3005        # Create quarantine_list with a product of the listed
3006        # platforms and scenarios for each entry in quarantine yaml
3007        quarantine_list = []
3008        for quar_dict in quarantine_yaml:
3009            if quar_dict['platforms'][0] == "all":
3010                plat = [p.name for p in self.platforms]
3011            else:
3012                plat = quar_dict['platforms']
3013            comment = quar_dict.get('comment', "NA")
3014            quarantine_list.append([{".".join([p, s]): comment}
3015                                   for p in plat for s in quar_dict['scenarios']])
3016
3017        # Flatten the quarantine_list
3018        quarantine_list = [it for sublist in quarantine_list for it in sublist]
3019        # Change quarantine_list into a dictionary
3020        for d in quarantine_list:
3021            self.quarantine.update(d)
3022
3023    def load_from_file(self, file, filter_status=[], filter_platform=[]):
3024        try:
3025            with open(file, "r") as fp:
3026                cr = csv.DictReader(fp)
3027                instance_list = []
3028                for row in cr:
3029                    if row["status"] in filter_status:
3030                        continue
3031                    test = row["test"]
3032
3033                    platform = self.get_platform(row["platform"])
3034                    if filter_platform and platform.name not in filter_platform:
3035                        continue
3036                    instance = TestInstance(self.testcases[test], platform, self.outdir)
3037                    if self.device_testing:
3038                        tfilter = 'runnable'
3039                    else:
3040                        tfilter = 'buildable'
3041                    instance.run = instance.check_runnable(
3042                        self.enable_slow,
3043                        tfilter,
3044                        self.fixtures
3045                    )
3046                    instance.create_overlay(platform, self.enable_asan, self.enable_ubsan, self.enable_coverage, self.coverage_platform)
3047                    instance_list.append(instance)
3048                self.add_instances(instance_list)
3049
3050        except KeyError as e:
3051            logger.error("Key error while parsing tests file.({})".format(str(e)))
3052            sys.exit(2)
3053
3054        except FileNotFoundError as e:
3055            logger.error("Couldn't find input file with list of tests. ({})".format(e))
3056            sys.exit(2)
3057
3058    def apply_filters(self, **kwargs):
3059
3060        toolchain = self.get_toolchain()
3061
3062        discards = {}
3063        platform_filter = kwargs.get('platform')
3064        exclude_platform = kwargs.get('exclude_platform', [])
3065        testcase_filter = kwargs.get('run_individual_tests', [])
3066        arch_filter = kwargs.get('arch')
3067        tag_filter = kwargs.get('tag')
3068        exclude_tag = kwargs.get('exclude_tag')
3069        all_filter = kwargs.get('all')
3070        runnable = kwargs.get('runnable')
3071        force_toolchain = kwargs.get('force_toolchain')
3072        force_platform = kwargs.get('force_platform')
3073        emu_filter = kwargs.get('emulation_only')
3074
3075        logger.debug("platform filter: " + str(platform_filter))
3076        logger.debug("    arch_filter: " + str(arch_filter))
3077        logger.debug("     tag_filter: " + str(tag_filter))
3078        logger.debug("    exclude_tag: " + str(exclude_tag))
3079
3080        default_platforms = False
3081        emulation_platforms = False
3082
3083
3084        if all_filter:
3085            logger.info("Selecting all possible platforms per test case")
3086            # When --all used, any --platform arguments ignored
3087            platform_filter = []
3088        elif not platform_filter and not emu_filter:
3089            logger.info("Selecting default platforms per test case")
3090            default_platforms = True
3091        elif emu_filter:
3092            logger.info("Selecting emulation platforms per test case")
3093            emulation_platforms = True
3094
3095        if platform_filter:
3096            platforms = list(filter(lambda p: p.name in platform_filter, self.platforms))
3097        elif emu_filter:
3098            platforms = list(filter(lambda p: p.simulation != 'na', self.platforms))
3099        elif arch_filter:
3100            platforms = list(filter(lambda p: p.arch in arch_filter, self.platforms))
3101        elif default_platforms:
3102            platforms = list(filter(lambda p: p.default, self.platforms))
3103        else:
3104            platforms = self.platforms
3105
3106        logger.info("Building initial testcase list...")
3107
3108        for tc_name, tc in self.testcases.items():
3109
3110            if tc.build_on_all and not platform_filter:
3111                platform_scope = self.platforms
3112            elif tc.integration_platforms and self.integration:
3113                platform_scope = list(filter(lambda item: item.name in tc.integration_platforms, \
3114                                         self.platforms))
3115            else:
3116                platform_scope = platforms
3117
3118            integration = self.integration and tc.integration_platforms
3119
3120            # If there isn't any overlap between the platform_allow list and the platform_scope
3121            # we set the scope to the platform_allow list
3122            if tc.platform_allow and not platform_filter and not integration:
3123                a = set(platform_scope)
3124                b = set(filter(lambda item: item.name in tc.platform_allow, self.platforms))
3125                c = a.intersection(b)
3126                if not c:
3127                    platform_scope = list(filter(lambda item: item.name in tc.platform_allow, \
3128                                             self.platforms))
3129
3130            # list of instances per testcase, aka configurations.
3131            instance_list = []
3132            for plat in platform_scope:
3133                instance = TestInstance(tc, plat, self.outdir)
3134                if runnable:
3135                    tfilter = 'runnable'
3136                else:
3137                    tfilter = 'buildable'
3138
3139                instance.run = instance.check_runnable(
3140                    self.enable_slow,
3141                    tfilter,
3142                    self.fixtures
3143                )
3144
3145                for t in tc.cases:
3146                    instance.results[t] = None
3147
3148                if runnable and self.duts:
3149                    for h in self.duts:
3150                        if h.platform == plat.name:
3151                            if tc.harness_config.get('fixture') in h.fixtures:
3152                                instance.run = True
3153
3154                if not force_platform and plat.name in exclude_platform:
3155                    discards[instance] = discards.get(instance, "Platform is excluded on command line.")
3156
3157                if (plat.arch == "unit") != (tc.type == "unit"):
3158                    # Discard silently
3159                    continue
3160
3161                if runnable and not instance.run:
3162                    discards[instance] = discards.get(instance, "Not runnable on device")
3163
3164                if self.integration and tc.integration_platforms and plat.name not in tc.integration_platforms:
3165                    discards[instance] = discards.get(instance, "Not part of integration platforms")
3166
3167                if tc.skip:
3168                    discards[instance] = discards.get(instance, "Skip filter")
3169
3170                if tag_filter and not tc.tags.intersection(tag_filter):
3171                    discards[instance] = discards.get(instance, "Command line testcase tag filter")
3172
3173                if exclude_tag and tc.tags.intersection(exclude_tag):
3174                    discards[instance] = discards.get(instance, "Command line testcase exclude filter")
3175
3176                if testcase_filter and tc_name not in testcase_filter:
3177                    discards[instance] = discards.get(instance, "Testcase name filter")
3178
3179                if arch_filter and plat.arch not in arch_filter:
3180                    discards[instance] = discards.get(instance, "Command line testcase arch filter")
3181
3182                if not force_platform:
3183
3184                    if tc.arch_allow and plat.arch not in tc.arch_allow:
3185                        discards[instance] = discards.get(instance, "Not in test case arch allow list")
3186
3187                    if tc.arch_exclude and plat.arch in tc.arch_exclude:
3188                        discards[instance] = discards.get(instance, "In test case arch exclude")
3189
3190                    if tc.platform_exclude and plat.name in tc.platform_exclude:
3191                        discards[instance] = discards.get(instance, "In test case platform exclude")
3192
3193                if tc.toolchain_exclude and toolchain in tc.toolchain_exclude:
3194                    discards[instance] = discards.get(instance, "In test case toolchain exclude")
3195
3196                if platform_filter and plat.name not in platform_filter:
3197                    discards[instance] = discards.get(instance, "Command line platform filter")
3198
3199                if tc.platform_allow and plat.name not in tc.platform_allow:
3200                    discards[instance] = discards.get(instance, "Not in testcase platform allow list")
3201
3202                if tc.toolchain_allow and toolchain not in tc.toolchain_allow:
3203                    discards[instance] = discards.get(instance, "Not in testcase toolchain allow list")
3204
3205                if not plat.env_satisfied:
3206                    discards[instance] = discards.get(instance, "Environment ({}) not satisfied".format(", ".join(plat.env)))
3207
3208                if not force_toolchain \
3209                        and toolchain and (toolchain not in plat.supported_toolchains) \
3210                        and "host" not in plat.supported_toolchains \
3211                        and tc.type != 'unit':
3212                    discards[instance] = discards.get(instance, "Not supported by the toolchain")
3213
3214                if plat.ram < tc.min_ram:
3215                    discards[instance] = discards.get(instance, "Not enough RAM")
3216
3217                if tc.depends_on:
3218                    dep_intersection = tc.depends_on.intersection(set(plat.supported))
3219                    if dep_intersection != set(tc.depends_on):
3220                        discards[instance] = discards.get(instance, "No hardware support")
3221
3222                if plat.flash < tc.min_flash:
3223                    discards[instance] = discards.get(instance, "Not enough FLASH")
3224
3225                if set(plat.ignore_tags) & tc.tags:
3226                    discards[instance] = discards.get(instance, "Excluded tags per platform (exclude_tags)")
3227
3228                if plat.only_tags and not set(plat.only_tags) & tc.tags:
3229                    discards[instance] = discards.get(instance, "Excluded tags per platform (only_tags)")
3230
3231                test_configuration = ".".join([instance.platform.name,
3232                                               instance.testcase.id])
3233                # skip quarantined tests
3234                if test_configuration in self.quarantine and not self.quarantine_verify:
3235                    discards[instance] = discards.get(instance,
3236                                                      f"Quarantine: {self.quarantine[test_configuration]}")
3237                # run only quarantined test to verify their statuses (skip everything else)
3238                if self.quarantine_verify and test_configuration not in self.quarantine:
3239                    discards[instance] = discards.get(instance, "Not under quarantine")
3240
3241                # if nothing stopped us until now, it means this configuration
3242                # needs to be added.
3243                instance_list.append(instance)
3244
3245            # no configurations, so jump to next testcase
3246            if not instance_list:
3247                continue
3248
3249            # if twister was launched with no platform options at all, we
3250            # take all default platforms
3251            if default_platforms and not tc.build_on_all and not integration:
3252                if tc.platform_allow:
3253                    a = set(self.default_platforms)
3254                    b = set(tc.platform_allow)
3255                    c = a.intersection(b)
3256                    if c:
3257                        aa = list(filter(lambda tc: tc.platform.name in c, instance_list))
3258                        self.add_instances(aa)
3259                    else:
3260                        self.add_instances(instance_list)
3261                else:
3262                    instances = list(filter(lambda tc: tc.platform.default, instance_list))
3263                    self.add_instances(instances)
3264            elif integration:
3265                instances = list(filter(lambda item:  item.platform.name in tc.integration_platforms, instance_list))
3266                self.add_instances(instances)
3267
3268
3269
3270            elif emulation_platforms:
3271                self.add_instances(instance_list)
3272                for instance in list(filter(lambda inst: not inst.platform.simulation != 'na', instance_list)):
3273                    discards[instance] = discards.get(instance, "Not an emulated platform")
3274            else:
3275                self.add_instances(instance_list)
3276
3277        for _, case in self.instances.items():
3278            case.create_overlay(case.platform, self.enable_asan, self.enable_ubsan, self.enable_coverage, self.coverage_platform)
3279
3280        self.discards = discards
3281        self.selected_platforms = set(p.platform.name for p in self.instances.values())
3282
3283        for instance in self.discards:
3284            instance.reason = self.discards[instance]
3285            # If integration mode is on all skips on integration_platforms are treated as errors.
3286            if self.integration and instance.platform.name in instance.testcase.integration_platforms \
3287                and "Quarantine" not in instance.reason:
3288                instance.status = "error"
3289                instance.reason += " but is one of the integration platforms"
3290                instance.fill_results_by_status()
3291                self.instances[instance.name] = instance
3292            else:
3293                instance.status = "skipped"
3294                instance.fill_results_by_status()
3295
3296        self.filtered_platforms = set(p.platform.name for p in self.instances.values()
3297                                      if p.status != "skipped" )
3298
3299        return discards
3300
3301    def add_instances(self, instance_list):
3302        for instance in instance_list:
3303            self.instances[instance.name] = instance
3304
3305    @staticmethod
3306    def calc_one_elf_size(instance):
3307        if instance.status not in ["error", "failed", "skipped"]:
3308            if instance.platform.type != "native":
3309                size_calc = instance.calculate_sizes()
3310                instance.metrics["ram_size"] = size_calc.get_ram_size()
3311                instance.metrics["rom_size"] = size_calc.get_rom_size()
3312                instance.metrics["unrecognized"] = size_calc.unrecognized_sections()
3313            else:
3314                instance.metrics["ram_size"] = 0
3315                instance.metrics["rom_size"] = 0
3316                instance.metrics["unrecognized"] = []
3317
3318            instance.metrics["handler_time"] = instance.handler.duration if instance.handler else 0
3319
3320    def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False):
3321        for instance in self.instances.values():
3322            if build_only:
3323                instance.run = False
3324
3325            if instance.status not in ['passed', 'skipped', 'error']:
3326                logger.debug(f"adding {instance.name}")
3327                instance.status = None
3328                if test_only and instance.run:
3329                    pipeline.put({"op": "run", "test": instance})
3330                else:
3331                    pipeline.put({"op": "cmake", "test": instance})
3332            # If the instance got 'error' status before, proceed to the report stage
3333            if instance.status == "error":
3334                pipeline.put({"op": "report", "test": instance})
3335
3336    def pipeline_mgr(self, pipeline, done_queue, lock, results):
3337        while True:
3338            try:
3339                task = pipeline.get_nowait()
3340            except queue.Empty:
3341                break
3342            else:
3343                test = task['test']
3344                pb = ProjectBuilder(self,
3345                                    test,
3346                                    lsan=self.enable_lsan,
3347                                    asan=self.enable_asan,
3348                                    ubsan=self.enable_ubsan,
3349                                    coverage=self.enable_coverage,
3350                                    extra_args=self.extra_args,
3351                                    device_testing=self.device_testing,
3352                                    cmake_only=self.cmake_only,
3353                                    cleanup=self.cleanup,
3354                                    valgrind=self.enable_valgrind,
3355                                    inline_logs=self.inline_logs,
3356                                    generator=self.generator,
3357                                    generator_cmd=self.generator_cmd,
3358                                    verbose=self.verbose,
3359                                    warnings_as_errors=self.warnings_as_errors,
3360                                    overflow_as_errors=self.overflow_as_errors
3361                                    )
3362                pb.process(pipeline, done_queue, task, lock, results)
3363
3364        return True
3365
3366    def execute(self, pipeline, done, results):
3367        lock = Lock()
3368        logger.info("Adding tasks to the queue...")
3369        self.add_tasks_to_queue(pipeline, self.build_only, self.test_only)
3370        logger.info("Added initial list of jobs to queue")
3371
3372        processes = []
3373        for job in range(self.jobs):
3374            logger.debug(f"Launch process {job}")
3375            p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, results, ))
3376            processes.append(p)
3377            p.start()
3378
3379        try:
3380            for p in processes:
3381                p.join()
3382        except KeyboardInterrupt:
3383            logger.info("Execution interrupted")
3384            for p in processes:
3385                p.terminate()
3386
3387        # FIXME: This needs to move out.
3388        if self.enable_size_report and not self.cmake_only:
3389            # Parallelize size calculation
3390            executor = concurrent.futures.ThreadPoolExecutor(self.jobs)
3391            futures = [executor.submit(self.calc_one_elf_size, instance)
3392                       for instance in self.instances.values()]
3393            concurrent.futures.wait(futures)
3394        else:
3395            for instance in self.instances.values():
3396                instance.metrics["ram_size"] = 0
3397                instance.metrics["rom_size"] = 0
3398                instance.metrics["handler_time"] = instance.handler.duration if instance.handler else 0
3399                instance.metrics["unrecognized"] = []
3400
3401        return results
3402
3403    def discard_report(self, filename):
3404
3405        try:
3406            if not self.discards:
3407                raise TwisterRuntimeError("apply_filters() hasn't been run!")
3408        except Exception as e:
3409            logger.error(str(e))
3410            sys.exit(2)
3411        with open(filename, "wt") as csvfile:
3412            fieldnames = ["test", "arch", "platform", "reason"]
3413            cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep)
3414            cw.writeheader()
3415            for instance, reason in sorted(self.discards.items()):
3416                rowdict = {"test": instance.testcase.name,
3417                           "arch": instance.platform.arch,
3418                           "platform": instance.platform.name,
3419                           "reason": reason}
3420                cw.writerow(rowdict)
3421
3422    def target_report(self, outdir, suffix, append=False):
3423        platforms = {inst.platform.name for _, inst in self.instances.items()}
3424        for platform in platforms:
3425            if suffix:
3426                filename = os.path.join(outdir,"{}_{}.xml".format(platform, suffix))
3427            else:
3428                filename = os.path.join(outdir,"{}.xml".format(platform))
3429            self.xunit_report(filename, platform, full_report=True,
3430                              append=append, version=self.version)
3431
3432
3433    @staticmethod
3434    def process_log(log_file):
3435        filtered_string = ""
3436        if os.path.exists(log_file):
3437            with open(log_file, "rb") as f:
3438                log = f.read().decode("utf-8")
3439                filtered_string = ''.join(filter(lambda x: x in string.printable, log))
3440
3441        return filtered_string
3442
3443
    def xunit_report(self, filename, platform=None, full_report=False, append=False, version="NA"):
        """Write an xunit-style XML report to filename.

        One 'testsuite' element is produced per platform and one 'testcase'
        element per reported test.

        :param filename: path of the XML file to write
        :param platform: when given, only this platform is reported;
            otherwise every platform in self.selected_platforms is
        :param full_report: when true, report one testcase per entry of
            instance.results; otherwise one testcase per instance
        :param append: when true and filename already exists, the existing
            file is parsed and updated instead of starting from an empty tree
        :param version: stored as the "version" property of every newly
            created testsuite element
        :return: tuple (fails, passes, errors, skips); the counters are reset
            for each platform, so they describe the last platform iterated
            (all zero when no platform was iterated)
        """
        total = 0
        fails = passes = errors = skips = 0
        if platform:
            selected = [platform]
            logger.info(f"Writing target report for {platform}...")
        else:
            logger.info(f"Writing xunit report {filename}...")
            selected = self.selected_platforms

        # Either continue from the report already on disk or start a new tree.
        if os.path.exists(filename) and append:
            tree = ET.parse(filename)
            eleTestsuites = tree.getroot()
        else:
            eleTestsuites = ET.Element('testsuites')

        for p in selected:
            inst = self.get_platform_instances(p)
            fails = 0
            passes = 0
            errors = 0
            skips = 0
            duration = 0

            # First pass: count outcomes and sum up the handler time.
            for _, instance in inst.items():
                handler_time = instance.metrics.get('handler_time', 0)
                duration += handler_time
                if full_report and instance.run:
                    # count every individual result of a runnable instance
                    for k in instance.results.keys():
                        if instance.results[k] == 'PASS':
                            passes += 1
                        elif instance.results[k] == 'BLOCK':
                            errors += 1
                        elif instance.results[k] == 'SKIP' or instance.status in ['skipped']:
                            skips += 1
                        else:
                            fails += 1
                else:
                    # count the instance as a whole, based on its status
                    if instance.status in ["error", "failed", "timeout", "flash_error"]:
                        # NOTE(review): the log selection further down compares
                        # instance.reason against 'Build error'; confirm that the
                        # literals used here match the reasons actually assigned.
                        if instance.reason in ['build_error', 'handler_crash']:
                            errors += 1
                        else:
                            fails += 1
                    elif instance.status == 'skipped':
                        skips += 1
                    elif instance.status == 'passed':
                        passes += 1
                    else:
                        if instance.status:
                            logger.error(f"{instance.name}: Unknown status {instance.status}")
                        else:
                            logger.error(f"{instance.name}: No status")

            total = (errors + passes + fails + skips)
            # do not produce a report if no tests were actually run (only built)
            if total == 0:
                continue

            # the platform name is used as the name of the testsuite element
            run = p
            eleTestsuite = None

            # When we re-run the tests, we re-use the results and update only with
            # the newly run tests.
            if os.path.exists(filename) and append:
                ts = eleTestsuites.findall(f'testsuite/[@name="{p}"]')
                if ts:
                    # NOTE(review): only failures/errors/skipped are refreshed on
                    # an existing suite; 'tests' and 'time' keep their old values.
                    eleTestsuite = ts[0]
                    eleTestsuite.attrib['failures'] = "%d" % fails
                    eleTestsuite.attrib['errors'] = "%d" % errors
                    eleTestsuite.attrib['skipped'] = "%d" % skips
                else:
                    logger.info(f"Did not find any existing results for {p}")
                    eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite',
                                name=run, time="%f" % duration,
                                tests="%d" % (total),
                                failures="%d" % fails,
                                errors="%d" % (errors), skipped="%s" % (skips))
                    eleTSPropetries = ET.SubElement(eleTestsuite, 'properties')
                    # Multiple 'property' can be added to 'properties'
                    # differing by name and value
                    ET.SubElement(eleTSPropetries, 'property', name="version", value=version)

            else:
                eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite',
                                             name=run, time="%f" % duration,
                                             tests="%d" % (total),
                                             failures="%d" % fails,
                                             errors="%d" % (errors), skipped="%s" % (skips))
                eleTSPropetries = ET.SubElement(eleTestsuite, 'properties')
                # Multiple 'property' can be added to 'properties'
                # differing by name and value
                ET.SubElement(eleTSPropetries, 'property', name="version", value=version)

            # Second pass: emit the testcase elements of this platform.
            for _, instance in inst.items():
                if full_report:
                    tname = os.path.basename(instance.testcase.name)
                else:
                    tname = instance.testcase.id
                handler_time = instance.metrics.get('handler_time', 0)

                if full_report:
                    for k in instance.results.keys():
                        # remove testcases that are being re-run from existing reports
                        for tc in eleTestsuite.findall(f'testcase/[@name="{k}"]'):
                            eleTestsuite.remove(tc)

                        # classname is made of the first two dot-separated parts
                        # of the test name
                        classname = ".".join(tname.split(".")[:2])
                        # every result of an instance carries the handler time
                        # of the whole instance
                        eleTestcase = ET.SubElement(
                            eleTestsuite, 'testcase',
                            classname=classname,
                            name="%s" % (k), time="%f" % handler_time)
                        if instance.results[k] in ['FAIL', 'BLOCK'] or \
                            (not instance.run and instance.status in ["error", "failed", "timeout"]):
                            if instance.results[k] == 'FAIL':
                                el = ET.SubElement(
                                    eleTestcase,
                                    'failure',
                                    type="failure",
                                    message="failed")
                            else:
                                el = ET.SubElement(
                                    eleTestcase,
                                    'error',
                                    type="failure",
                                    message=instance.reason)
                            # attach the filtered handler log as the element text
                            log_root = os.path.join(self.outdir, instance.platform.name, instance.testcase.name)
                            log_file = os.path.join(log_root, "handler.log")
                            el.text = self.process_log(log_file)

                        elif instance.results[k] == 'PASS' \
                            or (not instance.run and instance.status in ["passed"]):
                            # a passing testcase needs no child element
                            pass
                        elif instance.results[k] == 'SKIP' or (instance.status in ["skipped"]):
                            el = ET.SubElement(eleTestcase, 'skipped', type="skipped", message=instance.reason)
                        else:
                            el = ET.SubElement(
                                eleTestcase,
                                'error',
                                type="error",
                                message=f"{instance.reason}")
                else:
                    # in a combined (multi-platform) report the classname is
                    # prefixed with the platform name
                    if platform:
                        classname = ".".join(instance.testcase.name.split(".")[:2])
                    else:
                        classname = p + ":" + ".".join(instance.testcase.name.split(".")[:2])

                    # remove testcases that are being re-run from existing reports
                    for tc in eleTestsuite.findall(f'testcase/[@classname="{classname}"][@name="{instance.testcase.name}"]'):
                        eleTestsuite.remove(tc)

                    eleTestcase = ET.SubElement(eleTestsuite, 'testcase',
                        classname=classname,
                        name="%s" % (instance.testcase.name),
                        time="%f" % handler_time)

                    if instance.status in ["error", "failed", "timeout", "flash_error"]:
                        failure = ET.SubElement(
                            eleTestcase,
                            'failure',
                            type="failure",
                            message=instance.reason)

                        # Use the build log for build errors; otherwise prefer
                        # the handler log when it exists and fall back to the
                        # build log.
                        log_root = ("%s/%s/%s" % (self.outdir, instance.platform.name, instance.testcase.name))
                        bl = os.path.join(log_root, "build.log")
                        hl = os.path.join(log_root, "handler.log")
                        log_file = bl
                        if instance.reason != 'Build error':
                            if os.path.exists(hl):
                                log_file = hl
                            else:
                                log_file = bl

                        failure.text = self.process_log(log_file)

                    elif instance.status == "skipped":
                        ET.SubElement(eleTestcase, 'skipped', type="skipped", message="Skipped")

        # serialize the complete tree and overwrite the report file
        result = ET.tostring(eleTestsuites)
        with open(filename, 'wb') as report:
            report.write(result)

        return fails, passes, errors, skips
3626
3627    def csv_report(self, filename):
3628        with open(filename, "wt") as csvfile:
3629            fieldnames = ["test", "arch", "platform", "status",
3630                          "extra_args", "handler", "handler_time", "ram_size",
3631                          "rom_size"]
3632            cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep)
3633            cw.writeheader()
3634            for instance in self.instances.values():
3635                rowdict = {"test": instance.testcase.name,
3636                           "arch": instance.platform.arch,
3637                           "platform": instance.platform.name,
3638                           "extra_args": " ".join(instance.testcase.extra_args),
3639                           "handler": instance.platform.simulation}
3640
3641                rowdict["status"] = instance.status
3642                if instance.status not in ["error", "failed", "timeout"]:
3643                    if instance.handler:
3644                        rowdict["handler_time"] = instance.metrics.get("handler_time", 0)
3645                    ram_size = instance.metrics.get("ram_size", 0)
3646                    rom_size = instance.metrics.get("rom_size", 0)
3647                    rowdict["ram_size"] = ram_size
3648                    rowdict["rom_size"] = rom_size
3649                cw.writerow(rowdict)
3650
3651    def json_report(self, filename, append=False, version="NA"):
3652        logger.info(f"Writing JSON report {filename}")
3653        report = {}
3654        selected = self.selected_platforms
3655        report["environment"] = {"os": os.name,
3656                                 "zephyr_version": version,
3657                                 "toolchain": self.get_toolchain()
3658                                 }
3659        json_data = {}
3660        if os.path.exists(filename) and append:
3661            with open(filename, 'r') as json_file:
3662                json_data = json.load(json_file)
3663
3664        suites = json_data.get("testsuites", [])
3665        if suites:
3666            suite = suites[0]
3667            testcases = suite.get("testcases", [])
3668        else:
3669            suite = {}
3670            testcases = []
3671
3672        for p in selected:
3673            inst = self.get_platform_instances(p)
3674            for _, instance in inst.items():
3675                testcase = {}
3676                handler_log = os.path.join(instance.build_dir, "handler.log")
3677                build_log = os.path.join(instance.build_dir, "build.log")
3678                device_log = os.path.join(instance.build_dir, "device.log")
3679
3680                handler_time = instance.metrics.get('handler_time', 0)
3681                ram_size = instance.metrics.get ("ram_size", 0)
3682                rom_size  = instance.metrics.get("rom_size",0)
3683                for k in instance.results.keys():
3684                    testcases = list(filter(lambda d: not (d.get('testcase') == k and d.get('platform') == p), testcases ))
3685                    testcase = {"testcase": k,
3686                                "arch": instance.platform.arch,
3687                                "platform": p,
3688                                }
3689                    if ram_size:
3690                        testcase["ram_size"] = ram_size
3691                    if rom_size:
3692                        testcase["rom_size"] = rom_size
3693
3694                    if instance.results[k] in ["PASS"] or instance.status == 'passed':
3695                        testcase["status"] = "passed"
3696                        if instance.handler:
3697                            testcase["execution_time"] =  handler_time
3698
3699                    elif instance.results[k] in ['FAIL', 'BLOCK'] or instance.status in ["error", "failed", "timeout", "flash_error"]:
3700                        testcase["status"] = "failed"
3701                        testcase["reason"] = instance.reason
3702                        testcase["execution_time"] =  handler_time
3703                        if os.path.exists(handler_log):
3704                            testcase["test_output"] = self.process_log(handler_log)
3705                        elif os.path.exists(device_log):
3706                            testcase["device_log"] = self.process_log(device_log)
3707                        else:
3708                            testcase["build_log"] = self.process_log(build_log)
3709                    elif instance.status == 'skipped':
3710                        testcase["status"] = "skipped"
3711                        testcase["reason"] = instance.reason
3712                    testcases.append(testcase)
3713
3714        suites = [ {"testcases": testcases} ]
3715        report["testsuites"] = suites
3716
3717        with open(filename, "wt") as json_file:
3718            json.dump(report, json_file, indent=4, separators=(',',':'))
3719
3720    def get_testcase(self, identifier):
3721        results = []
3722        for _, tc in self.testcases.items():
3723            for case in tc.cases:
3724                if case == identifier:
3725                    results.append(tc)
3726        return results
3727
class CoverageTool:
    """ Common base of the supported coverage tools
    """

    def __init__(self):
        self.gcov_tool = None
        self.base_dir = None

    @staticmethod
    def factory(tool):
        """ Return a new coverage tool object for the given tool name

        'lcov' and 'gcovr' are supported; for any other name an error is
        logged and None is returned.
        """
        if tool not in ('lcov', 'gcovr'):
            logger.error("Unsupported coverage tool specified: {}".format(tool))
            return None

        selected = Lcov() if tool == 'lcov' else Gcovr()

        logger.debug(f"Select {tool} as the coverage tool...")
        return selected

    @staticmethod
    def retrieve_gcov_data(input_file):
        """ Extract the gcov dump embedded in a log file

        Lines between the GCOV_COVERAGE_DUMP_START and GCOV_COVERAGE_DUMP_END
        markers that start with "*" and contain "<" are split into a file
        name and a hex dump.  Returns a dict with 'data' (file name -> hex
        dump) and 'complete', which is True when the end marker was seen or
        when the log has no start marker at all.
        """
        logger.debug("Working on %s" % input_file)
        coverage = {}
        started = False
        finished = False
        with open(input_file, 'r') as log:
            for line in log.readlines():
                if "GCOV_COVERAGE_DUMP_START" in line:
                    started = True
                    continue
                if "GCOV_COVERAGE_DUMP_END" in line:
                    finished = True
                    break
                # Skip everything before the dump and every line that is
                # not a "*<file><<hexdump>" record.
                if not started or not line.startswith("*"):
                    continue
                fields = line.split("<")
                if len(fields) < 2:
                    continue
                # Drop the leading delimiter "*" of the file name and the
                # trailing new line char of the dump.
                coverage[fields[0][1:]] = fields[1][:-1]
        if not started:
            finished = True
        return {'complete': finished, 'data': coverage}

    @staticmethod
    def create_gcda_files(extracted_coverage_info):
        """ Write the extracted hex dumps to their gcda files

        Entries whose file name contains "kobject_hash" are not written;
        the matching gcno file is removed instead, on a best-effort basis.
        """
        logger.debug("Generating gcda files")
        for filename, hexdump_val in extracted_coverage_info.items():
            if "kobject_hash" not in filename:
                with open(filename, 'wb') as gcda:
                    gcda.write(bytes.fromhex(hexdump_val))
                continue

            # if kobject_hash is given for coverage gcovr fails
            # hence skipping it problem only in gcovr v4.1
            gcno_file = (filename[:-4]) + "gcno"
            with contextlib.suppress(Exception):
                os.remove(gcno_file)

    def generate(self, outdir):
        """ Create the gcda files from all handler logs and run the tool

        Every handler.log below outdir is scanned for gcov data; complete
        captures are turned into gcda files, incomplete ones are reported
        as errors.  The tool specific _generate() then runs with
        outdir/coverage.log opened for appending.
        """
        tool_cls = self.__class__
        for handler_log in glob.glob("%s/**/handler.log" % outdir, recursive=True):
            gcov_data = tool_cls.retrieve_gcov_data(handler_log)
            complete = gcov_data['complete']
            coverage = gcov_data['data']
            if not complete:
                logger.error("Gcov data capture incomplete: {}".format(handler_log))
                continue
            tool_cls.create_gcda_files(coverage)
            logger.debug("Gcov data captured: {}".format(handler_log))

        with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog:
            if self._generate(outdir, coveragelog) == 0:
                logger.info("HTML report generated: {}".format(
                    os.path.join(outdir, "coverage", "index.html")))
3815
3816
class Lcov(CoverageTool):
    """Coverage tool that runs ``lcov`` and renders HTML with ``genhtml``."""

    def __init__(self):
        super().__init__()
        # Patterns handed, one at a time, to "lcov --remove".
        self.ignores = []

    def add_ignore_file(self, pattern):
        """Register ``*<pattern>*`` as an ignore entry."""
        entry = "*" + pattern + "*"
        self.ignores.append(entry)

    def add_ignore_directory(self, pattern):
        """Register ``*/<pattern>/*`` as an ignore entry."""
        entry = "*/" + pattern + "/*"
        self.ignores.append(entry)

    def _generate(self, outdir, coveragelog):
        """Capture, filter and render coverage found under *outdir*.

        :param outdir: directory scanned by lcov; also receives the
            ``coverage.info`` / ``ztest.info`` tracefiles and the
            ``coverage`` HTML directory.
        :param coveragelog: open file receiving the stdout of each command.
        :return: exit status of the final ``genhtml`` invocation.
        """
        coveragefile = os.path.join(outdir, "coverage.info")
        ztestfile = os.path.join(outdir, "ztest.info")

        # Pieces shared by every lcov invocation below.
        lcov = ["lcov", "--gcov-tool", self.gcov_tool]
        branch_rc = ["--rc", "lcov_branch_coverage=1"]

        cmd = (lcov + ["--capture", "--directory", outdir] + branch_rc
               + ["--output-file", coveragefile])
        cmd_str = " ".join(cmd)
        logger.debug(f"Running {cmd_str}...")
        subprocess.call(cmd, stdout=coveragelog)

        # We want to remove tests/* and tests/ztest/test/* but save tests/ztest
        ztest_pattern = os.path.join(self.base_dir, "tests", "ztest", "*")
        extract_cmd = (lcov + ["--extract", coveragefile, ztest_pattern,
                               "--output-file", ztestfile] + branch_rc)
        subprocess.call(extract_cmd, stdout=coveragelog)

        files = [coveragefile]
        if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
            ztest_tests = os.path.join(self.base_dir, "tests/ztest/test/*")
            prune_cmd = (lcov + ["--remove", ztestfile, ztest_tests,
                                 "--output-file", ztestfile] + branch_rc)
            subprocess.call(prune_cmd, stdout=coveragelog)
            files.append(ztestfile)

        # Strip each ignored pattern from the main tracefile in place.
        for ignored in self.ignores:
            remove_cmd = (lcov + ["--remove", coveragefile, ignored,
                                  "--output-file", coveragefile] + branch_rc)
            subprocess.call(remove_cmd, stdout=coveragelog)

        # The --ignore-errors source option is added to avoid it exiting due to
        # samples/application_development/external_lib/
        genhtml = ["genhtml", "--legend", "--branch-coverage",
                   "--ignore-errors", "source",
                   "-output-directory", os.path.join(outdir, "coverage")]
        return subprocess.call(genhtml + files, stdout=coveragelog)
3871
3872
class Gcovr(CoverageTool):
    """Coverage tool that runs ``gcovr`` for both data collection and HTML."""

    def __init__(self):
        super().__init__()
        # Patterns passed to gcovr, each one preceded by "-e".
        self.ignores = []

    def add_ignore_file(self, pattern):
        """Register ``.*<pattern>.*`` as an ignore entry."""
        entry = ".*" + pattern + ".*"
        self.ignores.append(entry)

    def add_ignore_directory(self, pattern):
        """Register ``.*/<pattern>/.*`` as an ignore entry."""
        entry = ".*/" + pattern + "/.*"
        self.ignores.append(entry)

    @staticmethod
    def _interleave_list(prefix, list):
        """Return a flat list with *prefix* placed before every item.

        For example ``("-e", ["a", "b"])`` gives ``["-e", "a", "-e", "b"]``.
        """
        interleaved = []
        for item in list:
            interleaved.append(prefix)
            interleaved.append(item)
        return interleaved

    def _generate(self, outdir, coveragelog):
        """Collect coverage under *outdir* as JSON and render the HTML report.

        :param outdir: directory scanned by gcovr; also receives
            ``coverage.json``, ``ztest.json`` and the ``coverage`` directory.
        :param coveragelog: open file receiving the stdout of each command.
        :return: exit status of the final HTML-generating gcovr invocation.
        """
        coveragefile = os.path.join(outdir, "coverage.json")
        ztestfile = os.path.join(outdir, "ztest.json")

        excludes = Gcovr._interleave_list("-e", self.ignores)

        # Prefix shared by the two data-collecting invocations.
        collect = ["gcovr", "-r", self.base_dir,
                   "--gcov-executable", self.gcov_tool]

        # We want to remove tests/* and tests/ztest/test/* but save tests/ztest
        cmd = (collect + ["-e", "tests/*"] + excludes
               + ["--json", "-o", coveragefile, outdir])
        cmd_str = " ".join(cmd)
        logger.debug(f"Running {cmd_str}...")
        subprocess.call(cmd, stdout=coveragelog)

        ztest_cmd = collect + ["-f", "tests/ztest", "-e", "tests/ztest/test/*",
                               "--json", "-o", ztestfile, outdir]
        subprocess.call(ztest_cmd, stdout=coveragelog)

        files = [coveragefile]
        if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
            files.append(ztestfile)

        subdir = os.path.join(outdir, "coverage")
        os.makedirs(subdir, exist_ok=True)

        tracefiles = self._interleave_list("--add-tracefile", files)

        html_cmd = ["gcovr", "-r", self.base_dir, "--html", "--html-details"]
        html_cmd = html_cmd + tracefiles + ["-o", os.path.join(subdir, "index.html")]
        return subprocess.call(html_cmd, stdout=coveragelog)
3923
class DUT(object):
    """Description of a single device under test.

    ``available`` and ``counter`` are backed by ``multiprocessing.Value``
    integers and are always read and written while holding the value's own
    lock.
    """

    def __init__(self,
                 id=None,
                 serial=None,
                 platform=None,
                 product=None,
                 serial_pty=None,
                 connected=False,
                 pre_script=None,
                 post_script=None,
                 post_flash_script=None,
                 runner=None):

        self.serial = serial
        self.platform = platform
        self.serial_pty = serial_pty
        # Shared integers behind the ``counter`` / ``available`` properties.
        self._counter = Value("i", 0)
        self._available = Value("i", 1)
        self.connected = connected
        self.pre_script = pre_script
        self.id = id
        self.product = product
        self.runner = runner
        self.fixtures = []
        self.post_flash_script = post_flash_script
        self.post_script = post_script
        self.probe_id = None
        self.notes = None
        self.lock = Lock()
        self.match = False

    @property
    def available(self):
        """Current value of the shared availability flag (initially 1)."""
        with self._available.get_lock():
            return self._available.value

    @available.setter
    def available(self, value):
        with self._available.get_lock():
            self._available.value = value

    @property
    def counter(self):
        """Current value of the shared integer counter (initially 0)."""
        with self._counter.get_lock():
            return self._counter.value

    @counter.setter
    def counter(self, value):
        with self._counter.get_lock():
            self._counter.value = value

    def to_dict(self):
        """Return the instance attributes that hold truthy values.

        The shared-value holders, the ``lock`` object and the ``match``
        flag are runtime bookkeeping rather than device data, so they are
        never included.
        """
        exclude = ('_available', '_counter', 'lock', 'match')
        return {k: v for k, v in vars(self).items()
                if k not in exclude and v}

    def __repr__(self):
        return f"<{self.platform} ({self.product}) on {self.serial}>"
3989
class HardwareMap:
    """Collection of devices under test, loaded from or saved to a YAML map.

    ``duts`` holds devices added explicitly or loaded from a map file;
    ``detected`` holds devices found by :meth:`scan`.
    """
    schema_path = os.path.join(ZEPHYR_BASE, "scripts", "schemas", "twister", "hwmap-schema.yaml")

    # USB manufacturer strings accepted by scan(); anything else is reported
    # as unsupported.
    manufacturer = [
        'ARM',
        'SEGGER',
        'MBED',
        'STMicroelectronics',
        'Atmel Corp.',
        'Texas Instruments',
        'Silicon Labs',
        'NXP Semiconductors',
        'Microchip Technology Inc.',
        'FTDI',
        'Digilent'
    ]

    # Runner name -> product strings (compared literally and as regular
    # expressions) that select this runner in scan().
    runner_mapping = {
        'pyocd': [
            'DAPLink CMSIS-DAP',
            'MBED CMSIS-DAP'
        ],
        'jlink': [
            'J-Link',
            'J-Link OB'
        ],
        'openocd': [
            'STM32 STLink', '^XDS110.*', 'STLINK-V3'
        ],
        'dediprog': [
            'TTL232R-3V3',
            'MCP2200 USB Serial Port Emulator'
        ]
    }

    def __init__(self):
        self.detected = []
        self.duts = []

    def add_device(self, serial, platform, pre_script, is_pty):
        """Append a connected device to ``duts``.

        :param serial: stored as ``serial_pty`` when *is_pty* is true,
            otherwise as ``serial``.
        :param platform: platform name of the device.
        :param pre_script: pre-script value stored on the device.
        :param is_pty: selects which attribute receives *serial*.
        """
        device = DUT(platform=platform, connected=True, pre_script=pre_script)

        if is_pty:
            device.serial_pty = serial
        else:
            device.serial = serial

        self.duts.append(device)

    def load(self, map_file):
        """Load *map_file*, verify it against the schema and extend ``duts``.

        A device is marked connected when its entry has a non-None
        ``serial`` value.
        """
        hwm_schema = scl.yaml_load(self.schema_path)
        duts = scl.yaml_load_verify(map_file, hwm_schema)
        for dut in duts:
            serial_dev = dut.get('serial')
            new_dut = DUT(platform=dut.get('platform'),
                          product=dut.get('product'),
                          runner=dut.get('runner'),
                          id=dut.get('id'),
                          serial=serial_dev,
                          connected=serial_dev is not None,
                          pre_script=dut.get('pre_script'),
                          post_script=dut.get('post_script'),
                          post_flash_script=dut.get('post_flash_script'))
            new_dut.fixtures = dut.get('fixtures', [])
            new_dut.counter = 0
            self.duts.append(new_dut)

    def scan(self, persistent=False):
        """Detect attached serial devices and append them to ``detected``.

        :param persistent: on Linux, record ``/dev/serial/by-id`` symlinks
            instead of the plain device nodes.
        """
        from serial.tools import list_ports

        persistent_map = {}
        if persistent and platform.system() == 'Linux':
            # On Linux, /dev/serial/by-id provides symlinks to
            # '/dev/ttyACMx' nodes using names which are unique as
            # long as manufacturers fill out USB metadata nicely.
            #
            # This creates a map from '/dev/ttyACMx' device nodes
            # to '/dev/serial/by-id/usb-...' symlinks. The symlinks
            # go into the hardware map because they stay the same
            # even when the user unplugs / replugs the device.
            #
            # Some inexpensive USB/serial adapters don't result
            # in unique names here, though, so use of this feature
            # requires explicitly setting persistent=True.
            by_id = Path('/dev/serial/by-id')
            # The directory may be absent (e.g. nothing plugged in); treat
            # that as "no persistent names" instead of failing the scan.
            if by_id.is_dir():
                persistent_map = {str(link.resolve()): str(link)
                                  for link in by_id.iterdir()}

        serial_devices = list_ports.comports()
        logger.info("Scanning connected hardware...")
        for d in serial_devices:
            if d.manufacturer not in self.manufacturer:
                logger.warning("Unsupported device (%s): %s" % (d.manufacturer, d))
                continue

            # TI XDS110 can have multiple serial devices for a single board
            # assume endpoint 0 is the serial, skip all others
            if d.manufacturer == 'Texas Instruments' and not d.location.endswith('0'):
                continue

            s_dev = DUT(platform="unknown",
                        id=d.serial_number,
                        serial=persistent_map.get(d.device, d.device),
                        product=d.product,
                        runner='unknown',
                        connected=True)

            # Every runner is checked, so the last matching runner wins.
            for runner, products in self.runner_mapping.items():
                if d.product in products:
                    s_dev.runner = runner
                # Regex matching needs a string; skip it when the port
                # reports no product.
                elif d.product is not None and any(
                        re.match(p, d.product) for p in products):
                    s_dev.runner = runner

            self.detected.append(s_dev)

    def save(self, hwm_file):
        """Write the detected devices to *hwm_file*.

        When the file already exists its entries are all marked
        disconnected, then re-connected where a detected device has the
        same ``id`` and ``product``; detected devices without a matching
        entry are appended. The merged map is written back, loaded into
        ``duts`` and printed. When the file does not exist, a new map
        containing only the detected devices is written and printed.
        """
        self.detected.sort(key=lambda x: x.serial or '')
        if os.path.exists(hwm_file):
            # use existing map
            with open(hwm_file, 'r') as yaml_file:
                hwm = yaml.load(yaml_file, Loader=SafeLoader)

            if hwm:
                # Entries may omit optional keys, so never index directly.
                hwm.sort(key=lambda x: x.get('serial') or '')

                # disconnect everything
                for h in hwm:
                    h['connected'] = False
                    h['serial'] = None

                for _detected in self.detected:
                    for h in hwm:
                        if (_detected.id == h.get('id')
                                and _detected.product == h.get('product')
                                and not _detected.match):
                            h['connected'] = True
                            h['serial'] = _detected.serial
                            _detected.match = True

            new = [d.to_dict() for d in self.detected if not d.match]
            hwm = hwm + new if hwm else new

            with open(hwm_file, 'w') as yaml_file:
                yaml.dump(hwm, yaml_file, Dumper=Dumper, default_flow_style=False)

            self.load(hwm_file)
            logger.info("Registered devices:")
            self.dump()

        else:
            # create new file
            dl = [
                {
                    'platform': _connected.platform,
                    'id': _connected.id,
                    'runner': _connected.runner,
                    'serial': _connected.serial,
                    'product': _connected.product,
                    'connected': _connected.connected
                }
                for _connected in self.detected
            ]
            with open(hwm_file, 'w') as yaml_file:
                yaml.dump(dl, yaml_file, Dumper=Dumper, default_flow_style=False)
            logger.info("Detected devices:")
            self.dump(detected=True)

    def dump(self, filtered=None, header=None, connected_only=False, detected=False):
        """Print a table of devices to stdout.

        :param filtered: when non-empty, only platforms in this collection
            are shown.
        :param header: column titles; defaults to
            ``["Platform", "ID", "Serial device"]`` when empty or None.
        :param connected_only: when true, skip devices that are not
            connected.
        :param detected: show ``detected`` instead of ``duts``.
        """
        print("")
        to_show = self.detected if detected else self.duts

        if not header:
            header = ["Platform", "ID", "Serial device"]

        table = []
        for p in to_show:
            if filtered and p.platform not in filtered:
                continue

            if not connected_only or p.connected:
                table.append([p.platform, p.id, p.serial])

        print(tabulate(table, headers=header, tablefmt="github"))
4202