#!/usr/bin/env python3
# vim: set syntax=python ts=4 :
#
# Copyright (c) 2018 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import os
import sys
import re
import subprocess
import glob
import json
import collections
from collections import OrderedDict
from itertools import islice
import logging
import copy
import shutil
import random
import snippets
from colorama import Fore
from pathlib import Path
from argparse import Namespace

logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)

try:
    from anytree import RenderTree, Node, find
except ImportError:
    print("Install the anytree module to use the --test-tree option")

from twisterlib.testsuite import TestSuite, scan_testsuite_path
from twisterlib.error import TwisterRuntimeError
from twisterlib.platform import Platform
from twisterlib.config_parser import TwisterConfigParser
from twisterlib.testinstance import TestInstance
from twisterlib.quarantine import Quarantine

import list_boards
from zephyr_module import parse_modules

ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
if not ZEPHYR_BASE:
    sys.exit("$ZEPHYR_BASE environment variable undefined")

# This is needed to load edt.pickle files.
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts", "dts",
                                "python-devicetree", "src"))
from devicetree import edtlib  # pylint: disable=unused-import

sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/"))

import scl


class Filters:
    # platform keys
    PLATFORM_KEY = 'platform key filter'
    # filters provided on command line by the user/tester
    CMD_LINE = 'command line filter'
    # filters in the testsuite yaml definition
    TESTSUITE = 'testsuite filter'
    # filters in the testplan yaml definition
    TESTPLAN = 'testplan filter'
    # filters related to platform definition
    PLATFORM = 'Platform related filter'
    # in case a test suite was quarantined.
    QUARANTINE = 'Quarantine filter'
    # in case a test suite is skipped intentionally.
    SKIP = 'Skip filter'
    # in case of incompatibility between selected and allowed toolchains.
    TOOLCHAIN = 'Toolchain filter'
    # in case an optional module is not available
    MODULE = 'Module filter'


class TestLevel:
    def __init__(self):
        # Use instance attributes: mutable class-level defaults would be
        # shared across all TestLevel objects.
        self.name = None
        self.levels = []
        self.scenarios = []


class TestPlan:
    config_re = re.compile(r'(CONFIG_[A-Za-z0-9_]+)[=]"?([^"]*)"?$')
    dt_re = re.compile(r'([A-Za-z0-9_]+)[=]"?([^"]*)"?$')
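    # Illustrative (hypothetical) input: config_re would match the line
    # 'CONFIG_BOOT_BANNER="y"' with groups ('CONFIG_BOOT_BANNER', 'y');
    # dt_re accepts the same key=value shape without the CONFIG_ prefix.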

    suite_schema = scl.yaml_load(
        os.path.join(ZEPHYR_BASE,
                     "scripts", "schemas", "twister", "testsuite-schema.yaml"))
    quarantine_schema = scl.yaml_load(
        os.path.join(ZEPHYR_BASE,
                     "scripts", "schemas", "twister", "quarantine-schema.yaml"))

    tc_schema_path = os.path.join(ZEPHYR_BASE, "scripts", "schemas", "twister", "test-config-schema.yaml")

    SAMPLE_FILENAME = 'sample.yaml'
    TESTSUITE_FILENAME = 'testcase.yaml'

    def __init__(self, env=None):

        self.options = env.options
        self.env = env

        # Keep track of which test cases we've filtered out and why
        self.testsuites = {}
        self.quarantine = None
        self.platforms = []
        self.platform_names = []
        self.selected_platforms = []
        self.filtered_platforms = []
        self.default_platforms = []
        self.load_errors = 0
        self.instances = dict()
        self.instance_fail_count = 0
        self.warnings = 0

        self.scenarios = []

        self.hwm = env.hwm
        # used when creating shorter build paths
        self.link_dir_counter = 0
        self.modules = []

        self.run_individual_testsuite = []
        self.levels = []
        self.test_config = {}


    def get_level(self, name):
        return next((lvl for lvl in self.levels if lvl.name == name), None)

    def parse_configuration(self, config_file):
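        """Load and validate the test configuration file.

        The configuration may define test levels; a minimal sketch of such a
        file (hypothetical level names and scenario patterns):

            levels:
              - name: smoke
                adds:
                  - kernel.common.*
              - name: acceptance
                inherits: [smoke]
                adds:
                  - kernel.timer.*
        """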
        if os.path.exists(config_file):
            tc_schema = scl.yaml_load(self.tc_schema_path)
            self.test_config = scl.yaml_load_verify(config_file, tc_schema)
        else:
            raise TwisterRuntimeError(f"File {config_file} not found.")

        levels = self.test_config.get('levels', [])

        # Do a first pass over the levels to get the initial data.
        for level in levels:
            adds = []
            for s in level.get('adds', []):
                r = re.compile(s)
                adds.extend(list(filter(r.fullmatch, self.scenarios)))

            tl = TestLevel()
            tl.name = level['name']
            tl.scenarios = adds
            tl.levels = level.get('inherits', [])
            self.levels.append(tl)

        # Go over the levels again to resolve inheritance.
        for level in levels:
            inherit = level.get('inherits', [])
            _level = self.get_level(level['name'])
            if inherit:
                for inherited_level in inherit:
                    _inherited = self.get_level(inherited_level)
                    _level.scenarios.extend(_inherited.scenarios)

    def find_subtests(self):
        sub_tests = self.options.sub_test
        if sub_tests:
            for subtest in sub_tests:
                _subtests = self.get_testsuite(subtest)
                for _subtest in _subtests:
                    self.run_individual_testsuite.append(_subtest.name)

            if self.run_individual_testsuite:
                logger.info("Running the following tests:")
                for test in self.run_individual_testsuite:
                    print(" - {}".format(test))
            else:
                raise TwisterRuntimeError("Tests not found")

    def discover(self):
        self.handle_modules()
        if self.options.test:
            self.run_individual_testsuite = self.options.test

        num = self.add_testsuites(testsuite_filter=self.run_individual_testsuite)
        if num == 0:
            raise TwisterRuntimeError("No test cases found at the specified location...")

        self.find_subtests()
        # collect the ids of all parsed scenarios into one list
        for _, ts in self.testsuites.items():
            self.scenarios.append(ts.id)

        self.report_duplicates()

        self.parse_configuration(config_file=self.env.test_config)
        self.add_configurations()

        if self.load_errors:
            raise TwisterRuntimeError("Errors while loading configurations")

        # handle quarantine
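        # A quarantine list is a YAML file; a minimal sketch of one entry
        # (hypothetical values, assuming the scenarios/platforms/comment
        # fields of quarantine-schema.yaml):
        #
        #   - scenarios:
        #       - kernel.common.timing
        #     platforms:
        #       - qemu_x86
        #     comment: "flaky on CI"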
        ql = self.options.quarantine_list
        qv = self.options.quarantine_verify
        if qv and not ql:
            logger.error("No quarantine list given to be verified")
            raise TwisterRuntimeError("No quarantine list given to be verified")
        if ql:
            for quarantine_file in ql:
                try:
                    # validate quarantine yaml file against the provided schema
                    scl.yaml_load_verify(quarantine_file, self.quarantine_schema)
                except scl.EmptyYamlFileException:
                    logger.debug(f'Quarantine file {quarantine_file} is empty')
            self.quarantine = Quarantine(ql)

    def load(self):

        if self.options.report_suffix:
            last_run = os.path.join(self.options.outdir, "twister_{}.json".format(self.options.report_suffix))
        else:
            last_run = os.path.join(self.options.outdir, "twister.json")

        if self.options.only_failed or self.options.report_summary is not None:
            self.load_from_file(last_run)
            self.selected_platforms = set(p.platform.name for p in self.instances.values())
        elif self.options.load_tests:
            self.load_from_file(self.options.load_tests)
            self.selected_platforms = set(p.platform.name for p in self.instances.values())
        elif self.options.test_only:
            # Get the list of connected hardware and filter tests to only
            # those runnable on connected hardware. If a platform is not in
            # the hardware map or was not specified by --platform, just skip
            # it.
            connected_list = self.options.platform
            if self.options.exclude_platform:
                for excluded in self.options.exclude_platform:
                    if excluded in connected_list:
                        connected_list.remove(excluded)
            self.load_from_file(last_run, filter_platform=connected_list)
            self.selected_platforms = set(p.platform.name for p in self.instances.values())
        else:
            self.apply_filters()

        if self.options.subset:
            s = self.options.subset
            try:
                subset, sets = (int(x) for x in s.split("/"))
            except ValueError:
                raise TwisterRuntimeError("Bad subset value.")

            if subset > sets:
                raise TwisterRuntimeError("subset should not exceed the total number of sets")
            if subset < 1:
                raise TwisterRuntimeError(f"You have provided a wrong subset value: {self.options.subset}.")

            logger.info("Running only a subset: %s/%s" % (subset, sets))
            self.generate_subset(subset, sets)

    def generate_subset(self, subset, sets):
        # Test instances are sorted depending on the context. For CI runs
        # the execution order is: "plat1-testA, plat1-testB, ...,
        # plat1-testZ, plat2-testA, ...". For hardware tests
        # (device_testing), where multiple physical platforms can run the
        # tests in parallel, it is more efficient to run in the order:
        # "plat1-testA, plat2-testA, ..., plat1-testB, plat2-testB, ..."
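        # For example (hypothetical instance names), "plat1/tests.kernel.a"
        # sorts by the part after the first "/", i.e. "tests.kernel.a", so
        # every platform building the same test ends up adjacent in the order.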
        if self.options.device_testing:
            self.instances = OrderedDict(sorted(self.instances.items(),
                                key=lambda x: x[0][x[0].find("/") + 1:]))
        else:
            self.instances = OrderedDict(sorted(self.instances.items()))

        if self.options.shuffle_tests:
            seed_value = int.from_bytes(os.urandom(8), byteorder="big")
            if self.options.shuffle_tests_seed is not None:
                seed_value = self.options.shuffle_tests_seed

            logger.info(f"Shuffle tests with seed: {seed_value}")
            random.seed(seed_value)
            temp_list = list(self.instances.items())
            random.shuffle(temp_list)
            self.instances = OrderedDict(temp_list)

        # Base the calculation on what will actually be run and evaluated at
        # runtime, ignoring the cases we already know are going to be skipped.
        # This fixes an issue where some sets would get a majority of skips
        # and basically run nothing besides filtering.
        to_run = {k: v for k, v in self.instances.items() if v.status is None}
        total = len(to_run)
        per_set = int(total / sets)
        num_extra_sets = total - (per_set * sets)

        # To be fairer about the rounding error from integer division, and so
        # the last subset doesn't get overloaded, we add 1 extra instance to
        # each of subsets 1..num_extra_sets.
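        # Worked example (hypothetical numbers): with total=10 and sets=3,
        # per_set=3 and num_extra_sets=1, so subset 1 gets 4 instances and
        # subsets 2 and 3 get 3 each.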
        if subset <= num_extra_sets:
            start = (subset - 1) * (per_set + 1)
            end = start + per_set + 1
        else:
            base = num_extra_sets * (per_set + 1)
            start = ((subset - num_extra_sets - 1) * per_set) + base
            end = start + per_set

        sliced_instances = islice(to_run.items(), start, end)
        skipped = {k: v for k, v in self.instances.items() if v.status == 'skipped'}
        errors = {k: v for k, v in self.instances.items() if v.status == 'error'}
        self.instances = OrderedDict(sliced_instances)
        if subset == 1:
            # add all pre-filtered tests that were skipped or got an error
            # status to the first set to allow for better distribution among
            # all sets.
            self.instances.update(skipped)
            self.instances.update(errors)


    def handle_modules(self):
        # get all enabled west projects
        modules_meta = parse_modules(ZEPHYR_BASE)
        self.modules = [module.meta.get('name') for module in modules_meta]

    def report(self):
        if self.options.test_tree:
            self.report_test_tree()
            return 0
        elif self.options.list_tests:
            self.report_test_list()
            return 0
        elif self.options.list_tags:
            self.report_tag_list()
            return 0

        return 1

    def report_duplicates(self):
        dupes = [item for item, count in collections.Counter(self.scenarios).items() if count > 1]
        if dupes:
            msg = "Duplicated test scenarios found:\n"
            for dupe in dupes:
                msg += ("- {} found in:\n".format(dupe))
                for dc in self.get_testsuite(dupe):
                    msg += ("  - {}\n".format(dc.yamlfile))
            raise TwisterRuntimeError(msg)
        else:
            logger.debug("No duplicates found.")

    def report_tag_list(self):
        tags = set()
        for _, tc in self.testsuites.items():
            tags = tags.union(tc.tags)

        for t in tags:
            print("- {}".format(t))

    def report_test_tree(self):
        tests_list = self.get_tests_list()

        testsuite = Node("Testsuite")
        samples = Node("Samples", parent=testsuite)
        tests = Node("Tests", parent=testsuite)

        for test in sorted(tests_list):
            if test.startswith("sample."):
                sec = test.split(".")
                area = find(samples, lambda node: node.name == sec[1] and node.parent == samples)
                if not area:
                    area = Node(sec[1], parent=samples)

                Node(test, parent=area)
            else:
                sec = test.split(".")
                area = find(tests, lambda node: node.name == sec[0] and node.parent == tests)
                if not area:
                    area = Node(sec[0], parent=tests)

                if area and len(sec) > 2:
                    subarea = find(area, lambda node: node.name == sec[1] and node.parent == area)
                    if not subarea:
                        subarea = Node(sec[1], parent=area)
                    Node(test, parent=subarea)

        for pre, _, node in RenderTree(testsuite):
            print("%s%s" % (pre, node.name))

    def report_test_list(self):
        tests_list = self.get_tests_list()

        cnt = 0
        for test in sorted(tests_list):
            cnt += 1
            print(" - {}".format(test))
        print("{} total.".format(cnt))


    # Debug Functions
    @staticmethod
    def info(what):
        sys.stdout.write(what + "\n")
        sys.stdout.flush()

    def add_configurations(self):
        board_dirs = set()
        # Create a list of board roots as defined by the build system in
        # general. Note: internally twister treats a board root as including
        # the `boards` folder, while in the Zephyr build system the board root
        # is the path without the `boards` folder.
        board_roots = [Path(os.path.dirname(root)) for root in self.env.board_roots]
        lb_args = Namespace(arch_roots=[Path(ZEPHYR_BASE)], soc_roots=[Path(ZEPHYR_BASE),
                            Path(ZEPHYR_BASE) / 'subsys' / 'testsuite'],
                            board_roots=board_roots, board=None, board_dir=None)
        v1_boards = list_boards.find_boards(lb_args)
        v2_dirs = list_boards.find_v2_board_dirs(lb_args)
        for b in v1_boards:
            board_dirs.add(b.dir)
        board_dirs.update(v2_dirs)
        logger.debug("Reading platform configuration files under %s..." % self.env.board_roots)

        platform_config = self.test_config.get('platforms', {})
        for folder in board_dirs:
            for file in glob.glob(os.path.join(folder, "*.yaml")):
                # If the user set a platform filter, and no other option would
                # widen the allowed platform pool, save time by not loading
                # the YAMLs of boards whose names do not start with the
                # required prefixes.
                if self.options.platform and \
                    not self.options.all and \
                    not self.options.integration and \
                    not any([
                        os.path.basename(file).startswith(
                            re.split('[/@]', p)[0]
                        ) for p in self.options.platform
                    ]):
                    continue
                try:
                    platform = Platform()
                    platform.load(file)
                    if platform.name in [p.name for p in self.platforms]:
                        logger.error(f"Duplicate platform {platform.name} in {file}")
                        raise Exception(f"Duplicate platform identifier {platform.name} found")

                    if not platform.twister:
                        continue

                    self.platforms.append(platform)
                    if not platform_config.get('override_default_platforms', False):
                        if platform.default:
                            self.default_platforms.append(platform.name)
                    else:
                        if platform.name in platform_config.get('default_platforms', []):
                            logger.debug(f"adding {platform.name} to default platforms")
                            self.default_platforms.append(platform.name)

                    # Support board@revision:
                    # if a <board>_<revision>.yaml already exists, use it to
                    # load the platform directly; otherwise, iterate over the
                    # directory to derive every valid board revision from each
                    # <board>_<revision>.conf.
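                    # For example (hypothetical board), a "myboard_0_14_2.conf"
                    # sitting next to "myboard.yaml" (and with no
                    # "myboard_0_14_2.yaml") yields the extra platform name
                    # "myboard@0.14.2".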
                    if '@' not in platform.name:
                        tmp_dir = os.listdir(os.path.dirname(file))
                        for item in tmp_dir:
                            # Need to make sure the revision matches
                            # the permitted patterns as described in
                            # cmake/modules/extensions.cmake.
                            revision_patterns = ["[A-Z]",
                                                 "[0-9]+",
                                                 "(0|[1-9][0-9]*)(_[0-9]+){0,2}"]

                            for pattern in revision_patterns:
                                result = re.match(f"{platform.name}_(?P<revision>{pattern})\\.conf", item)
                                if result:
                                    revision = result.group("revision")
                                    yaml_file = f"{platform.name}_{revision}.yaml"
                                    if yaml_file not in tmp_dir:
                                        platform_revision = copy.deepcopy(platform)
                                        revision = revision.replace("_", ".")
                                        platform_revision.name = f"{platform.name}@{revision}"
                                        platform_revision.normalized_name = platform_revision.name.replace("/", "_")
                                        platform_revision.default = False
                                        self.platforms.append(platform_revision)

                                    break

                except RuntimeError as e:
                    logger.error("E: %s: can't load: %s" % (file, e))
                    self.load_errors += 1

        self.platform_names = [p.name for p in self.platforms]

    def get_all_tests(self):
        testcases = []
        for _, ts in self.testsuites.items():
            for case in ts.testcases:
                testcases.append(case.name)

        return testcases

    def get_tests_list(self):
        testcases = []
        if tag_filter := self.options.tag:
            for _, ts in self.testsuites.items():
                if ts.tags.intersection(tag_filter):
                    for case in ts.testcases:
                        testcases.append(case.name)
        else:
            for _, ts in self.testsuites.items():
                for case in ts.testcases:
                    testcases.append(case.name)

        if exclude_tag := self.options.exclude_tag:
            for _, ts in self.testsuites.items():
                if ts.tags.intersection(exclude_tag):
                    for case in ts.testcases:
                        if case.name in testcases:
                            testcases.remove(case.name)
        return testcases

    def add_testsuites(self, testsuite_filter=None):
        # Avoid a mutable default argument; treat None as "no filter".
        testsuite_filter = testsuite_filter or []
        for root in self.env.test_roots:
            root = os.path.abspath(root)

            logger.debug("Reading test case configuration files under %s..." % root)

            for dirpath, _, filenames in os.walk(root, topdown=True):
                if self.SAMPLE_FILENAME in filenames:
                    filename = self.SAMPLE_FILENAME
                elif self.TESTSUITE_FILENAME in filenames:
                    filename = self.TESTSUITE_FILENAME
                else:
                    continue

                logger.debug("Found possible testsuite in " + dirpath)

                suite_yaml_path = os.path.join(dirpath, filename)
                suite_path = os.path.dirname(suite_yaml_path)

                for alt_config_root in self.env.alt_config_root:
                    alt_config = os.path.join(os.path.abspath(alt_config_root),
                                              os.path.relpath(suite_path, root),
                                              filename)
                    if os.path.exists(alt_config):
                        logger.info("Using alternative configuration from %s" %
                                    os.path.normpath(alt_config))
                        suite_yaml_path = alt_config
                        break

                try:
                    parsed_data = TwisterConfigParser(suite_yaml_path, self.suite_schema)
                    parsed_data.load()
                    subcases = None
                    ztest_suite_names = None

                    for name in parsed_data.scenarios.keys():
                        suite_dict = parsed_data.get_scenario(name)
                        suite = TestSuite(root, suite_path, name, data=suite_dict, detailed_test_id=self.options.detailed_test_id)
                        if suite.harness in ['ztest', 'test']:
                            if subcases is None:
                                # scan it only once per testsuite
                                subcases, ztest_suite_names = scan_testsuite_path(suite_path)
                            suite.add_subcases(suite_dict, subcases, ztest_suite_names)
                        else:
                            suite.add_subcases(suite_dict)
                        if testsuite_filter:
                            scenario = os.path.basename(suite.name)
                            if suite.name and (suite.name in testsuite_filter or scenario in testsuite_filter):
                                self.testsuites[suite.name] = suite
                        else:
                            self.testsuites[suite.name] = suite

                except Exception as e:
                    logger.error(f"{suite_path}: can't load (skipping): {e!r}")
                    self.load_errors += 1
        return len(self.testsuites)

    def __str__(self):
        # TestPlan has no 'name' attribute of its own; fall back to the class
        # name so str() does not raise.
        return getattr(self, 'name', self.__class__.__name__)

    def get_platform(self, name):
        selected_platform = None
        for platform in self.platforms:
            if platform.name == name:
                selected_platform = platform
                break
        return selected_platform

    def handle_quarantined_tests(self, instance: TestInstance, plat: Platform):
        if self.quarantine:
            matched_quarantine = self.quarantine.get_matched_quarantine(
                instance.testsuite.id, plat.name, plat.arch, plat.simulation
            )
            if matched_quarantine and not self.options.quarantine_verify:
                instance.add_filter("Quarantine: " + matched_quarantine, Filters.QUARANTINE)
                return
            if not matched_quarantine and self.options.quarantine_verify:
                instance.add_filter("Not under quarantine", Filters.QUARANTINE)

    def load_from_file(self, file, filter_platform=None):
        # Avoid a mutable default argument; treat None as "no filter".
        filter_platform = filter_platform or []
        try:
            with open(file, "r") as json_test_plan:
                jtp = json.load(json_test_plan)
                instance_list = []
                for ts in jtp.get("testsuites", []):
                    logger.debug(f"loading {ts['name']}...")
                    testsuite = ts["name"]

                    platform = self.get_platform(ts["platform"])
                    if filter_platform and platform.name not in filter_platform:
                        continue
                    instance = TestInstance(self.testsuites[testsuite], platform, self.env.outdir)
                    if ts.get("run_id"):
                        instance.run_id = ts.get("run_id")

                    if self.options.device_testing:
                        tfilter = 'runnable'
                    else:
                        tfilter = 'buildable'
                    instance.run = instance.check_runnable(
                        self.options.enable_slow,
                        tfilter,
                        self.options.fixture,
                        self.hwm
                    )

                    instance.metrics['handler_time'] = ts.get('execution_time', 0)
                    instance.metrics['used_ram'] = ts.get("used_ram", 0)
                    instance.metrics['used_rom'] = ts.get("used_rom", 0)
                    instance.metrics['available_ram'] = ts.get('available_ram', 0)
                    instance.metrics['available_rom'] = ts.get('available_rom', 0)

                    status = ts.get('status', None)
                    reason = ts.get("reason", "Unknown")
                    if status in ["error", "failed"]:
                        if self.options.report_summary is not None:
                            if status == "error":
                                status = "ERROR"
                            elif status == "failed":
                                status = "FAILED"
                            instance.status = Fore.RED + status + Fore.RESET
                            instance.reason = reason
                            self.instance_fail_count += 1
                        else:
                            instance.status = None
                            instance.reason = None
                            instance.retries += 1
                    # A test marked as passed (built only) can still run when
                    # --test-only is used. Reset the status to capture new
                    # results.
                    elif status == 'passed' and instance.run and self.options.test_only:
                        instance.status = None
                        instance.reason = None
                    else:
                        instance.status = status
                        instance.reason = reason

                    self.handle_quarantined_tests(instance, platform)

                    for tc in ts.get('testcases', []):
                        identifier = tc['identifier']
                        tc_status = tc.get('status', None)
                        tc_reason = None
                        # We only set the reason if the status is valid; it
                        # might have been reset above...
                        if instance.status:
                            tc_reason = tc.get('reason')
                        if tc_status:
                            case = instance.set_case_status_by_name(identifier, tc_status, tc_reason)
                            case.duration = tc.get('execution_time', 0)
                            if tc.get('log'):
                                case.output = tc.get('log')

                    instance.create_overlay(platform, self.options.enable_asan, self.options.enable_ubsan, self.options.enable_coverage, self.options.coverage_platform)
                    instance_list.append(instance)
                self.add_instances(instance_list)
        except FileNotFoundError as e:
            logger.error(f"{e}")
            return 1

    def apply_filters(self, **kwargs):

        toolchain = self.env.toolchain
        platform_filter = self.options.platform
        vendor_filter = self.options.vendor
        exclude_platform = self.options.exclude_platform
        testsuite_filter = self.run_individual_testsuite
        arch_filter = self.options.arch
        tag_filter = self.options.tag
        exclude_tag = self.options.exclude_tag
        all_filter = self.options.all
        runnable = (self.options.device_testing or self.options.filter == 'runnable')
        force_toolchain = self.options.force_toolchain
        force_platform = self.options.force_platform
        slow_only = self.options.enable_slow_only
        ignore_platform_key = self.options.ignore_platform_key
        emu_filter = self.options.emulation_only

        logger.debug("platform filter: " + str(platform_filter))
        logger.debug("  vendor filter: " + str(vendor_filter))
        logger.debug("    arch_filter: " + str(arch_filter))
        logger.debug("     tag_filter: " + str(tag_filter))
        logger.debug("    exclude_tag: " + str(exclude_tag))

        default_platforms = False
        vendor_platforms = False
        emulation_platforms = False

        if all_filter:
            logger.info("Selecting all possible platforms per test case")
            # When --all is used, any --platform arguments are ignored
            platform_filter = []
        elif not platform_filter and not emu_filter and not vendor_filter:
            logger.info("Selecting default platforms per test case")
            default_platforms = True
        elif emu_filter:
            logger.info("Selecting emulation platforms per test case")
            emulation_platforms = True
        elif vendor_filter:
            vendor_platforms = True

        if platform_filter:
            self.verify_platforms_existence(platform_filter, "platform_filter")
            platforms = list(filter(lambda p: p.name in platform_filter, self.platforms))
        elif emu_filter:
            platforms = list(filter(lambda p: p.simulation != 'na', self.platforms))
        elif vendor_filter:
            platforms = list(filter(lambda p: p.vendor in vendor_filter, self.platforms))
            logger.info(f"Selecting platforms by vendors: {','.join(vendor_filter)}")
        elif arch_filter:
            platforms = list(filter(lambda p: p.arch in arch_filter, self.platforms))
        elif default_platforms:
            _platforms = list(filter(lambda p: p.name in self.default_platforms, self.platforms))
            platforms = []
            # Default platforms that cannot be run are dropped from the list.
            # Default platforms should always be runnable.
            for p in _platforms:
                if p.simulation and p.simulation_exec:
                    if shutil.which(p.simulation_exec):
                        platforms.append(p)
                else:
                    platforms.append(p)
        else:
            platforms = self.platforms

        platform_config = self.test_config.get('platforms', {})
        logger.info("Building initial testsuite list...")

        keyed_tests = {}

        for ts_name, ts in self.testsuites.items():
            if ts.build_on_all and not platform_filter and platform_config.get('increased_platform_scope', True):
                platform_scope = self.platforms
            elif ts.integration_platforms:
                integration_platforms = list(filter(lambda item: item.name in ts.integration_platforms,
                                                    self.platforms))
                if self.options.integration:
                    self.verify_platforms_existence(
                        ts.integration_platforms, f"{ts_name} - integration_platforms")
                    platform_scope = integration_platforms
                else:
                    # if not in integration mode, still add the integration
                    # platforms to the list
                    if not platform_filter:
                        self.verify_platforms_existence(
                            ts.integration_platforms, f"{ts_name} - integration_platforms")
                        platform_scope = platforms + integration_platforms
                    else:
                        platform_scope = platforms
            else:
                platform_scope = platforms

            integration = self.options.integration and ts.integration_platforms

            # If there is no overlap between the platform_allow list and the
            # platform_scope, set the scope to the platform_allow list.
            if ts.platform_allow and not platform_filter and not integration and platform_config.get('increased_platform_scope', True):
                self.verify_platforms_existence(
                    ts.platform_allow, f"{ts_name} - platform_allow")
                a = set(platform_scope)
                b = set(filter(lambda item: item.name in ts.platform_allow, self.platforms))
                c = a.intersection(b)
                if not c:
                    platform_scope = list(filter(lambda item: item.name in ts.platform_allow,
                                                 self.platforms))
            # list of instances per testsuite, aka configurations.
            instance_list = []
            for plat in platform_scope:
                instance = TestInstance(ts, plat, self.env.outdir)
                if runnable:
                    tfilter = 'runnable'
                else:
                    tfilter = 'buildable'

                instance.run = instance.check_runnable(
                    self.options.enable_slow,
                    tfilter,
                    self.options.fixture,
                    self.hwm
                )

                if not force_platform and plat.name in exclude_platform:
                    instance.add_filter("Platform is excluded on command line.", Filters.CMD_LINE)

                if (plat.arch == "unit") != (ts.type == "unit"):
                    # Discard silently
                    continue

                if ts.modules and self.modules:
                    if not set(ts.modules).issubset(set(self.modules)):
                        instance.add_filter(f"one or more required modules not available: {','.join(ts.modules)}", Filters.MODULE)

                if self.options.level:
                    tl = self.get_level(self.options.level)
                    if tl is None:
                        instance.add_filter(f"Unknown test level '{self.options.level}'", Filters.TESTPLAN)
                    else:
                        planned_scenarios = tl.scenarios
                        if ts.id not in planned_scenarios and not set(ts.levels).intersection(set(tl.levels)):
                            instance.add_filter("Not part of requested test plan", Filters.TESTPLAN)

                if runnable and not instance.run:
                    instance.add_filter("Not runnable on device", Filters.CMD_LINE)

                if self.options.integration and ts.integration_platforms and plat.name not in ts.integration_platforms:
                    instance.add_filter("Not part of integration platforms", Filters.TESTSUITE)

                if ts.skip:
                    instance.add_filter("Skip filter", Filters.SKIP)

                if tag_filter and not ts.tags.intersection(tag_filter):
                    instance.add_filter("Command line testsuite tag filter", Filters.CMD_LINE)

                if slow_only and not ts.slow:
                    instance.add_filter("Not a slow test", Filters.CMD_LINE)

                if exclude_tag and ts.tags.intersection(exclude_tag):
                    instance.add_filter("Command line testsuite exclude filter", Filters.CMD_LINE)

                if testsuite_filter:
                    normalized_f = [os.path.basename(_ts) for _ts in testsuite_filter]
                    if ts.id not in normalized_f:
                        instance.add_filter("Testsuite name filter", Filters.CMD_LINE)

                if arch_filter and plat.arch not in arch_filter:
                    instance.add_filter("Command line testsuite arch filter", Filters.CMD_LINE)

                if not force_platform:

                    if ts.arch_allow and plat.arch not in ts.arch_allow:
                        instance.add_filter("Not in test case arch allow list", Filters.TESTSUITE)

                    if ts.arch_exclude and plat.arch in ts.arch_exclude:
                        instance.add_filter("In test case arch exclude", Filters.TESTSUITE)

                    if ts.platform_exclude and plat.name in ts.platform_exclude:
                        instance.add_filter("In test case platform exclude", Filters.TESTSUITE)

                if ts.toolchain_exclude and toolchain in ts.toolchain_exclude:
                    instance.add_filter("In test case toolchain exclude", Filters.TOOLCHAIN)

                if platform_filter and plat.name not in platform_filter:
                    instance.add_filter("Command line platform filter", Filters.CMD_LINE)

                if ts.platform_allow \
                        and plat.name not in ts.platform_allow \
                        and not (platform_filter and force_platform):
                    instance.add_filter("Not in testsuite platform allow list", Filters.TESTSUITE)

                if ts.platform_type and plat.type not in ts.platform_type:
                    instance.add_filter("Not in testsuite platform type list", Filters.TESTSUITE)

                if ts.toolchain_allow and toolchain not in ts.toolchain_allow:
                    instance.add_filter("Not in testsuite toolchain allow list", Filters.TOOLCHAIN)

                if not plat.env_satisfied:
                    instance.add_filter("Environment ({}) not satisfied".format(", ".join(plat.env)), Filters.PLATFORM)

                if not force_toolchain \
                        and toolchain and (toolchain not in plat.supported_toolchains) \
                        and "host" not in plat.supported_toolchains \
                        and ts.type != 'unit':
                    instance.add_filter("Not supported by the toolchain", Filters.PLATFORM)

                if plat.ram < ts.min_ram:
                    instance.add_filter("Not enough RAM", Filters.PLATFORM)

                if ts.harness:
                    if ts.harness == 'robot' and plat.simulation != 'renode':
                        instance.add_filter("No robot support for the selected platform", Filters.SKIP)

                if ts.depends_on:
                    dep_intersection = ts.depends_on.intersection(set(plat.supported))
                    if dep_intersection != set(ts.depends_on):
                        instance.add_filter("No hardware support", Filters.PLATFORM)

                if plat.flash < ts.min_flash:
                    instance.add_filter("Not enough FLASH", Filters.PLATFORM)

                if set(plat.ignore_tags) & ts.tags:
                    instance.add_filter("Excluded tags per platform (exclude_tags)", Filters.PLATFORM)

                if plat.only_tags and not set(plat.only_tags) & ts.tags:
                    instance.add_filter("Excluded tags per platform (only_tags)", Filters.PLATFORM)

                if ts.required_snippets:
                    missing_snippet = False
                    snippet_args = {"snippets": ts.required_snippets}
                    found_snippets = snippets.find_snippets_in_roots(snippet_args, [*self.env.snippet_roots, Path(ts.source_dir)])

                    # Search and check that all required snippet files are found
                    for this_snippet in snippet_args['snippets']:
                        if this_snippet not in found_snippets:
                            logger.error("Can't find snippet '%s' for test '%s'", this_snippet, ts.name)
                            instance.status = "error"
                            instance.reason = f"Snippet {this_snippet} not found"
                            missing_snippet = True
                            break

                    if not missing_snippet:
                        # Check that the required snippets are applicable to
                        # these platforms/boards
                        for this_snippet in snippet_args['snippets']:
                            matched_snippet_board = False

                            # If the "appends" key is present with at least one
                            # entry, then this snippet applies to all boards and
                            # further platform-specific checks are not required
                            if found_snippets[this_snippet].appends:
                                continue

                            for this_board in found_snippets[this_snippet].board2appends:
                                if this_board.startswith('/'):
                                    match = re.search(this_board[1:-1], plat.name)
                                    if match is not None:
                                        matched_snippet_board = True
                                        break
                                elif this_board == plat.name:
                                    matched_snippet_board = True
                                    break

                            if not matched_snippet_board:
                                instance.add_filter("Snippet not supported", Filters.PLATFORM)
                                break

                # handle quarantined tests
                self.handle_quarantined_tests(instance, plat)

                # platform_key is a list of unique platform attributes that
                # form a unique key a test will match against to determine
                # whether it should be scheduled to run. A key containing a
                # field name that the platform does not have will filter out
                # the platform.
                #
                # A simple example is keying on arch and simulation
                # to run a test once per unique (arch, simulation) platform.
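                # For instance (hypothetical values), a testsuite declaring
                # platform_key: [arch, simulation] would be scheduled once for
                # the key ('arm', 'qemu') and once for ('x86', 'qemu') rather
                # than on every matching board.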
                if not ignore_platform_key and hasattr(ts, 'platform_key') and len(ts.platform_key) > 0:
                    key_fields = sorted(set(ts.platform_key))
                    keys = [getattr(plat, key_field) for key_field in key_fields]
                    for key in keys:
                        if key is None or key == 'na':
                            instance.add_filter(
                                f"Excluded platform missing key fields demanded by test {key_fields}",
                                Filters.PLATFORM
                            )
                            break
                    else:
                        test_keys = copy.deepcopy(keys)
                        test_keys.append(ts.name)
                        test_keys = tuple(test_keys)
                        keyed_test = keyed_tests.get(test_keys)
                        if keyed_test is not None:
                            plat_key = {key_field: getattr(keyed_test['plat'], key_field) for key_field in key_fields}
                            instance.add_filter(f"Already covered for key {tuple(keys)} by platform {keyed_test['plat'].name} having key {plat_key}", Filters.PLATFORM_KEY)
                        else:
                            # do not add a platform to keyed tests if
                            # previously filtered
                            if not instance.filters:
                                keyed_tests[test_keys] = {'plat': plat, 'ts': ts}
                            else:
                                instance.add_filter(f"Excluded platform missing key fields demanded by test {key_fields}", Filters.PLATFORM)

                # if nothing has stopped us until now, it means this
                # configuration needs to be added.
                instance_list.append(instance)

            # no configurations, so jump to the next testsuite
            if not instance_list:
                continue

            # if twister was launched with no platform options at all, we
            # take all default platforms
            if default_platforms and not ts.build_on_all and not integration:
                if ts.platform_allow:
                    a = set(self.default_platforms)
                    b = set(ts.platform_allow)
                    c = a.intersection(b)
                    if c:
                        aa = list(filter(lambda inst: inst.platform.name in c, instance_list))
                        self.add_instances(aa)
                    else:
                        self.add_instances(instance_list)
                else:
                    # add integration platforms to the list of default
                    # platforms, even if we are not in integration mode
                    _platforms = self.default_platforms + ts.integration_platforms
                    instances = list(filter(lambda inst: inst.platform.name in _platforms, instance_list))
                    self.add_instances(instances)
            elif integration:
                instances = list(filter(lambda item: item.platform.name in ts.integration_platforms, instance_list))
                self.add_instances(instances)

            elif emulation_platforms:
                self.add_instances(instance_list)
                for instance in list(filter(lambda inst: inst.platform.simulation == 'na', instance_list)):
                    instance.add_filter("Not an emulated platform", Filters.CMD_LINE)
            elif vendor_platforms:
                self.add_instances(instance_list)
                for instance in list(filter(lambda inst: inst.platform.vendor not in vendor_filter, instance_list)):
                    instance.add_filter("Not a selected vendor platform", Filters.CMD_LINE)
            else:
                self.add_instances(instance_list)

        for _, case in self.instances.items():
            case.create_overlay(case.platform, self.options.enable_asan, self.options.enable_ubsan, self.options.enable_coverage, self.options.coverage_platform)

        self.selected_platforms = set(p.platform.name for p in self.instances.values())

        filtered_instances = list(filter(lambda item: item.status == "filtered", self.instances.values()))
        for filtered_instance in filtered_instances:
            change_skip_to_error_if_integration(self.options, filtered_instance)

            filtered_instance.add_missing_case_status(filtered_instance.status)

        self.filtered_platforms = set(p.platform.name for p in self.instances.values()
                                      if p.status != "skipped")

    def add_instances(self, instance_list):
        for instance in instance_list:
            self.instances[instance.name] = instance


    def get_testsuite(self, identifier):
        results = []
        for _, ts in self.testsuites.items():
            for case in ts.testcases:
                if case.name == identifier:
                    results.append(ts)
        return results

    def verify_platforms_existence(self, platform_names_to_verify, log_info=""):
        """
        Verify that each platform name (passed via the --platform option, or
        in a yaml file as platform_allow or integration_platforms) is valid.
        If not, log an error and exit.
        """
        for platform in platform_names_to_verify:
            if platform not in self.platform_names:
                logger.error(f"{log_info} - unrecognized platform - {platform}")
                sys.exit(2)

    def create_build_dir_links(self):
        """
        Iterate through all non-skipped instances in the suite and create a
        link for each instance's build directory. Those links will be passed
        to the CMake command in the next steps.
        """
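        # For example (hypothetical paths), <outdir>/twister_links/test_0
        # would point at a long build directory such as
        # <outdir>/<platform>/<testsuite-path>/<testsuite-name>.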

        links_dir_name = "twister_links"  # folder for all links
        links_dir_path = os.path.join(self.env.outdir, links_dir_name)
        if not os.path.exists(links_dir_path):
            os.mkdir(links_dir_path)

        for instance in self.instances.values():
            if instance.status != "skipped":
                self._create_build_dir_link(links_dir_path, instance)

    def _create_build_dir_link(self, links_dir_path, instance):
        """
        Create the build directory with its original "long" path, then create
        a link with a shorter path pointing at it and replace build_dir with
        that link, which will be passed to the CMake command. This helps to
        limit the path length, which can be significant when building with
        CMake on Windows.
        """

        os.makedirs(instance.build_dir, exist_ok=True)

        link_name = f"test_{self.link_dir_counter}"
        link_path = os.path.join(links_dir_path, link_name)

        if os.name == "nt":  # if OS is Windows
            command = ["mklink", "/J", f"{link_path}", os.path.normpath(instance.build_dir)]
            subprocess.call(command, shell=True)
        else:  # for Linux and macOS
            os.symlink(instance.build_dir, link_path)

        # The original build directory is replaced with the symbolic link,
        # which will be passed to the CMake command.
        instance.build_dir = link_path

        self.link_dir_counter += 1


def change_skip_to_error_if_integration(options, instance):
    '''All skips on integration_platforms are treated as errors.'''
    if instance.platform.name in instance.testsuite.integration_platforms:
        # Do not treat this as an error if the filter type is among
        # ignore_filters
        filters = {t['type'] for t in instance.filters}
        ignore_filters = {Filters.CMD_LINE, Filters.SKIP, Filters.PLATFORM_KEY,
                          Filters.TOOLCHAIN, Filters.MODULE, Filters.TESTPLAN,
                          Filters.QUARANTINE}
        if filters.intersection(ignore_filters):
            return
        instance.status = "error"
        instance.reason += " but is one of the integration platforms"