1#!/usr/bin/env python3
2# vim: set syntax=python ts=4 :
4# Copyright (c) 2018-2024 Intel Corporation
5# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved.
7# SPDX-License-Identifier: Apache-2.0
8import collections
9import copy
10import itertools
11import json
12import logging
13import os
14import random
15import re
16import subprocess
17import sys
18from argparse import Namespace
19from collections import OrderedDict
20from itertools import islice
21from pathlib import Path
23import snippets
26    from anytree import Node, RenderTree, find
27except ImportError:
28    print("Install the anytree module to use the --test-tree option")
30import scl
31from twisterlib.config_parser import TwisterConfigParser
32from twisterlib.error import TwisterRuntimeError
33from twisterlib.platform import Platform, generate_platforms
34from twisterlib.quarantine import Quarantine
35from twisterlib.statuses import TwisterStatus
36from twisterlib.testinstance import TestInstance
37from twisterlib.testsuite import TestSuite, scan_testsuite_path
38from zephyr_module import parse_modules
40logger = logging.getLogger('twister')
43ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
44if not ZEPHYR_BASE:
45    sys.exit("$ZEPHYR_BASE environment variable undefined")
47# This is needed to load edt.pickle files.
48sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts", "dts",
49                                "python-devicetree", "src"))
50from devicetree import edtlib  # pylint: disable=unused-import
52sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/"))
54class Filters:
55    # platform keys
56    PLATFORM_KEY = 'platform key filter'
57    # filters provided on command line by the user/tester
58    CMD_LINE = 'command line filter'
59    # filters in the testsuite yaml definition
60    TESTSUITE = 'testsuite filter'
61    # filters in the testplan yaml definition
62    TESTPLAN = 'testplan filter'
63    # filters related to platform definition
64    PLATFORM = 'Platform related filter'
65    # in case a test suite was quarantined.
66    QUARANTINE = 'Quarantine filter'
67    # in case a test suite is skipped intentionally .
68    SKIP = 'Skip filter'
69    # in case of incompatibility between selected and allowed toolchains.
70    TOOLCHAIN = 'Toolchain filter'
71    # in case where an optional module is not available
72    MODULE = 'Module filter'
73    # in case of missing env. variable required for a platform
74    ENVIRONMENT = 'Environment filter'
77class TestLevel:
78    name = None
79    levels = []
80    scenarios = []
83class TestPlan:
84    __test__ = False  # for pytest to skip this class when collects tests
85    config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
86    dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
88    suite_schema = scl.yaml_load(
89        os.path.join(ZEPHYR_BASE,
90                     "scripts", "schemas", "twister", "testsuite-schema.yaml"))
91    quarantine_schema = scl.yaml_load(
92        os.path.join(ZEPHYR_BASE,
93                     "scripts", "schemas", "twister", "quarantine-schema.yaml"))
95    tc_schema_path = os.path.join(
96        ZEPHYR_BASE,
97        "scripts",
98        "schemas",
99        "twister",
100        "test-config-schema.yaml"
101    )
103    SAMPLE_FILENAME = 'sample.yaml'
104    TESTSUITE_FILENAME = 'testcase.yaml'
106    def __init__(self, env: Namespace):
108        self.options = env.options
109        self.env = env
111        # Keep track of which test cases we've filtered out and why
112        self.testsuites = {}
113        self.quarantine = None
114        self.platforms = []
115        self.platform_names = []
116        self.selected_platforms = []
117        self.default_platforms = []
118        self.load_errors = 0
119        self.instances = dict()
120        self.instance_fail_count = 0
121        self.warnings = 0
123        self.scenarios = []
125        self.hwm = env.hwm
126        # used during creating shorter build paths
127        self.link_dir_counter = 0
128        self.modules = []
130        self.run_individual_testsuite = []
131        self.levels = []
132        self.test_config =  {}
134        self.name = "unnamed"
136    def get_level(self, name):
137        level = next((lvl for lvl in self.levels if lvl.name == name), None)
138        return level
140    def parse_configuration(self, config_file):
141        if os.path.exists(config_file):
142            tc_schema = scl.yaml_load(self.tc_schema_path)
143            self.test_config = scl.yaml_load_verify(config_file, tc_schema)
144        else:
145            raise TwisterRuntimeError(f"File {config_file} not found.")
147        levels = self.test_config.get('levels', [])
149        # Do first pass on levels to get initial data.
150        for level in levels:
151            adds = []
152            for s in  level.get('adds', []):
153                r = re.compile(s)
154                adds.extend(list(filter(r.fullmatch, self.scenarios)))
156            tl = TestLevel()
157            tl.name = level['name']
158            tl.scenarios = adds
159            tl.levels = level.get('inherits', [])
160            self.levels.append(tl)
162        # Go over levels again to resolve inheritance.
163        for level in levels:
164            inherit = level.get('inherits', [])
165            _level = self.get_level(level['name'])
166            if inherit:
167                for inherted_level in inherit:
168                    _inherited = self.get_level(inherted_level)
169                    assert _inherited, "Unknown inherited level {inherted_level}"
170                    _inherited_scenarios = _inherited.scenarios
171                    level_scenarios = _level.scenarios if _level else []
172                    level_scenarios.extend(_inherited_scenarios)
174    def find_subtests(self):
175        sub_tests = self.options.sub_test
176        if sub_tests:
177            for subtest in sub_tests:
178                _subtests = self.get_testcase(subtest)
179                for _subtest in _subtests:
180                    self.run_individual_testsuite.append(_subtest.name)
182            if self.run_individual_testsuite:
183                logger.info("Running the following tests:")
184                for test in self.run_individual_testsuite:
185                    print(f" - {test}")
186            else:
187                raise TwisterRuntimeError("Tests not found")
189    def discover(self):
190        self.handle_modules()
191        if self.options.test:
192            self.run_individual_testsuite = self.options.test
194        self.add_configurations()
195        num = self.add_testsuites(testsuite_filter=self.run_individual_testsuite)
196        if num == 0:
197            raise TwisterRuntimeError("No testsuites found at the specified location...")
198        if self.load_errors:
199            raise TwisterRuntimeError(
200                f"Found {self.load_errors} errors loading {num} test configurations."
201            )
203        self.find_subtests()
204        # get list of scenarios we have parsed into one list
205        for _, ts in self.testsuites.items():
206            self.scenarios.append(ts.id)
208        self.report_duplicates()
209        self.parse_configuration(config_file=self.env.test_config)
211        # handle quarantine
212        ql = self.options.quarantine_list
213        qv = self.options.quarantine_verify
214        if qv and not ql:
215            logger.error("No quarantine list given to be verified")
216            raise TwisterRuntimeError("No quarantine list given to be verified")
217        if ql:
218            for quarantine_file in ql:
219                try:
220                    # validate quarantine yaml file against the provided schema
221                    scl.yaml_load_verify(quarantine_file, self.quarantine_schema)
222                except scl.EmptyYamlFileException:
223                    logger.debug(f'Quarantine file {quarantine_file} is empty')
224            self.quarantine = Quarantine(ql)
226    def load(self):
228        if self.options.report_suffix:
229            last_run = os.path.join(
230                self.options.outdir,
231                f"twister_{self.options.report_suffix}.json"
232            )
233        else:
234            last_run = os.path.join(self.options.outdir, "twister.json")
236        if self.options.only_failed or self.options.report_summary is not None:
237            self.load_from_file(last_run)
238            self.selected_platforms = set(p.platform.name for p in self.instances.values())
239        elif self.options.load_tests:
240            self.load_from_file(self.options.load_tests)
241            self.selected_platforms = set(p.platform.name for p in self.instances.values())
242        elif self.options.test_only:
243            # Get list of connected hardware and filter tests to only be run on connected hardware.
244            # If the platform does not exist in the hardware map or was not specified by --platform,
245            # just skip it.
247            connected_list = []
248            excluded_list = []
249            for _cp in self.options.platform:
250                if _cp in self.platform_names:
251                    connected_list.append(self.get_platform(_cp).name)
253            if self.options.exclude_platform:
254                for _p in self.options.exclude_platform:
255                    if _p in self.platform_names:
256                        excluded_list.append(self.get_platform(_p).name)
257                for excluded in excluded_list:
258                    if excluded in connected_list:
259                        connected_list.remove(excluded)
261            self.load_from_file(last_run, filter_platform=connected_list)
262            self.selected_platforms = set(p.platform.name for p in self.instances.values())
263        else:
264            self.apply_filters()
266        if self.options.subset:
267            s =  self.options.subset
268            try:
269                subset, sets = (int(x) for x in s.split("/"))
270            except ValueError as err:
271                raise TwisterRuntimeError("Bad subset value.") from err
273            if subset > sets:
274                raise TwisterRuntimeError("subset should not exceed the total number of sets")
276            if int(subset) > 0 and int(sets) >= int(subset):
277                logger.info(f"Running only a subset: {subset}/{sets}")
278            else:
279                raise TwisterRuntimeError(
280                    f"You have provided a wrong subset value: {self.options.subset}."
281                )
283            self.generate_subset(subset, int(sets))
285    def generate_subset(self, subset, sets):
286        # Test instances are sorted depending on the context. For CI runs
287        # the execution order is: "plat1-testA, plat1-testB, ...,
288        # plat1-testZ, plat2-testA, ...". For hardware tests
289        # (device_testing), were multiple physical platforms can run the tests
290        # in parallel, it is more efficient to run in the order:
291        # "plat1-testA, plat2-testA, ..., plat1-testB, plat2-testB, ..."
292        if self.options.device_testing:
293            self.instances = OrderedDict(sorted(self.instances.items(),
294                                key=lambda x: x[0][x[0].find("/") + 1:]))
295        else:
296            self.instances = OrderedDict(sorted(self.instances.items()))
298        if self.options.shuffle_tests:
299            seed_value = int.from_bytes(os.urandom(8), byteorder="big")
300            if self.options.shuffle_tests_seed is not None:
301                seed_value = self.options.shuffle_tests_seed
303            logger.info(f"Shuffle tests with seed: {seed_value}")
304            random.seed(seed_value)
305            temp_list = list(self.instances.items())
306            random.shuffle(temp_list)
307            self.instances = OrderedDict(temp_list)
309        # Do calculation based on what is actually going to be run and evaluated
310        # at runtime, ignore the cases we already know going to be skipped.
311        # This fixes an issue where some sets would get majority of skips and
312        # basically run nothing beside filtering.
313        to_run = {k : v for k,v in self.instances.items() if v.status == TwisterStatus.NONE}
314        total = len(to_run)
315        per_set = int(total / sets)
316        num_extra_sets = total - (per_set * sets)
318        # Try and be more fair for rounding error with integer division
319        # so the last subset doesn't get overloaded, we add 1 extra to
320        # subsets 1..num_extra_sets.
321        if subset <= num_extra_sets:
322            start = (subset - 1) * (per_set + 1)
323            end = start + per_set + 1
324        else:
325            base = num_extra_sets * (per_set + 1)
326            start = ((subset - num_extra_sets - 1) * per_set) + base
327            end = start + per_set
329        sliced_instances = islice(to_run.items(), start, end)
330        skipped = {k : v for k,v in self.instances.items() if v.status == TwisterStatus.SKIP}
331        errors = {k : v for k,v in self.instances.items() if v.status == TwisterStatus.ERROR}
332        self.instances = OrderedDict(sliced_instances)
333        if subset == 1:
334            # add all pre-filtered tests that are skipped or got error status
335            # to the first set to allow for better distribution among all sets.
336            self.instances.update(skipped)
337            self.instances.update(errors)
340    def handle_modules(self):
341        # get all enabled west projects
342        modules_meta = parse_modules(ZEPHYR_BASE)
343        self.modules = [module.meta.get('name') for module in modules_meta]
346    def report(self):
347        if self.options.test_tree:
348            if not self.options.detailed_test_id:
349                logger.info("Test tree is always shown with detailed test-id.")
350            self.report_test_tree()
351            return 0
352        elif self.options.list_tests:
353            if not self.options.detailed_test_id:
354                logger.info("Test list is always shown with detailed test-id.")
355            self.report_test_list()
356            return 0
357        elif self.options.list_tags:
358            self.report_tag_list()
359            return 0
361        return 1
363    def report_duplicates(self):
364        dupes = [item for item, count in collections.Counter(self.scenarios).items() if count > 1]
365        if dupes:
366            msg = "Duplicated test scenarios found:\n"
367            for dupe in dupes:
368                msg += (f"- {dupe} found in:\n")
369                for dc in self.get_testsuite(dupe):
370                    msg += (f"  - {dc.yamlfile}\n")
371            raise TwisterRuntimeError(msg)
372        else:
373            logger.debug("No duplicates found.")
375    def report_tag_list(self):
376        tags = set()
377        for _, tc in self.testsuites.items():
378            tags = tags.union(tc.tags)
380        for t in tags:
381            print(f"- {t}")
383    def report_test_tree(self):
384        tests_list = self.get_tests_list()
386        testsuite = Node("Testsuite")
387        samples = Node("Samples", parent=testsuite)
388        tests = Node("Tests", parent=testsuite)
390        for test in sorted(tests_list):
391            if test.startswith("sample."):
392                sec = test.split(".")
393                area = find(
394                    samples,
395                    lambda node, sname=sec[1]: node.name == sname and node.parent == samples
396                )
397                if not area:
398                    area = Node(sec[1], parent=samples)
400                Node(test, parent=area)
401            else:
402                sec = test.split(".")
403                area = find(
404                    tests,
405                    lambda node, sname=sec[0]: node.name == sname and node.parent == tests
406                )
407                if not area:
408                    area = Node(sec[0], parent=tests)
410                if area and len(sec) > 2:
411                    subarea = find(
412                        area, lambda node, sname=sec[1], sparent=area: node.name == sname
413                        and node.parent == sparent
414                    )
415                    if not subarea:
416                        subarea = Node(sec[1], parent=area)
417                    Node(test, parent=subarea)
419        for pre, _, node in RenderTree(testsuite):
420            print(f"{pre}{node.name}")
422    def report_test_list(self):
423        tests_list = self.get_tests_list()
425        cnt = 0
426        for test in sorted(tests_list):
427            cnt = cnt + 1
428            print(f" - {test}")
429        print(f"{cnt} total.")
432    # Debug Functions
433    @staticmethod
434    def info(what):
435        sys.stdout.write(what + "\n")
436        sys.stdout.flush()
438    def add_configurations(self):
439        # Create a list of board roots as defined by the build system in general
440        # Note, internally in twister a board root includes the `boards` folder
441        # but in Zephyr build system, the board root is without the `boards` in folder path.
442        board_roots = [Path(os.path.dirname(root)) for root in self.env.board_roots]
443        soc_roots = self.env.soc_roots
444        arch_roots = self.env.arch_roots
446        platform_config = self.test_config.get('platforms', {})
448        for platform in generate_platforms(board_roots, soc_roots, arch_roots):
449            if not platform.twister:
450                continue
451            self.platforms.append(platform)
453            if not platform_config.get('override_default_platforms', False):
454                if platform.default:
455                    self.default_platforms.append(platform.name)
456                    #logger.debug(f"adding {platform.name} to default platforms")
457                continue
458            for pp in platform_config.get('default_platforms', []):
459                if pp in platform.aliases:
460                    logger.debug(f"adding {platform.name} to default platforms (override  mode)")
461                    self.default_platforms.append(platform.name)
463        self.platform_names = [a for p in self.platforms for a in p.aliases]
465    def get_all_tests(self):
466        testcases = []
467        for _, ts in self.testsuites.items():
468            for case in ts.testcases:
469                testcases.append(case.name)
471        return testcases
473    def get_tests_list(self):
474        testcases = []
475        if tag_filter := self.options.tag:
476            for _, ts in self.testsuites.items():
477                if ts.tags.intersection(tag_filter):
478                    for case in ts.testcases:
479                        testcases.append(case.detailed_name)
480        else:
481            for _, ts in self.testsuites.items():
482                for case in ts.testcases:
483                    testcases.append(case.detailed_name)
485        if exclude_tag := self.options.exclude_tag:
486            for _, ts in self.testsuites.items():
487                if ts.tags.intersection(exclude_tag):
488                    for case in ts.testcases:
489                        if case.detailed_name in testcases:
490                            testcases.remove(case.detailed_name)
491        return testcases
493    def add_testsuites(self, testsuite_filter=None):
494        if testsuite_filter is None:
495            testsuite_filter = []
496        for root in self.env.test_roots:
497            root = os.path.abspath(root)
499            logger.debug(f"Reading testsuite configuration files under {root}...")
501            for dirpath, _, filenames in os.walk(root, topdown=True):
502                if self.SAMPLE_FILENAME in filenames:
503                    filename = self.SAMPLE_FILENAME
504                elif self.TESTSUITE_FILENAME in filenames:
505                    filename = self.TESTSUITE_FILENAME
506                else:
507                    continue
509                logger.debug("Found possible testsuite in " + dirpath)
511                suite_yaml_path = os.path.join(dirpath, filename)
512                suite_path = os.path.dirname(suite_yaml_path)
514                for alt_config_root in self.env.alt_config_root:
515                    alt_config = os.path.join(os.path.abspath(alt_config_root),
516                                              os.path.relpath(suite_path, root),
517                                              filename)
518                    if os.path.exists(alt_config):
519                        logger.info(
520                            f"Using alternative configuration from {os.path.normpath(alt_config)}"
521                        )
522                        suite_yaml_path = alt_config
523                        break
525                try:
526                    parsed_data = TwisterConfigParser(suite_yaml_path, self.suite_schema)
527                    parsed_data.load()
528                    subcases = None
529                    ztest_suite_names = None
531                    for name in parsed_data.scenarios:
532                        suite_dict = parsed_data.get_scenario(name)
533                        suite = TestSuite(
534                            root,
535                            suite_path,
536                            name,
537                            data=suite_dict,
538                            detailed_test_id=self.options.detailed_test_id
539                        )
541                        # convert to fully qualified names
542                        suite.integration_platforms = self.verify_platforms_existence(
543                                suite.integration_platforms,
544                                f"integration_platforms in {suite.name}")
545                        suite.platform_exclude = self.verify_platforms_existence(
546                                suite.platform_exclude,
547                                f"platform_exclude in {suite.name}")
548                        suite.platform_allow =  self.verify_platforms_existence(
549                                suite.platform_allow,
550                                f"platform_allow in {suite.name}")
552                        if suite.harness in ['ztest', 'test']:
553                            if subcases is None:
554                                # scan it only once per testsuite
555                                subcases, ztest_suite_names = scan_testsuite_path(suite_path)
556                            suite.add_subcases(suite_dict, subcases, ztest_suite_names)
557                        else:
558                            suite.add_subcases(suite_dict)
560                        if testsuite_filter:
561                            scenario = os.path.basename(suite.name)
562                            if (
563                                suite.name
564                                and (suite.name in testsuite_filter or scenario in testsuite_filter)
565                            ):
566                                self.testsuites[suite.name] = suite
567                        elif suite.name in self.testsuites:
568                            msg = (
569                                f"test suite '{suite.name}' in '{suite.yamlfile}' is already added"
570                            )
571                            if suite.yamlfile == self.testsuites[suite.name].yamlfile:
572                                logger.debug(f"Skip - {msg}")
573                            else:
574                                msg = (
575                                    f"Duplicate {msg} from '{self.testsuites[suite.name].yamlfile}'"
576                                )
577                                raise TwisterRuntimeError(msg)
578                        else:
579                            self.testsuites[suite.name] = suite
581                except Exception as e:
582                    logger.error(f"{suite_path}: can't load (skipping): {e!r}")
583                    self.load_errors += 1
584        return len(self.testsuites)
586    def __str__(self):
587        return self.name
589    def get_platform(self, name):
590        selected_platform = None
591        for platform in self.platforms:
592            if name in platform.aliases:
593                selected_platform = platform
594                break
595        return selected_platform
597    def handle_quarantined_tests(self, instance: TestInstance, plat: Platform):
598        if self.quarantine:
599            simulator = plat.simulator_by_name(self.options)
600            matched_quarantine = self.quarantine.get_matched_quarantine(
601                instance.testsuite.id,
602                plat.name,
603                plat.arch,
604                simulator.name if simulator is not None else 'na'
605            )
606            if matched_quarantine and not self.options.quarantine_verify:
607                instance.add_filter("Quarantine: " + matched_quarantine, Filters.QUARANTINE)
608                return
609            if not matched_quarantine and self.options.quarantine_verify:
610                instance.add_filter("Not under quarantine", Filters.QUARANTINE)
612    def load_from_file(self, file, filter_platform=None):
613        if filter_platform is None:
614            filter_platform = []
615        try:
616            with open(file) as json_test_plan:
617                jtp = json.load(json_test_plan)
618                instance_list = []
619                for ts in jtp.get("testsuites", []):
620                    logger.debug(f"loading {ts['name']}...")
621                    testsuite = ts["name"]
622                    toolchain = ts["toolchain"]
624                    platform = self.get_platform(ts["platform"])
625                    if filter_platform and platform.name not in filter_platform:
626                        continue
627                    instance = TestInstance(
628                        self.testsuites[testsuite], platform, toolchain, self.env.outdir
629                    )
630                    if ts.get("run_id"):
631                        instance.run_id = ts.get("run_id")
633                    instance.run = instance.check_runnable(
634                        self.options,
635                        self.hwm
636                    )
638                    if self.options.test_only and not instance.run:
639                        continue
641                    instance.metrics['handler_time'] = ts.get('execution_time', 0)
642                    instance.metrics['used_ram'] = ts.get("used_ram", 0)
643                    instance.metrics['used_rom']  = ts.get("used_rom",0)
644                    instance.metrics['available_ram'] = ts.get('available_ram', 0)
645                    instance.metrics['available_rom'] = ts.get('available_rom', 0)
647                    status = TwisterStatus(ts.get('status'))
648                    reason = ts.get("reason", "Unknown")
649                    if status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
650                        if self.options.report_summary is not None:
651                            instance.status = status
652                            instance.reason = reason
653                            self.instance_fail_count += 1
654                        else:
655                            instance.status = TwisterStatus.NONE
656                            instance.reason = None
657                            instance.retries += 1
658                    # test marked as built only can run when --test-only is used.
659                    # Reset status to capture new results.
660                    elif status == TwisterStatus.NOTRUN and instance.run and self.options.test_only:
661                        instance.status = TwisterStatus.NONE
662                        instance.reason = None
663                    else:
664                        instance.status = status
665                        instance.reason = reason
667                    self.handle_quarantined_tests(instance, platform)
669                    for tc in ts.get('testcases', []):
670                        identifier = tc['identifier']
671                        tc_status = TwisterStatus(tc.get('status'))
672                        tc_reason = None
673                        # we set reason only if status is valid, it might have been
674                        # reset above...
675                        if instance.status != TwisterStatus.NONE:
676                            tc_reason = tc.get('reason')
677                        if tc_status != TwisterStatus.NONE:
678                            case = instance.set_case_status_by_name(
679                                identifier,
680                                tc_status,
681                                tc_reason
682                            )
683                            case.duration = tc.get('execution_time', 0)
684                            if tc.get('log'):
685                                case.output = tc.get('log')
687                    instance.create_overlay(platform,
688                                            self.options.enable_asan,
689                                            self.options.enable_ubsan,
690                                            self.options.enable_coverage,
691                                            self.options.coverage_platform
692                                            )
693                    instance_list.append(instance)
694                self.add_instances(instance_list)
695        except FileNotFoundError as e:
696            logger.error(f"{e}")
697            return 1
699    def check_platform(self, platform, platform_list):
700        return any(p in platform.aliases for p in platform_list)
702    def apply_filters(self, **kwargs):
704        platform_filter = self.options.platform
705        vendor_filter = self.options.vendor
706        exclude_platform = self.options.exclude_platform
707        testsuite_filter = self.run_individual_testsuite
708        arch_filter = self.options.arch
709        tag_filter = self.options.tag
710        exclude_tag = self.options.exclude_tag
711        all_filter = self.options.all
712        runnable = (self.options.device_testing or self.options.filter == 'runnable')
713        force_toolchain = self.options.force_toolchain
714        force_platform = self.options.force_platform
715        slow_only = self.options.enable_slow_only
716        ignore_platform_key = self.options.ignore_platform_key
717        emu_filter = self.options.emulation_only
719        logger.debug("platform filter: " + str(platform_filter))
720        logger.debug("  vendor filter: " + str(vendor_filter))
721        logger.debug("    arch_filter: " + str(arch_filter))
722        logger.debug("     tag_filter: " + str(tag_filter))
723        logger.debug("    exclude_tag: " + str(exclude_tag))
725        default_platforms = False
726        vendor_platforms = False
727        emulation_platforms = False
729        if all_filter:
730            logger.info("Selecting all possible platforms per testsuite scenario")
731            # When --all used, any --platform arguments ignored
732            platform_filter = []
733        elif not platform_filter and not emu_filter and not vendor_filter:
734            logger.info("Selecting default platforms per testsuite scenario")
735            default_platforms = True
736        elif emu_filter:
737            logger.info("Selecting emulation platforms per testsuite scenraio")
738            emulation_platforms = True
739        elif vendor_filter:
740            vendor_platforms = True
742        _platforms = []
743        if platform_filter:
744            logger.debug(f"Checking platform filter: {platform_filter}")
745            # find in aliases and rename
746            platform_filter = self.verify_platforms_existence(platform_filter, "platform_filter")
747            platforms = list(filter(lambda p: p.name in platform_filter, self.platforms))
748        elif emu_filter:
749            platforms = list(
750                filter(lambda p: bool(p.simulator_by_name(self.options.sim_name)), self.platforms)
751            )
752        elif vendor_filter:
753            platforms = list(filter(lambda p: p.vendor in vendor_filter, self.platforms))
754            logger.info(f"Selecting platforms by vendors: {','.join(vendor_filter)}")
755        elif arch_filter:
756            platforms = list(filter(lambda p: p.arch in arch_filter, self.platforms))
757        elif default_platforms:
758            _platforms = list(filter(lambda p: p.name in self.default_platforms, self.platforms))
759            platforms = []
760            # default platforms that can't be run are dropped from the list of
761            # the default platforms list. Default platforms should always be
762            # runnable.
763            for p in _platforms:
764                sim = p.simulator_by_name(self.options.sim_name)
765                if (not sim) or sim.is_runnable():
766                    platforms.append(p)
767        else:
768            platforms = self.platforms
770        platform_config = self.test_config.get('platforms', {})
771        logger.info("Building initial testsuite list...")
773        keyed_tests = {}
775        for _, ts in self.testsuites.items():
776            if (
777                ts.build_on_all
778                and not platform_filter
779                and platform_config.get('increased_platform_scope', True)
780            ):
781                platform_scope = self.platforms
782            elif ts.integration_platforms:
783                integration_platforms = list(
784                    filter(lambda item: item.name in ts.integration_platforms, self.platforms)
785                )
786                if self.options.integration:
787                    platform_scope = integration_platforms
788                else:
789                    # if not in integration mode, still add integration platforms to the list
790                    if not platform_filter:
791                        platform_scope = platforms + integration_platforms
792                    else:
793                        platform_scope = platforms
794            else:
795                platform_scope = platforms
797            integration = self.options.integration and ts.integration_platforms
799            # If there isn't any overlap between the platform_allow list and the platform_scope
800            # we set the scope to the platform_allow list
801            if (
802                ts.platform_allow
803                and not platform_filter
804                and not integration
805                and platform_config.get('increased_platform_scope', True)
806            ):
807                a = set(platform_scope)
808                b = set(filter(lambda item: item.name in ts.platform_allow, self.platforms))
809                c = a.intersection(b)
810                if not c:
811                    platform_scope = list(
812                        filter(lambda item: item.name in ts.platform_allow, self.platforms)
813                    )
814            # list of instances per testsuite, aka configurations.
815            instance_list = []
816            for itoolchain, plat in itertools.product(
817                ts.integration_toolchains or [None], platform_scope
818            ):
819                if itoolchain:
820                    toolchain = itoolchain
821                elif plat.arch in ['posix', 'unit']:
822                    # workaround until toolchain variant in zephyr is overhauled and improved.
823                    if self.env.toolchain in ['llvm']:
824                        toolchain = 'llvm'
825                    else:
826                        toolchain = 'host'
827                else:
828                    toolchain = "zephyr" if not self.env.toolchain else self.env.toolchain
830                instance = TestInstance(ts, plat, toolchain, self.env.outdir)
831                instance.run = instance.check_runnable(
832                    self.options,
833                    self.hwm
834                )
836                if not force_platform and self.check_platform(plat,exclude_platform):
837                    instance.add_filter("Platform is excluded on command line.", Filters.CMD_LINE)
839                if (plat.arch == "unit") != (ts.type == "unit"):
840                    # Discard silently
841                    continue
843                if ts.modules and self.modules and not set(ts.modules).issubset(set(self.modules)):
844                    instance.add_filter(
845                        f"one or more required modules not available: {','.join(ts.modules)}",
846                        Filters.MODULE
847                    )
849                if self.options.level:
850                    tl = self.get_level(self.options.level)
851                    if tl is None:
852                        instance.add_filter(
853                            f"Unknown test level '{self.options.level}'",
854                            Filters.TESTPLAN
855                        )
856                    else:
857                        planned_scenarios = tl.scenarios
858                        if (
859                            ts.id not in planned_scenarios
860                            and not set(ts.levels).intersection(set(tl.levels))
861                        ):
862                            instance.add_filter("Not part of requested test plan", Filters.TESTPLAN)
864                if runnable and not instance.run:
865                    instance.add_filter("Not runnable on device", Filters.CMD_LINE)
867                if (
868                    self.options.integration
869                    and ts.integration_platforms
870                    and plat.name not in ts.integration_platforms
871                ):
872                    instance.add_filter("Not part of integration platforms", Filters.TESTSUITE)
874                if ts.skip:
875                    instance.add_filter("Skip filter", Filters.SKIP)
877                if tag_filter and not ts.tags.intersection(tag_filter):
878                    instance.add_filter("Command line testsuite tag filter", Filters.CMD_LINE)
880                if slow_only and not ts.slow:
881                    instance.add_filter("Not a slow test", Filters.CMD_LINE)
883                if exclude_tag and ts.tags.intersection(exclude_tag):
884                    instance.add_filter("Command line testsuite exclude filter", Filters.CMD_LINE)
886                if testsuite_filter:
887                    normalized_f = [os.path.basename(_ts) for _ts in testsuite_filter]
888                    if ts.id not in normalized_f:
889                        instance.add_filter("Testsuite name filter", Filters.CMD_LINE)
891                if arch_filter and plat.arch not in arch_filter:
892                    instance.add_filter("Command line testsuite arch filter", Filters.CMD_LINE)
894                if not force_platform:
896                    if ts.arch_allow and plat.arch not in ts.arch_allow:
897                        instance.add_filter("Not in testsuite arch allow list", Filters.TESTSUITE)
899                    if ts.arch_exclude and plat.arch in ts.arch_exclude:
900                        instance.add_filter("In testsuite arch exclude", Filters.TESTSUITE)
902                    if ts.vendor_allow and plat.vendor not in ts.vendor_allow:
903                        instance.add_filter(
904                            "Not in testsuite vendor allow list",
905                            Filters.TESTSUITE
906                        )
908                    if ts.vendor_exclude and plat.vendor in ts.vendor_exclude:
909                        instance.add_filter("In testsuite vendor exclude", Filters.TESTSUITE)
911                    if ts.platform_exclude and plat.name in ts.platform_exclude:
912                        instance.add_filter("In testsuite platform exclude", Filters.TESTSUITE)
914                if ts.toolchain_exclude and toolchain in ts.toolchain_exclude:
915                    instance.add_filter("In testsuite toolchain exclude", Filters.TOOLCHAIN)
917                if platform_filter and plat.name not in platform_filter:
918                    instance.add_filter("Command line platform filter", Filters.CMD_LINE)
920                if ts.platform_allow \
921                        and plat.name not in ts.platform_allow \
922                        and not (platform_filter and force_platform):
923                    instance.add_filter("Not in testsuite platform allow list", Filters.TESTSUITE)
925                if ts.platform_type and plat.type not in ts.platform_type:
926                    instance.add_filter("Not in testsuite platform type list", Filters.TESTSUITE)
928                if ts.toolchain_allow and toolchain not in ts.toolchain_allow:
929                    instance.add_filter("Not in testsuite toolchain allow list", Filters.TOOLCHAIN)
931                if not plat.env_satisfied:
932                    instance.add_filter(
933                        "Environment ({}) not satisfied".format(", ".join(plat.env)),
934                        Filters.ENVIRONMENT
935                    )
936                if plat.type == 'native' and sys.platform != 'linux':
937                    instance.add_filter("Native platform requires Linux", Filters.ENVIRONMENT)
939                if not force_toolchain \
940                        and toolchain and (toolchain not in plat.supported_toolchains):
941                    instance.add_filter(
942                        f"Not supported by the toolchain: {toolchain}",
943                        Filters.PLATFORM
944                    )
946                if plat.ram < ts.min_ram:
947                    instance.add_filter("Not enough RAM", Filters.PLATFORM)
949                if ts.harness:
950                    sim = plat.simulator_by_name(self.options.sim_name)
951                    if ts.harness == 'robot' and not (sim and sim.name == 'renode'):
952                        instance.add_filter(
953                            "No robot support for the selected platform",
954                            Filters.SKIP
955                        )
957                if ts.depends_on:
958                    dep_intersection = ts.depends_on.intersection(set(plat.supported))
959                    if dep_intersection != set(ts.depends_on):
960                        instance.add_filter(
961                            f"No hardware support for {set(ts.depends_on)-dep_intersection}",
962                            Filters.PLATFORM
963                        )
965                if plat.flash < ts.min_flash:
966                    instance.add_filter("Not enough FLASH", Filters.PLATFORM)
968                if set(plat.ignore_tags) & ts.tags:
969                    instance.add_filter(
970                        "Excluded tags per platform (exclude_tags)",
971                        Filters.PLATFORM
972                    )
974                if plat.only_tags and not set(plat.only_tags) & ts.tags:
975                    instance.add_filter("Excluded tags per platform (only_tags)", Filters.PLATFORM)
977                if ts.required_snippets:
978                    missing_snippet = False
979                    snippet_args = {"snippets": ts.required_snippets}
980                    found_snippets = snippets.find_snippets_in_roots(
981                        snippet_args,
982                        [*self.env.snippet_roots, Path(ts.source_dir)]
983                    )
985                    # Search and check that all required snippet files are found
986                    for this_snippet in snippet_args['snippets']:
987                        if this_snippet not in found_snippets:
988                            logger.error(
989                                f"Can't find snippet '{this_snippet}' for test '{ts.name}'"
990                            )
991                            instance.status = TwisterStatus.ERROR
992                            instance.reason = f"Snippet {this_snippet} not found"
993                            missing_snippet = True
994                            break
996                    if not missing_snippet:
997                        # Look for required snippets and check that they are applicable for these
998                        # platforms/boards
999                        for this_snippet in snippet_args['snippets']:
1000                            matched_snippet_board = False
1002                            # If the "appends" key is present with at least one entry then this
1003                            # snippet applies to all boards and further platform-specific checks
1004                            # are not required
1005                            if found_snippets[this_snippet].appends:
1006                                continue
1008                            for this_board in found_snippets[this_snippet].board2appends:
1009                                if this_board.startswith('/'):
1010                                    match = re.search(this_board[1:-1], plat.name)
1011                                    if match is not None:
1012                                        matched_snippet_board = True
1013                                        break
1014                                elif this_board == plat.name:
1015                                    matched_snippet_board = True
1016                                    break
1018                            if matched_snippet_board is False:
1019                                instance.add_filter("Snippet not supported", Filters.PLATFORM)
1020                                break
1022                # handle quarantined tests
1023                self.handle_quarantined_tests(instance, plat)
1025                # platform_key is a list of unique platform attributes that form a unique key
1026                # a test will match against to determine if it should be scheduled to run.
1027                # A key containing a field name that the platform does not have
1028                # will filter the platform.
1029                #
1030                # A simple example is keying on arch and simulation
1031                # to run a test once per unique (arch, simulation) platform.
1032                if (
1033                    not ignore_platform_key
1034                    and hasattr(ts, 'platform_key')
1035                    and len(ts.platform_key) > 0
1036                ):
1037                    key_fields = sorted(set(ts.platform_key))
1038                    keys = [getattr(plat, key_field, None) for key_field in key_fields]
1039                    for key in keys:
1040                        if key is None or key == 'na':
1041                            instance.add_filter(
1042                                "Excluded platform missing key fields"
1043                                f" demanded by test {key_fields}",
1044                                Filters.PLATFORM
1045                            )
1046                            break
1047                    else:
1048                        test_keys = copy.deepcopy(keys)
1049                        test_keys.append(ts.name)
1050                        test_keys = tuple(test_keys)
1051                        keyed_test = keyed_tests.get(test_keys)
1052                        if keyed_test is not None:
1053                            plat_key = {
1054                                key_field: getattr(
1055                                    keyed_test['plat'],
1056                                    key_field
1057                                ) for key_field in key_fields
1058                            }
1059                            instance.add_filter(
1060                                f"Already covered for key {key}"
1061                                f" by platform {keyed_test['plat'].name} having key {plat_key}",
1062                                Filters.PLATFORM_KEY
1063                            )
1064                        else:
1065                            # do not add a platform to keyed tests if previously
1066                            # filtered
1068                            if not instance.filters:
1069                                keyed_tests[test_keys] = {'plat': plat, 'ts': ts}
1071                # if nothing stopped us until now, it means this configuration
1072                # needs to be added.
1073                instance_list.append(instance)
1075            # no configurations, so jump to next testsuite
1076            if not instance_list:
1077                continue
1079            # if twister was launched with no platform options at all, we
1080            # take all default platforms
1081            if default_platforms and not ts.build_on_all and not integration:
1082                if ts.platform_allow:
1083                    _default_p = set(self.default_platforms)
1084                    _platform_allow = set(ts.platform_allow)
1085                    _intersection = _default_p.intersection(_platform_allow)
1086                    if _intersection:
1087                        aa = list(
1088                            filter(
1089                                lambda _scenario: _scenario.platform.name in _intersection,
1090                                instance_list
1091                            )
1092                        )
1093                        self.add_instances(aa)
1094                    else:
1095                        self.add_instances(instance_list)
1096                else:
1097                    # add integration platforms to the list of default
1098                    # platforms, even if we are not in integration mode
1099                    _platforms = self.default_platforms + ts.integration_platforms
1100                    instances = list(
1101                        filter(lambda ts: ts.platform.name in _platforms, instance_list)
1102                    )
1103                    self.add_instances(instances)
1104            elif integration:
1105                instances = list(
1106                    filter(
1107                        lambda item:  item.platform.name in ts.integration_platforms,
1108                        instance_list
1109                    )
1110                )
1111                self.add_instances(instances)
1113            elif emulation_platforms:
1114                self.add_instances(instance_list)
1115                for instance in list(
1116                    filter(
1117                        lambda inst: not inst.platform.simulator_by_name(self.options.sim_name),
1118                        instance_list
1119                    )
1120                ):
1121                    instance.add_filter("Not an emulated platform", Filters.CMD_LINE)
1122            elif vendor_platforms:
1123                self.add_instances(instance_list)
1124                for instance in list(
1125                    filter(
1126                        lambda inst: inst.platform.vendor not in vendor_filter,
1127                        instance_list
1128                    )
1129                ):
1130                    instance.add_filter("Not a selected vendor platform", Filters.CMD_LINE)
1131            else:
1132                self.add_instances(instance_list)
1134        for _, case in self.instances.items():
1135            # Do not create files for filtered instances
1136            if case.status == TwisterStatus.FILTER:
1137                continue
1138            # set run_id for each unfiltered instance
1139            case.setup_run_id()
1140            case.create_overlay(case.platform,
1141                                self.options.enable_asan,
1142                                self.options.enable_ubsan,
1143                                self.options.enable_coverage,
1144                                self.options.coverage_platform)
1146        self.selected_platforms = set(p.platform.name for p in self.instances.values())
1148        filtered_instances = list(
1149            filter(lambda item:  item.status == TwisterStatus.FILTER, self.instances.values())
1150        )
1151        for filtered_instance in filtered_instances:
1152            change_skip_to_error_if_integration(self.options, filtered_instance)
1154            filtered_instance.add_missing_case_status(filtered_instance.status)
1156    def add_instances(self, instance_list):
1157        for instance in instance_list:
1158            self.instances[instance.name] = instance
1161    def get_testsuite(self, identifier):
1162        results = []
1163        for _, ts in self.testsuites.items():
1164            if ts.id == identifier:
1165                results.append(ts)
1166        return results
1168    def get_testcase(self, identifier):
1169        results = []
1170        for _, ts in self.testsuites.items():
1171            for case in ts.testcases:
1172                if case.name == identifier:
1173                    results.append(ts)
1174        return results
1176    def verify_platforms_existence(self, platform_names_to_verify, log_info=""):
1177        """
1178        Verify if platform name (passed by --platform option, or in yaml file
1179        as platform_allow or integration_platforms options) is correct. If not -
1180        log and raise error.
1181        """
1182        _platforms = []
1183        for platform in platform_names_to_verify:
1184            if platform in self.platform_names:
1185                p = self.get_platform(platform)
1186                if p:
1187                    _platforms.append(p.name)
1188            else:
1189                logger.error(f"{log_info} - unrecognized platform - {platform}")
1190                sys.exit(2)
1191        return _platforms
1193    def create_build_dir_links(self):
1194        """
1195        Iterate through all no-skipped instances in suite and create links
1196        for each one build directories. Those links will be passed in the next
1197        steps to the CMake command.
1198        """
1200        links_dir_name = "twister_links"  # folder for all links
1201        links_dir_path = os.path.join(self.env.outdir, links_dir_name)
1202        if not os.path.exists(links_dir_path):
1203            os.mkdir(links_dir_path)
1205        for instance in self.instances.values():
1206            if instance.status != TwisterStatus.SKIP:
1207                self._create_build_dir_link(links_dir_path, instance)
1209    def _create_build_dir_link(self, links_dir_path, instance):
1210        """
1211        Create build directory with original "long" path. Next take shorter
1212        path and link them with original path - create link. At the end
1213        replace build_dir to created link. This link will be passed to CMake
1214        command. This action helps to limit path length which can be
1215        significant during building by CMake on Windows OS.
1216        """
1218        os.makedirs(instance.build_dir, exist_ok=True)
1220        link_name = f"test_{self.link_dir_counter}"
1221        link_path = os.path.join(links_dir_path, link_name)
1223        if os.name == "nt":  # if OS is Windows
1224            command = ["mklink", "/J", f"{link_path}", os.path.normpath(instance.build_dir)]
1225            subprocess.call(command, shell=True)
1226        else:  # for Linux and MAC OS
1227            os.symlink(instance.build_dir, link_path)
1229        # Here original build directory is replaced with symbolic link. It will
1230        # be passed to CMake command
1231        instance.build_dir = link_path
1233        self.link_dir_counter += 1
1236def change_skip_to_error_if_integration(options, instance):
1237    ''' All skips on integration_platforms are treated as errors.'''
1238    if instance.platform.name in instance.testsuite.integration_platforms:
1239        # Do not treat this as error if filter type is among ignore_filters
1240        filters = {t['type'] for t in instance.filters}
1241        ignore_filters ={Filters.CMD_LINE, Filters.SKIP, Filters.PLATFORM_KEY,
1242                         Filters.TOOLCHAIN, Filters.MODULE, Filters.TESTPLAN,
1243                         Filters.QUARANTINE, Filters.ENVIRONMENT}
1244        if filters.intersection(ignore_filters):
1245            return
1246        instance.status = TwisterStatus.ERROR
1247        instance.reason += " but is one of the integration platforms"
1248        logger.debug(
1249            f"Changing status of {instance.name} to ERROR because it is an integration platform"
1250        )