1#!/usr/bin/env python3
2# vim: set syntax=python ts=4 :
3#
4# Copyright (c) 2018-2024 Intel Corporation
5# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved.
6#
7# SPDX-License-Identifier: Apache-2.0
8import collections
9import copy
10import glob
11import json
12import logging
13import os
14import random
15import re
16import subprocess
17import sys
18from argparse import Namespace
19from collections import OrderedDict
20from itertools import islice
21from pathlib import Path
22
23import snippets
24
25try:
26    from anytree import Node, RenderTree, find
27except ImportError:
28    print("Install the anytree module to use the --test-tree option")
29
30import list_boards
31import scl
32from twisterlib.config_parser import TwisterConfigParser
33from twisterlib.error import TwisterRuntimeError
34from twisterlib.platform import Platform
35from twisterlib.quarantine import Quarantine
36from twisterlib.statuses import TwisterStatus
37from twisterlib.testinstance import TestInstance
38from twisterlib.testsuite import TestSuite, scan_testsuite_path
39from zephyr_module import parse_modules
40
41logger = logging.getLogger('twister')
42logger.setLevel(logging.DEBUG)
43
44ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
45if not ZEPHYR_BASE:
46    sys.exit("$ZEPHYR_BASE environment variable undefined")
47
48# This is needed to load edt.pickle files.
49sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts", "dts",
50                                "python-devicetree", "src"))
51from devicetree import edtlib  # pylint: disable=unused-import
52
53sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/"))
54
55class Filters:
56    # platform keys
57    PLATFORM_KEY = 'platform key filter'
58    # filters provided on command line by the user/tester
59    CMD_LINE = 'command line filter'
60    # filters in the testsuite yaml definition
61    TESTSUITE = 'testsuite filter'
62    # filters in the testplan yaml definition
63    TESTPLAN = 'testplan filter'
64    # filters related to platform definition
65    PLATFORM = 'Platform related filter'
66    # in case a test suite was quarantined.
67    QUARANTINE = 'Quarantine filter'
68    # in case a test suite is skipped intentionally .
69    SKIP = 'Skip filter'
70    # in case of incompatibility between selected and allowed toolchains.
71    TOOLCHAIN = 'Toolchain filter'
72    # in case where an optional module is not available
73    MODULE = 'Module filter'
74    # in case of missing env. variable required for a platform
75    ENVIRONMENT = 'Environment filter'
76
77
78class TestLevel:
79    name = None
80    levels = []
81    scenarios = []
82
83
84class TestPlan:
85    __test__ = False  # for pytest to skip this class when collects tests
86    config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
87    dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
88
89    suite_schema = scl.yaml_load(
90        os.path.join(ZEPHYR_BASE,
91                     "scripts", "schemas", "twister", "testsuite-schema.yaml"))
92    quarantine_schema = scl.yaml_load(
93        os.path.join(ZEPHYR_BASE,
94                     "scripts", "schemas", "twister", "quarantine-schema.yaml"))
95
96    tc_schema_path = os.path.join(
97        ZEPHYR_BASE,
98        "scripts",
99        "schemas",
100        "twister",
101        "test-config-schema.yaml"
102    )
103
104    SAMPLE_FILENAME = 'sample.yaml'
105    TESTSUITE_FILENAME = 'testcase.yaml'
106
107    def __init__(self, env: Namespace):
108
109        self.options = env.options
110        self.env = env
111
112        # Keep track of which test cases we've filtered out and why
113        self.testsuites = {}
114        self.quarantine = None
115        self.platforms = []
116        self.platform_names = []
117        self.selected_platforms = []
118        self.default_platforms = []
119        self.load_errors = 0
120        self.instances = dict()
121        self.instance_fail_count = 0
122        self.warnings = 0
123
124        self.scenarios = []
125
126        self.hwm = env.hwm
127        # used during creating shorter build paths
128        self.link_dir_counter = 0
129        self.modules = []
130
131        self.run_individual_testsuite = []
132        self.levels = []
133        self.test_config =  {}
134
135        self.name = "unnamed"
136
137    def get_level(self, name):
138        level = next((lvl for lvl in self.levels if lvl.name == name), None)
139        return level
140
141    def parse_configuration(self, config_file):
142        if os.path.exists(config_file):
143            tc_schema = scl.yaml_load(self.tc_schema_path)
144            self.test_config = scl.yaml_load_verify(config_file, tc_schema)
145        else:
146            raise TwisterRuntimeError(f"File {config_file} not found.")
147
148        levels = self.test_config.get('levels', [])
149
150        # Do first pass on levels to get initial data.
151        for level in levels:
152            adds = []
153            for s in  level.get('adds', []):
154                r = re.compile(s)
155                adds.extend(list(filter(r.fullmatch, self.scenarios)))
156
157            tl = TestLevel()
158            tl.name = level['name']
159            tl.scenarios = adds
160            tl.levels = level.get('inherits', [])
161            self.levels.append(tl)
162
163        # Go over levels again to resolve inheritance.
164        for level in levels:
165            inherit = level.get('inherits', [])
166            _level = self.get_level(level['name'])
167            if inherit:
168                for inherted_level in inherit:
169                    _inherited = self.get_level(inherted_level)
170                    assert _inherited, "Unknown inherited level {inherted_level}"
171                    _inherited_scenarios = _inherited.scenarios
172                    level_scenarios = _level.scenarios if _level else []
173                    level_scenarios.extend(_inherited_scenarios)
174
175    def find_subtests(self):
176        sub_tests = self.options.sub_test
177        if sub_tests:
178            for subtest in sub_tests:
179                _subtests = self.get_testcase(subtest)
180                for _subtest in _subtests:
181                    self.run_individual_testsuite.append(_subtest.name)
182
183            if self.run_individual_testsuite:
184                logger.info("Running the following tests:")
185                for test in self.run_individual_testsuite:
186                    print(f" - {test}")
187            else:
188                raise TwisterRuntimeError("Tests not found")
189
190    def discover(self):
191        self.handle_modules()
192        if self.options.test:
193            self.run_individual_testsuite = self.options.test
194
195        self.add_configurations()
196        num = self.add_testsuites(testsuite_filter=self.run_individual_testsuite)
197        if num == 0:
198            raise TwisterRuntimeError("No testsuites found at the specified location...")
199        if self.load_errors:
200            raise TwisterRuntimeError(
201                f"Found {self.load_errors} errors loading {num} test configurations."
202            )
203
204        self.find_subtests()
205        # get list of scenarios we have parsed into one list
206        for _, ts in self.testsuites.items():
207            self.scenarios.append(ts.id)
208
209        self.report_duplicates()
210        self.parse_configuration(config_file=self.env.test_config)
211
212        # handle quarantine
213        ql = self.options.quarantine_list
214        qv = self.options.quarantine_verify
215        if qv and not ql:
216            logger.error("No quarantine list given to be verified")
217            raise TwisterRuntimeError("No quarantine list given to be verified")
218        if ql:
219            for quarantine_file in ql:
220                try:
221                    # validate quarantine yaml file against the provided schema
222                    scl.yaml_load_verify(quarantine_file, self.quarantine_schema)
223                except scl.EmptyYamlFileException:
224                    logger.debug(f'Quarantine file {quarantine_file} is empty')
225            self.quarantine = Quarantine(ql)
226
227    def load(self):
228
229        if self.options.report_suffix:
230            last_run = os.path.join(
231                self.options.outdir,
232                f"twister_{self.options.report_suffix}.json"
233            )
234        else:
235            last_run = os.path.join(self.options.outdir, "twister.json")
236
237        if self.options.only_failed or self.options.report_summary is not None:
238            self.load_from_file(last_run)
239            self.selected_platforms = set(p.platform.name for p in self.instances.values())
240        elif self.options.load_tests:
241            self.load_from_file(self.options.load_tests)
242            self.selected_platforms = set(p.platform.name for p in self.instances.values())
243        elif self.options.test_only:
244            # Get list of connected hardware and filter tests to only be run on connected hardware.
245            # If the platform does not exist in the hardware map or was not specified by --platform,
246            # just skip it.
247
248            connected_list = []
249            excluded_list = []
250            for _cp in self.options.platform:
251                if _cp in self.platform_names:
252                    connected_list.append(self.get_platform(_cp).name)
253
254            if self.options.exclude_platform:
255                for _p in self.options.exclude_platform:
256                    if _p in self.platform_names:
257                        excluded_list.append(self.get_platform(_p).name)
258                for excluded in excluded_list:
259                    if excluded in connected_list:
260                        connected_list.remove(excluded)
261
262            self.load_from_file(last_run, filter_platform=connected_list)
263            self.selected_platforms = set(p.platform.name for p in self.instances.values())
264        else:
265            self.apply_filters()
266
267        if self.options.subset:
268            s =  self.options.subset
269            try:
270                subset, sets = (int(x) for x in s.split("/"))
271            except ValueError as err:
272                raise TwisterRuntimeError("Bad subset value.") from err
273
274            if subset > sets:
275                raise TwisterRuntimeError("subset should not exceed the total number of sets")
276
277            if int(subset) > 0 and int(sets) >= int(subset):
278                logger.info(f"Running only a subset: {subset}/{sets}")
279            else:
280                raise TwisterRuntimeError(
281                    f"You have provided a wrong subset value: {self.options.subset}."
282                )
283
284            self.generate_subset(subset, int(sets))
285
286    def generate_subset(self, subset, sets):
287        # Test instances are sorted depending on the context. For CI runs
288        # the execution order is: "plat1-testA, plat1-testB, ...,
289        # plat1-testZ, plat2-testA, ...". For hardware tests
290        # (device_testing), were multiple physical platforms can run the tests
291        # in parallel, it is more efficient to run in the order:
292        # "plat1-testA, plat2-testA, ..., plat1-testB, plat2-testB, ..."
293        if self.options.device_testing:
294            self.instances = OrderedDict(sorted(self.instances.items(),
295                                key=lambda x: x[0][x[0].find("/") + 1:]))
296        else:
297            self.instances = OrderedDict(sorted(self.instances.items()))
298
299        if self.options.shuffle_tests:
300            seed_value = int.from_bytes(os.urandom(8), byteorder="big")
301            if self.options.shuffle_tests_seed is not None:
302                seed_value = self.options.shuffle_tests_seed
303
304            logger.info(f"Shuffle tests with seed: {seed_value}")
305            random.seed(seed_value)
306            temp_list = list(self.instances.items())
307            random.shuffle(temp_list)
308            self.instances = OrderedDict(temp_list)
309
310        # Do calculation based on what is actually going to be run and evaluated
311        # at runtime, ignore the cases we already know going to be skipped.
312        # This fixes an issue where some sets would get majority of skips and
313        # basically run nothing beside filtering.
314        to_run = {k : v for k,v in self.instances.items() if v.status == TwisterStatus.NONE}
315        total = len(to_run)
316        per_set = int(total / sets)
317        num_extra_sets = total - (per_set * sets)
318
319        # Try and be more fair for rounding error with integer division
320        # so the last subset doesn't get overloaded, we add 1 extra to
321        # subsets 1..num_extra_sets.
322        if subset <= num_extra_sets:
323            start = (subset - 1) * (per_set + 1)
324            end = start + per_set + 1
325        else:
326            base = num_extra_sets * (per_set + 1)
327            start = ((subset - num_extra_sets - 1) * per_set) + base
328            end = start + per_set
329
330        sliced_instances = islice(to_run.items(), start, end)
331        skipped = {k : v for k,v in self.instances.items() if v.status == TwisterStatus.SKIP}
332        errors = {k : v for k,v in self.instances.items() if v.status == TwisterStatus.ERROR}
333        self.instances = OrderedDict(sliced_instances)
334        if subset == 1:
335            # add all pre-filtered tests that are skipped or got error status
336            # to the first set to allow for better distribution among all sets.
337            self.instances.update(skipped)
338            self.instances.update(errors)
339
340
341    def handle_modules(self):
342        # get all enabled west projects
343        modules_meta = parse_modules(ZEPHYR_BASE)
344        self.modules = [module.meta.get('name') for module in modules_meta]
345
346
347    def report(self):
348        if self.options.test_tree:
349            if not self.options.detailed_test_id:
350                logger.info("Test tree is always shown with detailed test-id.")
351            self.report_test_tree()
352            return 0
353        elif self.options.list_tests:
354            if not self.options.detailed_test_id:
355                logger.info("Test list is always shown with detailed test-id.")
356            self.report_test_list()
357            return 0
358        elif self.options.list_tags:
359            self.report_tag_list()
360            return 0
361
362        return 1
363
364    def report_duplicates(self):
365        dupes = [item for item, count in collections.Counter(self.scenarios).items() if count > 1]
366        if dupes:
367            msg = "Duplicated test scenarios found:\n"
368            for dupe in dupes:
369                msg += (f"- {dupe} found in:\n")
370                for dc in self.get_testsuite(dupe):
371                    msg += (f"  - {dc.yamlfile}\n")
372            raise TwisterRuntimeError(msg)
373        else:
374            logger.debug("No duplicates found.")
375
376    def report_tag_list(self):
377        tags = set()
378        for _, tc in self.testsuites.items():
379            tags = tags.union(tc.tags)
380
381        for t in tags:
382            print(f"- {t}")
383
384    def report_test_tree(self):
385        tests_list = self.get_tests_list()
386
387        testsuite = Node("Testsuite")
388        samples = Node("Samples", parent=testsuite)
389        tests = Node("Tests", parent=testsuite)
390
391        for test in sorted(tests_list):
392            if test.startswith("sample."):
393                sec = test.split(".")
394                area = find(
395                    samples,
396                    lambda node, sname=sec[1]: node.name == sname and node.parent == samples
397                )
398                if not area:
399                    area = Node(sec[1], parent=samples)
400
401                Node(test, parent=area)
402            else:
403                sec = test.split(".")
404                area = find(
405                    tests,
406                    lambda node, sname=sec[0]: node.name == sname and node.parent == tests
407                )
408                if not area:
409                    area = Node(sec[0], parent=tests)
410
411                if area and len(sec) > 2:
412                    subarea = find(
413                        area, lambda node, sname=sec[1], sparent=area: node.name == sname
414                        and node.parent == sparent
415                    )
416                    if not subarea:
417                        subarea = Node(sec[1], parent=area)
418                    Node(test, parent=subarea)
419
420        for pre, _, node in RenderTree(testsuite):
421            print(f"{pre}{node.name}")
422
423    def report_test_list(self):
424        tests_list = self.get_tests_list()
425
426        cnt = 0
427        for test in sorted(tests_list):
428            cnt = cnt + 1
429            print(f" - {test}")
430        print(f"{cnt} total.")
431
432
433    # Debug Functions
434    @staticmethod
435    def info(what):
436        sys.stdout.write(what + "\n")
437        sys.stdout.flush()
438
439    def find_twister_data(self, board_data_list, board_aliases):
440        """Find the twister data for a board in the list of board data based on the aliases"""
441        for board_data in board_data_list:
442            if board_data.get('identifier') in board_aliases:
443                return board_data
444
445    def add_configurations(self):
446        # Create a list of board roots as defined by the build system in general
447        # Note, internally in twister a board root includes the `boards` folder
448        # but in Zephyr build system, the board root is without the `boards` in folder path.
449        board_roots = [Path(os.path.dirname(root)) for root in self.env.board_roots]
450        lb_args = Namespace(arch_roots=self.env.arch_roots, soc_roots=self.env.soc_roots,
451                            board_roots=board_roots, board=None, board_dir=None)
452
453        known_boards = list_boards.find_v2_boards(lb_args)
454        bdirs = {}
455        platform_config = self.test_config.get('platforms', {})
456
457        # helper function to initialize and add platforms
458        def init_and_add_platforms(data, board, target, qualifier, aliases):
459            platform = Platform()
460            if not new_config_found:
461                data = self.find_twister_data(bdirs[board.dir], aliases)
462                if not data:
463                    return
464            platform.load(board, target, aliases, data)
465            platform.qualifier = qualifier
466            if platform.name in [p.name for p in self.platforms]:
467                logger.error(f"Duplicate platform {platform.name} in {board.dir}")
468                raise Exception(f"Duplicate platform identifier {platform.name} found")
469            if not platform.twister:
470                return
471            self.platforms.append(platform)
472
473        for board in known_boards.values():
474            new_config_found = False
475            # don't load the same board data twice
476            if not bdirs.get(board.dir):
477                datas = []
478                for file in glob.glob(os.path.join(board.dir, "*.yaml")):
479                    if os.path.basename(file) == "twister.yaml":
480                        continue
481                    try:
482                        scp = TwisterConfigParser(file, Platform.platform_schema)
483                        sdata = scp.load()
484                        datas.append(sdata)
485                    except Exception as e:
486                        logger.error(f"Error loading {file}: {e!r}")
487                        self.load_errors += 1
488                        continue
489                bdirs[board.dir] = datas
490            data = {}
491            if os.path.exists(board.dir / 'twister.yaml'):
492                try:
493                    scp = TwisterConfigParser(board.dir / 'twister.yaml', Platform.platform_schema)
494                    data = scp.load()
495                except Exception as e:
496                    logger.error(f"Error loading {board.dir / 'twister.yaml'}: {e!r}")
497                    self.load_errors += 1
498                    continue
499                new_config_found = True
500
501
502
503            for qual in list_boards.board_v2_qualifiers(board):
504
505                if board.revisions:
506                    for rev in board.revisions:
507                        if rev.name:
508                            target = f"{board.name}@{rev.name}/{qual}"
509                            aliases = [target]
510                            if rev.name == board.revision_default:
511                                aliases.append(f"{board.name}/{qual}")
512                            if '/' not in qual and len(board.socs) == 1:
513                                if rev.name == board.revision_default:
514                                    aliases.append(f"{board.name}")
515                                aliases.append(f"{board.name}@{rev.name}")
516                        else:
517                            target = f"{board.name}/{qual}"
518                            aliases = [target]
519                            if '/' not in qual and len(board.socs) == 1 \
520                                    and rev.name == board.revision_default:
521                                aliases.append(f"{board.name}")
522
523                        init_and_add_platforms(data, board, target, qual, aliases)
524                else:
525                    target = f"{board.name}/{qual}"
526                    aliases = [target]
527                    if '/' not in qual and len(board.socs) == 1:
528                        aliases.append(board.name)
529                    init_and_add_platforms(data, board, target, qual, aliases)
530
531        for platform in self.platforms:
532            if not platform_config.get('override_default_platforms', False):
533                if platform.default:
534                    self.default_platforms.append(platform.name)
535                    #logger.debug(f"adding {platform.name} to default platforms")
536                continue
537            for pp in platform_config.get('default_platforms', []):
538                if pp in platform.aliases:
539                    logger.debug(f"adding {platform.name} to default platforms (override  mode)")
540                    self.default_platforms.append(platform.name)
541
542        self.platform_names = [a for p in self.platforms for a in p.aliases]
543
544    def get_all_tests(self):
545        testcases = []
546        for _, ts in self.testsuites.items():
547            for case in ts.testcases:
548                testcases.append(case.name)
549
550        return testcases
551
552    def get_tests_list(self):
553        testcases = []
554        if tag_filter := self.options.tag:
555            for _, ts in self.testsuites.items():
556                if ts.tags.intersection(tag_filter):
557                    for case in ts.testcases:
558                        testcases.append(case.detailed_name)
559        else:
560            for _, ts in self.testsuites.items():
561                for case in ts.testcases:
562                    testcases.append(case.detailed_name)
563
564        if exclude_tag := self.options.exclude_tag:
565            for _, ts in self.testsuites.items():
566                if ts.tags.intersection(exclude_tag):
567                    for case in ts.testcases:
568                        if case.detailed_name in testcases:
569                            testcases.remove(case.detailed_name)
570        return testcases
571
572    def add_testsuites(self, testsuite_filter=None):
573        if testsuite_filter is None:
574            testsuite_filter = []
575        for root in self.env.test_roots:
576            root = os.path.abspath(root)
577
578            logger.debug(f"Reading testsuite configuration files under {root}...")
579
580            for dirpath, _, filenames in os.walk(root, topdown=True):
581                if self.SAMPLE_FILENAME in filenames:
582                    filename = self.SAMPLE_FILENAME
583                elif self.TESTSUITE_FILENAME in filenames:
584                    filename = self.TESTSUITE_FILENAME
585                else:
586                    continue
587
588                logger.debug("Found possible testsuite in " + dirpath)
589
590                suite_yaml_path = os.path.join(dirpath, filename)
591                suite_path = os.path.dirname(suite_yaml_path)
592
593                for alt_config_root in self.env.alt_config_root:
594                    alt_config = os.path.join(os.path.abspath(alt_config_root),
595                                              os.path.relpath(suite_path, root),
596                                              filename)
597                    if os.path.exists(alt_config):
598                        logger.info(
599                            f"Using alternative configuration from {os.path.normpath(alt_config)}"
600                        )
601                        suite_yaml_path = alt_config
602                        break
603
604                try:
605                    parsed_data = TwisterConfigParser(suite_yaml_path, self.suite_schema)
606                    parsed_data.load()
607                    subcases = None
608                    ztest_suite_names = None
609
610                    for name in parsed_data.scenarios:
611                        suite_dict = parsed_data.get_scenario(name)
612                        suite = TestSuite(
613                            root,
614                            suite_path,
615                            name,
616                            data=suite_dict,
617                            detailed_test_id=self.options.detailed_test_id
618                        )
619
620                        # convert to fully qualified names
621                        suite.integration_platforms = self.verify_platforms_existence(
622                                suite.integration_platforms,
623                                f"integration_platforms in {suite.name}")
624                        suite.platform_exclude = self.verify_platforms_existence(
625                                suite.platform_exclude,
626                                f"platform_exclude in {suite.name}")
627                        suite.platform_allow =  self.verify_platforms_existence(
628                                suite.platform_allow,
629                                f"platform_allow in {suite.name}")
630
631                        if suite.harness in ['ztest', 'test']:
632                            if subcases is None:
633                                # scan it only once per testsuite
634                                subcases, ztest_suite_names = scan_testsuite_path(suite_path)
635                            suite.add_subcases(suite_dict, subcases, ztest_suite_names)
636                        else:
637                            suite.add_subcases(suite_dict)
638
639                        if testsuite_filter:
640                            scenario = os.path.basename(suite.name)
641                            if (
642                                suite.name
643                                and (suite.name in testsuite_filter or scenario in testsuite_filter)
644                            ):
645                                self.testsuites[suite.name] = suite
646                        elif suite.name in self.testsuites:
647                            msg = (
648                                f"test suite '{suite.name}' in '{suite.yamlfile}' is already added"
649                            )
650                            if suite.yamlfile == self.testsuites[suite.name].yamlfile:
651                                logger.debug(f"Skip - {msg}")
652                            else:
653                                msg = (
654                                    f"Duplicate {msg} from '{self.testsuites[suite.name].yamlfile}'"
655                                )
656                                raise TwisterRuntimeError(msg)
657                        else:
658                            self.testsuites[suite.name] = suite
659
660                except Exception as e:
661                    logger.error(f"{suite_path}: can't load (skipping): {e!r}")
662                    self.load_errors += 1
663        return len(self.testsuites)
664
665    def __str__(self):
666        return self.name
667
668    def get_platform(self, name):
669        selected_platform = None
670        for platform in self.platforms:
671            if name in platform.aliases:
672                selected_platform = platform
673                break
674        return selected_platform
675
676    def handle_quarantined_tests(self, instance: TestInstance, plat: Platform):
677        if self.quarantine:
678            simulator = plat.simulator_by_name(self.options)
679            matched_quarantine = self.quarantine.get_matched_quarantine(
680                instance.testsuite.id,
681                plat.name,
682                plat.arch,
683                simulator.name if simulator is not None else 'na'
684            )
685            if matched_quarantine and not self.options.quarantine_verify:
686                instance.add_filter("Quarantine: " + matched_quarantine, Filters.QUARANTINE)
687                return
688            if not matched_quarantine and self.options.quarantine_verify:
689                instance.add_filter("Not under quarantine", Filters.QUARANTINE)
690
691    def load_from_file(self, file, filter_platform=None):
692        if filter_platform is None:
693            filter_platform = []
694        try:
695            with open(file) as json_test_plan:
696                jtp = json.load(json_test_plan)
697                instance_list = []
698                for ts in jtp.get("testsuites", []):
699                    logger.debug(f"loading {ts['name']}...")
700                    testsuite = ts["name"]
701
702                    platform = self.get_platform(ts["platform"])
703                    if filter_platform and platform.name not in filter_platform:
704                        continue
705                    instance = TestInstance(self.testsuites[testsuite], platform, self.env.outdir)
706                    if ts.get("run_id"):
707                        instance.run_id = ts.get("run_id")
708
709                    instance.run = instance.check_runnable(
710                        self.options,
711                        self.hwm
712                    )
713
714                    if self.options.test_only and not instance.run:
715                        continue
716
717                    instance.metrics['handler_time'] = ts.get('execution_time', 0)
718                    instance.metrics['used_ram'] = ts.get("used_ram", 0)
719                    instance.metrics['used_rom']  = ts.get("used_rom",0)
720                    instance.metrics['available_ram'] = ts.get('available_ram', 0)
721                    instance.metrics['available_rom'] = ts.get('available_rom', 0)
722
723                    status = TwisterStatus(ts.get('status'))
724                    reason = ts.get("reason", "Unknown")
725                    if status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
726                        if self.options.report_summary is not None:
727                            instance.status = status
728                            instance.reason = reason
729                            self.instance_fail_count += 1
730                        else:
731                            instance.status = TwisterStatus.NONE
732                            instance.reason = None
733                            instance.retries += 1
734                    # test marked as built only can run when --test-only is used.
735                    # Reset status to capture new results.
736                    elif status == TwisterStatus.NOTRUN and instance.run and self.options.test_only:
737                        instance.status = TwisterStatus.NONE
738                        instance.reason = None
739                    else:
740                        instance.status = status
741                        instance.reason = reason
742
743                    self.handle_quarantined_tests(instance, platform)
744
745                    for tc in ts.get('testcases', []):
746                        identifier = tc['identifier']
747                        tc_status = TwisterStatus(tc.get('status'))
748                        tc_reason = None
749                        # we set reason only if status is valid, it might have been
750                        # reset above...
751                        if instance.status != TwisterStatus.NONE:
752                            tc_reason = tc.get('reason')
753                        if tc_status != TwisterStatus.NONE:
754                            case = instance.set_case_status_by_name(
755                                identifier,
756                                tc_status,
757                                tc_reason
758                            )
759                            case.duration = tc.get('execution_time', 0)
760                            if tc.get('log'):
761                                case.output = tc.get('log')
762
763                    instance.create_overlay(platform,
764                                            self.options.enable_asan,
765                                            self.options.enable_ubsan,
766                                            self.options.enable_coverage,
767                                            self.options.coverage_platform
768                                            )
769                    instance_list.append(instance)
770                self.add_instances(instance_list)
771        except FileNotFoundError as e:
772            logger.error(f"{e}")
773            return 1
774
775    def check_platform(self, platform, platform_list):
776        return any(p in platform.aliases for p in platform_list)
777
778    def apply_filters(self, **kwargs):
779
780        toolchain = self.env.toolchain
781        platform_filter = self.options.platform
782        vendor_filter = self.options.vendor
783        exclude_platform = self.options.exclude_platform
784        testsuite_filter = self.run_individual_testsuite
785        arch_filter = self.options.arch
786        tag_filter = self.options.tag
787        exclude_tag = self.options.exclude_tag
788        all_filter = self.options.all
789        runnable = (self.options.device_testing or self.options.filter == 'runnable')
790        force_toolchain = self.options.force_toolchain
791        force_platform = self.options.force_platform
792        slow_only = self.options.enable_slow_only
793        ignore_platform_key = self.options.ignore_platform_key
794        emu_filter = self.options.emulation_only
795
796        logger.debug("platform filter: " + str(platform_filter))
797        logger.debug("  vendor filter: " + str(vendor_filter))
798        logger.debug("    arch_filter: " + str(arch_filter))
799        logger.debug("     tag_filter: " + str(tag_filter))
800        logger.debug("    exclude_tag: " + str(exclude_tag))
801
802        default_platforms = False
803        vendor_platforms = False
804        emulation_platforms = False
805
806        if all_filter:
807            logger.info("Selecting all possible platforms per testsuite scenario")
808            # When --all used, any --platform arguments ignored
809            platform_filter = []
810        elif not platform_filter and not emu_filter and not vendor_filter:
811            logger.info("Selecting default platforms per testsuite scenario")
812            default_platforms = True
813        elif emu_filter:
814            logger.info("Selecting emulation platforms per testsuite scenraio")
815            emulation_platforms = True
816        elif vendor_filter:
817            vendor_platforms = True
818
819        _platforms = []
820        if platform_filter:
821            logger.debug(f"Checking platform filter: {platform_filter}")
822            # find in aliases and rename
823            platform_filter = self.verify_platforms_existence(platform_filter, "platform_filter")
824            platforms = list(filter(lambda p: p.name in platform_filter, self.platforms))
825        elif emu_filter:
826            platforms = list(
827                filter(lambda p: bool(p.simulator_by_name(self.options.sim_name)), self.platforms)
828            )
829        elif vendor_filter:
830            platforms = list(filter(lambda p: p.vendor in vendor_filter, self.platforms))
831            logger.info(f"Selecting platforms by vendors: {','.join(vendor_filter)}")
832        elif arch_filter:
833            platforms = list(filter(lambda p: p.arch in arch_filter, self.platforms))
834        elif default_platforms:
835            _platforms = list(filter(lambda p: p.name in self.default_platforms, self.platforms))
836            platforms = []
837            # default platforms that can't be run are dropped from the list of
838            # the default platforms list. Default platforms should always be
839            # runnable.
840            for p in _platforms:
841                sim = p.simulator_by_name(self.options.sim_name)
842                if (not sim) or sim.is_runnable():
843                    platforms.append(p)
844        else:
845            platforms = self.platforms
846
847        platform_config = self.test_config.get('platforms', {})
848        logger.info("Building initial testsuite list...")
849
850        keyed_tests = {}
851
852        for _, ts in self.testsuites.items():
853            if (
854                ts.build_on_all
855                and not platform_filter
856                and platform_config.get('increased_platform_scope', True)
857            ):
858                platform_scope = self.platforms
859            elif ts.integration_platforms:
860                integration_platforms = list(
861                    filter(lambda item: item.name in ts.integration_platforms, self.platforms)
862                )
863                if self.options.integration:
864                    platform_scope = integration_platforms
865                else:
866                    # if not in integration mode, still add integration platforms to the list
867                    if not platform_filter:
868                        platform_scope = platforms + integration_platforms
869                    else:
870                        platform_scope = platforms
871            else:
872                platform_scope = platforms
873
874            integration = self.options.integration and ts.integration_platforms
875
876            # If there isn't any overlap between the platform_allow list and the platform_scope
877            # we set the scope to the platform_allow list
878            if (
879                ts.platform_allow
880                and not platform_filter
881                and not integration
882                and platform_config.get('increased_platform_scope', True)
883            ):
884                a = set(platform_scope)
885                b = set(filter(lambda item: item.name in ts.platform_allow, self.platforms))
886                c = a.intersection(b)
887                if not c:
888                    platform_scope = list(
889                        filter(lambda item: item.name in ts.platform_allow, self.platforms)
890                    )
891            # list of instances per testsuite, aka configurations.
892            instance_list = []
893            for plat in platform_scope:
894                instance = TestInstance(ts, plat, self.env.outdir)
895                instance.run = instance.check_runnable(
896                    self.options,
897                    self.hwm
898                )
899
900                if not force_platform and self.check_platform(plat,exclude_platform):
901                    instance.add_filter("Platform is excluded on command line.", Filters.CMD_LINE)
902
903                if (plat.arch == "unit") != (ts.type == "unit"):
904                    # Discard silently
905                    continue
906
907                if ts.modules and self.modules and not set(ts.modules).issubset(set(self.modules)):
908                    instance.add_filter(
909                        f"one or more required modules not available: {','.join(ts.modules)}",
910                        Filters.MODULE
911                    )
912
913                if self.options.level:
914                    tl = self.get_level(self.options.level)
915                    if tl is None:
916                        instance.add_filter(
917                            f"Unknown test level '{self.options.level}'",
918                            Filters.TESTPLAN
919                        )
920                    else:
921                        planned_scenarios = tl.scenarios
922                        if (
923                            ts.id not in planned_scenarios
924                            and not set(ts.levels).intersection(set(tl.levels))
925                        ):
926                            instance.add_filter("Not part of requested test plan", Filters.TESTPLAN)
927
928                if runnable and not instance.run:
929                    instance.add_filter("Not runnable on device", Filters.CMD_LINE)
930
931                if (
932                    self.options.integration
933                    and ts.integration_platforms
934                    and plat.name not in ts.integration_platforms
935                ):
936                    instance.add_filter("Not part of integration platforms", Filters.TESTSUITE)
937
938                if ts.skip:
939                    instance.add_filter("Skip filter", Filters.SKIP)
940
941                if tag_filter and not ts.tags.intersection(tag_filter):
942                    instance.add_filter("Command line testsuite tag filter", Filters.CMD_LINE)
943
944                if slow_only and not ts.slow:
945                    instance.add_filter("Not a slow test", Filters.CMD_LINE)
946
947                if exclude_tag and ts.tags.intersection(exclude_tag):
948                    instance.add_filter("Command line testsuite exclude filter", Filters.CMD_LINE)
949
950                if testsuite_filter:
951                    normalized_f = [os.path.basename(_ts) for _ts in testsuite_filter]
952                    if ts.id not in normalized_f:
953                        instance.add_filter("Testsuite name filter", Filters.CMD_LINE)
954
955                if arch_filter and plat.arch not in arch_filter:
956                    instance.add_filter("Command line testsuite arch filter", Filters.CMD_LINE)
957
958                if not force_platform:
959
960                    if ts.arch_allow and plat.arch not in ts.arch_allow:
961                        instance.add_filter("Not in testsuite arch allow list", Filters.TESTSUITE)
962
963                    if ts.arch_exclude and plat.arch in ts.arch_exclude:
964                        instance.add_filter("In testsuite arch exclude", Filters.TESTSUITE)
965
966                    if ts.vendor_allow and plat.vendor not in ts.vendor_allow:
967                        instance.add_filter(
968                            "Not in testsuite vendor allow list",
969                            Filters.TESTSUITE
970                        )
971
972                    if ts.vendor_exclude and plat.vendor in ts.vendor_exclude:
973                        instance.add_filter("In testsuite vendor exclude", Filters.TESTSUITE)
974
975                    if ts.platform_exclude and plat.name in ts.platform_exclude:
976                        instance.add_filter("In testsuite platform exclude", Filters.TESTSUITE)
977
978                if ts.toolchain_exclude and toolchain in ts.toolchain_exclude:
979                    instance.add_filter("In testsuite toolchain exclude", Filters.TOOLCHAIN)
980
981                if platform_filter and plat.name not in platform_filter:
982                    instance.add_filter("Command line platform filter", Filters.CMD_LINE)
983
984                if ts.platform_allow \
985                        and plat.name not in ts.platform_allow \
986                        and not (platform_filter and force_platform):
987                    instance.add_filter("Not in testsuite platform allow list", Filters.TESTSUITE)
988
989                if ts.platform_type and plat.type not in ts.platform_type:
990                    instance.add_filter("Not in testsuite platform type list", Filters.TESTSUITE)
991
992                if ts.toolchain_allow and toolchain not in ts.toolchain_allow:
993                    instance.add_filter("Not in testsuite toolchain allow list", Filters.TOOLCHAIN)
994
995                if not plat.env_satisfied:
996                    instance.add_filter(
997                        "Environment ({}) not satisfied".format(", ".join(plat.env)),
998                        Filters.ENVIRONMENT
999                    )
1000
1001                if not force_toolchain \
1002                        and toolchain and (toolchain not in plat.supported_toolchains) \
1003                        and "host" not in plat.supported_toolchains \
1004                        and ts.type != 'unit':
1005                    instance.add_filter(
1006                        f"Not supported by the toolchain: {toolchain}",
1007                        Filters.PLATFORM
1008                    )
1009
1010                if plat.ram < ts.min_ram:
1011                    instance.add_filter("Not enough RAM", Filters.PLATFORM)
1012
1013                if ts.harness:
1014                    sim = plat.simulator_by_name(self.options.sim_name)
1015                    if ts.harness == 'robot' and not (sim and sim.name == 'renode'):
1016                        instance.add_filter(
1017                            "No robot support for the selected platform",
1018                            Filters.SKIP
1019                        )
1020
1021                if ts.depends_on:
1022                    dep_intersection = ts.depends_on.intersection(set(plat.supported))
1023                    if dep_intersection != set(ts.depends_on):
1024                        instance.add_filter("No hardware support", Filters.PLATFORM)
1025
1026                if plat.flash < ts.min_flash:
1027                    instance.add_filter("Not enough FLASH", Filters.PLATFORM)
1028
1029                if set(plat.ignore_tags) & ts.tags:
1030                    instance.add_filter(
1031                        "Excluded tags per platform (exclude_tags)",
1032                        Filters.PLATFORM
1033                    )
1034
1035                if plat.only_tags and not set(plat.only_tags) & ts.tags:
1036                    instance.add_filter("Excluded tags per platform (only_tags)", Filters.PLATFORM)
1037
1038                if ts.required_snippets:
1039                    missing_snippet = False
1040                    snippet_args = {"snippets": ts.required_snippets}
1041                    found_snippets = snippets.find_snippets_in_roots(
1042                        snippet_args,
1043                        [*self.env.snippet_roots, Path(ts.source_dir)]
1044                    )
1045
1046                    # Search and check that all required snippet files are found
1047                    for this_snippet in snippet_args['snippets']:
1048                        if this_snippet not in found_snippets:
1049                            logger.error(
1050                                f"Can't find snippet '{this_snippet}' for test '{ts.name}'"
1051                            )
1052                            instance.status = TwisterStatus.ERROR
1053                            instance.reason = f"Snippet {this_snippet} not found"
1054                            missing_snippet = True
1055                            break
1056
1057                    if not missing_snippet:
1058                        # Look for required snippets and check that they are applicable for these
1059                        # platforms/boards
1060                        for this_snippet in snippet_args['snippets']:
1061                            matched_snippet_board = False
1062
1063                            # If the "appends" key is present with at least one entry then this
1064                            # snippet applies to all boards and further platform-specific checks
1065                            # are not required
1066                            if found_snippets[this_snippet].appends:
1067                                continue
1068
1069                            for this_board in found_snippets[this_snippet].board2appends:
1070                                if this_board.startswith('/'):
1071                                    match = re.search(this_board[1:-1], plat.name)
1072                                    if match is not None:
1073                                        matched_snippet_board = True
1074                                        break
1075                                elif this_board == plat.name:
1076                                    matched_snippet_board = True
1077                                    break
1078
1079                            if matched_snippet_board is False:
1080                                instance.add_filter("Snippet not supported", Filters.PLATFORM)
1081                                break
1082
1083                # handle quarantined tests
1084                self.handle_quarantined_tests(instance, plat)
1085
1086                # platform_key is a list of unique platform attributes that form a unique key
1087                # a test will match against to determine if it should be scheduled to run.
1088                # A key containing a field name that the platform does not have
1089                # will filter the platform.
1090                #
1091                # A simple example is keying on arch and simulation
1092                # to run a test once per unique (arch, simulation) platform.
1093                if (
1094                    not ignore_platform_key
1095                    and hasattr(ts, 'platform_key')
1096                    and len(ts.platform_key) > 0
1097                ):
1098                    key_fields = sorted(set(ts.platform_key))
1099                    keys = [getattr(plat, key_field, None) for key_field in key_fields]
1100                    for key in keys:
1101                        if key is None or key == 'na':
1102                            instance.add_filter(
1103                                "Excluded platform missing key fields"
1104                                f" demanded by test {key_fields}",
1105                                Filters.PLATFORM
1106                            )
1107                            break
1108                    else:
1109                        test_keys = copy.deepcopy(keys)
1110                        test_keys.append(ts.name)
1111                        test_keys = tuple(test_keys)
1112                        keyed_test = keyed_tests.get(test_keys)
1113                        if keyed_test is not None:
1114                            plat_key = {
1115                                key_field: getattr(
1116                                    keyed_test['plat'],
1117                                    key_field
1118                                ) for key_field in key_fields
1119                            }
1120                            instance.add_filter(
1121                                f"Already covered for key {key}"
1122                                f" by platform {keyed_test['plat'].name} having key {plat_key}",
1123                                Filters.PLATFORM_KEY
1124                            )
1125                        else:
1126                            # do not add a platform to keyed tests if previously
1127                            # filtered
1128
1129                            if not instance.filters:
1130                                keyed_tests[test_keys] = {'plat': plat, 'ts': ts}
1131
1132                # if nothing stopped us until now, it means this configuration
1133                # needs to be added.
1134                instance_list.append(instance)
1135
1136            # no configurations, so jump to next testsuite
1137            if not instance_list:
1138                continue
1139
1140            # if twister was launched with no platform options at all, we
1141            # take all default platforms
1142            if default_platforms and not ts.build_on_all and not integration:
1143                if ts.platform_allow:
1144                    _default_p = set(self.default_platforms)
1145                    _platform_allow = set(ts.platform_allow)
1146                    _intersection = _default_p.intersection(_platform_allow)
1147                    if _intersection:
1148                        aa = list(
1149                            filter(
1150                                lambda _scenario: _scenario.platform.name in _intersection,
1151                                instance_list
1152                            )
1153                        )
1154                        self.add_instances(aa)
1155                    else:
1156                        self.add_instances(instance_list)
1157                else:
1158                    # add integration platforms to the list of default
1159                    # platforms, even if we are not in integration mode
1160                    _platforms = self.default_platforms + ts.integration_platforms
1161                    instances = list(
1162                        filter(lambda ts: ts.platform.name in _platforms, instance_list)
1163                    )
1164                    self.add_instances(instances)
1165            elif integration:
1166                instances = list(
1167                    filter(
1168                        lambda item:  item.platform.name in ts.integration_platforms,
1169                        instance_list
1170                    )
1171                )
1172                self.add_instances(instances)
1173
1174            elif emulation_platforms:
1175                self.add_instances(instance_list)
1176                for instance in list(
1177                    filter(
1178                        lambda inst: not inst.platform.simulator_by_name(self.options.sim_name),
1179                        instance_list
1180                    )
1181                ):
1182                    instance.add_filter("Not an emulated platform", Filters.CMD_LINE)
1183            elif vendor_platforms:
1184                self.add_instances(instance_list)
1185                for instance in list(
1186                    filter(
1187                        lambda inst: inst.platform.vendor not in vendor_filter,
1188                        instance_list
1189                    )
1190                ):
1191                    instance.add_filter("Not a selected vendor platform", Filters.CMD_LINE)
1192            else:
1193                self.add_instances(instance_list)
1194
1195        for _, case in self.instances.items():
1196            # Do not create files for filtered instances
1197            if case.status == TwisterStatus.FILTER:
1198                continue
1199            # set run_id for each unfiltered instance
1200            case.setup_run_id()
1201            case.create_overlay(case.platform,
1202                                self.options.enable_asan,
1203                                self.options.enable_ubsan,
1204                                self.options.enable_coverage,
1205                                self.options.coverage_platform)
1206
1207        self.selected_platforms = set(p.platform.name for p in self.instances.values())
1208
1209        filtered_instances = list(
1210            filter(lambda item:  item.status == TwisterStatus.FILTER, self.instances.values())
1211        )
1212        for filtered_instance in filtered_instances:
1213            change_skip_to_error_if_integration(self.options, filtered_instance)
1214
1215            filtered_instance.add_missing_case_status(filtered_instance.status)
1216
1217    def add_instances(self, instance_list):
1218        for instance in instance_list:
1219            self.instances[instance.name] = instance
1220
1221
1222    def get_testsuite(self, identifier):
1223        results = []
1224        for _, ts in self.testsuites.items():
1225            if ts.id == identifier:
1226                results.append(ts)
1227        return results
1228
1229    def get_testcase(self, identifier):
1230        results = []
1231        for _, ts in self.testsuites.items():
1232            for case in ts.testcases:
1233                if case.name == identifier:
1234                    results.append(ts)
1235        return results
1236
1237    def verify_platforms_existence(self, platform_names_to_verify, log_info=""):
1238        """
1239        Verify if platform name (passed by --platform option, or in yaml file
1240        as platform_allow or integration_platforms options) is correct. If not -
1241        log and raise error.
1242        """
1243        _platforms = []
1244        for platform in platform_names_to_verify:
1245            if platform in self.platform_names:
1246                p = self.get_platform(platform)
1247                if p:
1248                    _platforms.append(p.name)
1249            else:
1250                logger.error(f"{log_info} - unrecognized platform - {platform}")
1251                sys.exit(2)
1252        return _platforms
1253
1254    def create_build_dir_links(self):
1255        """
1256        Iterate through all no-skipped instances in suite and create links
1257        for each one build directories. Those links will be passed in the next
1258        steps to the CMake command.
1259        """
1260
1261        links_dir_name = "twister_links"  # folder for all links
1262        links_dir_path = os.path.join(self.env.outdir, links_dir_name)
1263        if not os.path.exists(links_dir_path):
1264            os.mkdir(links_dir_path)
1265
1266        for instance in self.instances.values():
1267            if instance.status != TwisterStatus.SKIP:
1268                self._create_build_dir_link(links_dir_path, instance)
1269
1270    def _create_build_dir_link(self, links_dir_path, instance):
1271        """
1272        Create build directory with original "long" path. Next take shorter
1273        path and link them with original path - create link. At the end
1274        replace build_dir to created link. This link will be passed to CMake
1275        command. This action helps to limit path length which can be
1276        significant during building by CMake on Windows OS.
1277        """
1278
1279        os.makedirs(instance.build_dir, exist_ok=True)
1280
1281        link_name = f"test_{self.link_dir_counter}"
1282        link_path = os.path.join(links_dir_path, link_name)
1283
1284        if os.name == "nt":  # if OS is Windows
1285            command = ["mklink", "/J", f"{link_path}", os.path.normpath(instance.build_dir)]
1286            subprocess.call(command, shell=True)
1287        else:  # for Linux and MAC OS
1288            os.symlink(instance.build_dir, link_path)
1289
1290        # Here original build directory is replaced with symbolic link. It will
1291        # be passed to CMake command
1292        instance.build_dir = link_path
1293
1294        self.link_dir_counter += 1
1295
1296
1297def change_skip_to_error_if_integration(options, instance):
1298    ''' All skips on integration_platforms are treated as errors.'''
1299    if instance.platform.name in instance.testsuite.integration_platforms:
1300        # Do not treat this as error if filter type is among ignore_filters
1301        filters = {t['type'] for t in instance.filters}
1302        ignore_filters ={Filters.CMD_LINE, Filters.SKIP, Filters.PLATFORM_KEY,
1303                         Filters.TOOLCHAIN, Filters.MODULE, Filters.TESTPLAN,
1304                         Filters.QUARANTINE, Filters.ENVIRONMENT}
1305        if filters.intersection(ignore_filters):
1306            return
1307        instance.status = TwisterStatus.ERROR
1308        instance.reason += " but is one of the integration platforms"
1309        logger.debug(
1310            f"Changing status of {instance.name} to ERROR because it is an integration platform"
1311        )
1312