1#!/usr/bin/env python3 2# vim: set syntax=python ts=4 : 3# 4# Copyright (c) 2018 Intel Corporation 5# SPDX-License-Identifier: Apache-2.0 6import os 7import sys 8import re 9import subprocess 10import glob 11import json 12import collections 13from collections import OrderedDict 14from itertools import islice 15import logging 16import copy 17import shutil 18import random 19import snippets 20from colorama import Fore 21from pathlib import Path 22from argparse import Namespace 23 24logger = logging.getLogger('twister') 25logger.setLevel(logging.DEBUG) 26 27try: 28 from anytree import RenderTree, Node, find 29except ImportError: 30 print("Install the anytree module to use the --test-tree option") 31 32from twisterlib.testsuite import TestSuite, scan_testsuite_path 33from twisterlib.error import TwisterRuntimeError 34from twisterlib.platform import Platform 35from twisterlib.config_parser import TwisterConfigParser 36from twisterlib.testinstance import TestInstance 37from twisterlib.quarantine import Quarantine 38 39import list_boards 40from zephyr_module import parse_modules 41 42ZEPHYR_BASE = os.getenv("ZEPHYR_BASE") 43if not ZEPHYR_BASE: 44 sys.exit("$ZEPHYR_BASE environment variable undefined") 45 46# This is needed to load edt.pickle files. 47sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts", "dts", 48 "python-devicetree", "src")) 49from devicetree import edtlib # pylint: disable=unused-import 50 51sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/")) 52 53import scl 54class Filters: 55 # platform keys 56 PLATFORM_KEY = 'platform key filter' 57 # filters provided on command line by the user/tester 58 CMD_LINE = 'command line filter' 59 # filters in the testsuite yaml definition 60 TESTSUITE = 'testsuite filter' 61 # filters in the testplan yaml definition 62 TESTPLAN = 'testplan filter' 63 # filters related to platform definition 64 PLATFORM = 'Platform related filter' 65 # in case a test suite was quarantined. 66 QUARANTINE = 'Quarantine filter' 67 # in case a test suite is skipped intentionally . 68 SKIP = 'Skip filter' 69 # in case of incompatibility between selected and allowed toolchains. 70 TOOLCHAIN = 'Toolchain filter' 71 # in case an optional module is not available 72 MODULE = 'Module filter' 73 74 75class TestLevel: 76 name = None 77 levels = [] 78 scenarios = [] 79 80class TestPlan: 81 config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') 82 dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') 83 84 suite_schema = scl.yaml_load( 85 os.path.join(ZEPHYR_BASE, 86 "scripts", "schemas", "twister", "testsuite-schema.yaml")) 87 quarantine_schema = scl.yaml_load( 88 os.path.join(ZEPHYR_BASE, 89 "scripts", "schemas", "twister", "quarantine-schema.yaml")) 90 91 tc_schema_path = os.path.join(ZEPHYR_BASE, "scripts", "schemas", "twister", "test-config-schema.yaml") 92 93 SAMPLE_FILENAME = 'sample.yaml' 94 TESTSUITE_FILENAME = 'testcase.yaml' 95 96 def __init__(self, env=None): 97 98 self.options = env.options 99 self.env = env 100 101 # Keep track of which test cases we've filtered out and why 102 self.testsuites = {} 103 self.quarantine = None 104 self.platforms = [] 105 self.platform_names = [] 106 self.selected_platforms = [] 107 self.filtered_platforms = [] 108 self.default_platforms = [] 109 self.load_errors = 0 110 self.instances = dict() 111 self.instance_fail_count = 0 112 self.warnings = 0 113 114 self.scenarios = [] 115 116 self.hwm = env.hwm 117 # used during creating shorter build paths 118 self.link_dir_counter = 0 119 self.modules = [] 120 121 self.run_individual_testsuite = [] 122 self.levels = [] 123 self.test_config = {} 124 125 126 def get_level(self, name): 127 level = next((l for l in self.levels if l.name == name), None) 128 return level 129 130 def parse_configuration(self, config_file): 131 if os.path.exists(config_file): 132 tc_schema = scl.yaml_load(self.tc_schema_path) 133 self.test_config = scl.yaml_load_verify(config_file, tc_schema) 134 else: 135 raise TwisterRuntimeError(f"File {config_file} not found.") 136 137 levels = self.test_config.get('levels', []) 138 139 # Do first pass on levels to get initial data. 140 for level in levels: 141 adds = [] 142 for s in level.get('adds', []): 143 r = re.compile(s) 144 adds.extend(list(filter(r.fullmatch, self.scenarios))) 145 146 tl = TestLevel() 147 tl.name = level['name'] 148 tl.scenarios = adds 149 tl.levels = level.get('inherits', []) 150 self.levels.append(tl) 151 152 # Go over levels again to resolve inheritance. 153 for level in levels: 154 inherit = level.get('inherits', []) 155 _level = self.get_level(level['name']) 156 if inherit: 157 for inherted_level in inherit: 158 _inherited = self.get_level(inherted_level) 159 _inherited_scenarios = _inherited.scenarios 160 level_scenarios = _level.scenarios 161 level_scenarios.extend(_inherited_scenarios) 162 163 def find_subtests(self): 164 sub_tests = self.options.sub_test 165 if sub_tests: 166 for subtest in sub_tests: 167 _subtests = self.get_testsuite(subtest) 168 for _subtest in _subtests: 169 self.run_individual_testsuite.append(_subtest.name) 170 171 if self.run_individual_testsuite: 172 logger.info("Running the following tests:") 173 for test in self.run_individual_testsuite: 174 print(" - {}".format(test)) 175 else: 176 raise TwisterRuntimeError("Tests not found") 177 178 def discover(self): 179 self.handle_modules() 180 if self.options.test: 181 self.run_individual_testsuite = self.options.test 182 183 num = self.add_testsuites(testsuite_filter=self.run_individual_testsuite) 184 if num == 0: 185 raise TwisterRuntimeError("No test cases found at the specified location...") 186 187 self.find_subtests() 188 # get list of scenarios we have parsed into one list 189 for _, ts in self.testsuites.items(): 190 self.scenarios.append(ts.id) 191 192 self.report_duplicates() 193 194 self.parse_configuration(config_file=self.env.test_config) 195 self.add_configurations() 196 197 if self.load_errors: 198 raise TwisterRuntimeError("Errors while loading configurations") 199 200 # handle quarantine 201 ql = self.options.quarantine_list 202 qv = self.options.quarantine_verify 203 if qv and not ql: 204 logger.error("No quarantine list given to be verified") 205 raise TwisterRuntimeError("No quarantine list given to be verified") 206 if ql: 207 for quarantine_file in ql: 208 try: 209 # validate quarantine yaml file against the provided schema 210 scl.yaml_load_verify(quarantine_file, self.quarantine_schema) 211 except scl.EmptyYamlFileException: 212 logger.debug(f'Quarantine file {quarantine_file} is empty') 213 self.quarantine = Quarantine(ql) 214 215 def load(self): 216 217 if self.options.report_suffix: 218 last_run = os.path.join(self.options.outdir, "twister_{}.json".format(self.options.report_suffix)) 219 else: 220 last_run = os.path.join(self.options.outdir, "twister.json") 221 222 if self.options.only_failed or self.options.report_summary is not None: 223 self.load_from_file(last_run) 224 self.selected_platforms = set(p.platform.name for p in self.instances.values()) 225 elif self.options.load_tests: 226 self.load_from_file(self.options.load_tests) 227 self.selected_platforms = set(p.platform.name for p in self.instances.values()) 228 elif self.options.test_only: 229 # Get list of connected hardware and filter tests to only be run on connected hardware. 230 # If the platform does not exist in the hardware map or was not specified by --platform, 231 # just skip it. 232 connected_list = self.options.platform 233 if self.options.exclude_platform: 234 for excluded in self.options.exclude_platform: 235 if excluded in connected_list: 236 connected_list.remove(excluded) 237 self.load_from_file(last_run, filter_platform=connected_list) 238 self.selected_platforms = set(p.platform.name for p in self.instances.values()) 239 else: 240 self.apply_filters() 241 242 if self.options.subset: 243 s = self.options.subset 244 try: 245 subset, sets = (int(x) for x in s.split("/")) 246 except ValueError: 247 raise TwisterRuntimeError("Bad subset value.") 248 249 if subset > sets: 250 raise TwisterRuntimeError("subset should not exceed the total number of sets") 251 252 if int(subset) > 0 and int(sets) >= int(subset): 253 logger.info("Running only a subset: %s/%s" % (subset, sets)) 254 else: 255 raise TwisterRuntimeError(f"You have provided a wrong subset value: {self.options.subset}.") 256 257 self.generate_subset(subset, int(sets)) 258 259 def generate_subset(self, subset, sets): 260 # Test instances are sorted depending on the context. For CI runs 261 # the execution order is: "plat1-testA, plat1-testB, ..., 262 # plat1-testZ, plat2-testA, ...". For hardware tests 263 # (device_testing), were multiple physical platforms can run the tests 264 # in parallel, it is more efficient to run in the order: 265 # "plat1-testA, plat2-testA, ..., plat1-testB, plat2-testB, ..." 266 if self.options.device_testing: 267 self.instances = OrderedDict(sorted(self.instances.items(), 268 key=lambda x: x[0][x[0].find("/") + 1:])) 269 else: 270 self.instances = OrderedDict(sorted(self.instances.items())) 271 272 if self.options.shuffle_tests: 273 seed_value = int.from_bytes(os.urandom(8), byteorder="big") 274 if self.options.shuffle_tests_seed is not None: 275 seed_value = self.options.shuffle_tests_seed 276 277 logger.info(f"Shuffle tests with seed: {seed_value}") 278 random.seed(seed_value) 279 temp_list = list(self.instances.items()) 280 random.shuffle(temp_list) 281 self.instances = OrderedDict(temp_list) 282 283 # Do calculation based on what is actually going to be run and evaluated 284 # at runtime, ignore the cases we already know going to be skipped. 285 # This fixes an issue where some sets would get majority of skips and 286 # basically run nothing beside filtering. 287 to_run = {k : v for k,v in self.instances.items() if v.status is None} 288 total = len(to_run) 289 per_set = int(total / sets) 290 num_extra_sets = total - (per_set * sets) 291 292 # Try and be more fair for rounding error with integer division 293 # so the last subset doesn't get overloaded, we add 1 extra to 294 # subsets 1..num_extra_sets. 295 if subset <= num_extra_sets: 296 start = (subset - 1) * (per_set + 1) 297 end = start + per_set + 1 298 else: 299 base = num_extra_sets * (per_set + 1) 300 start = ((subset - num_extra_sets - 1) * per_set) + base 301 end = start + per_set 302 303 sliced_instances = islice(to_run.items(), start, end) 304 skipped = {k : v for k,v in self.instances.items() if v.status == 'skipped'} 305 errors = {k : v for k,v in self.instances.items() if v.status == 'error'} 306 self.instances = OrderedDict(sliced_instances) 307 if subset == 1: 308 # add all pre-filtered tests that are skipped or got error status 309 # to the first set to allow for better distribution among all sets. 310 self.instances.update(skipped) 311 self.instances.update(errors) 312 313 314 def handle_modules(self): 315 # get all enabled west projects 316 modules_meta = parse_modules(ZEPHYR_BASE) 317 self.modules = [module.meta.get('name') for module in modules_meta] 318 319 320 def report(self): 321 if self.options.test_tree: 322 self.report_test_tree() 323 return 0 324 elif self.options.list_tests: 325 self.report_test_list() 326 return 0 327 elif self.options.list_tags: 328 self.report_tag_list() 329 return 0 330 331 return 1 332 333 def report_duplicates(self): 334 dupes = [item for item, count in collections.Counter(self.scenarios).items() if count > 1] 335 if dupes: 336 msg = "Duplicated test scenarios found:\n" 337 for dupe in dupes: 338 msg += ("- {} found in:\n".format(dupe)) 339 for dc in self.get_testsuite(dupe): 340 msg += (" - {}\n".format(dc.yamlfile)) 341 raise TwisterRuntimeError(msg) 342 else: 343 logger.debug("No duplicates found.") 344 345 def report_tag_list(self): 346 tags = set() 347 for _, tc in self.testsuites.items(): 348 tags = tags.union(tc.tags) 349 350 for t in tags: 351 print("- {}".format(t)) 352 353 def report_test_tree(self): 354 tests_list = self.get_tests_list() 355 356 testsuite = Node("Testsuite") 357 samples = Node("Samples", parent=testsuite) 358 tests = Node("Tests", parent=testsuite) 359 360 for test in sorted(tests_list): 361 if test.startswith("sample."): 362 sec = test.split(".") 363 area = find(samples, lambda node: node.name == sec[1] and node.parent == samples) 364 if not area: 365 area = Node(sec[1], parent=samples) 366 367 Node(test, parent=area) 368 else: 369 sec = test.split(".") 370 area = find(tests, lambda node: node.name == sec[0] and node.parent == tests) 371 if not area: 372 area = Node(sec[0], parent=tests) 373 374 if area and len(sec) > 2: 375 subarea = find(area, lambda node: node.name == sec[1] and node.parent == area) 376 if not subarea: 377 subarea = Node(sec[1], parent=area) 378 Node(test, parent=subarea) 379 380 for pre, _, node in RenderTree(testsuite): 381 print("%s%s" % (pre, node.name)) 382 383 def report_test_list(self): 384 tests_list = self.get_tests_list() 385 386 cnt = 0 387 for test in sorted(tests_list): 388 cnt = cnt + 1 389 print(" - {}".format(test)) 390 print("{} total.".format(cnt)) 391 392 393 # Debug Functions 394 @staticmethod 395 def info(what): 396 sys.stdout.write(what + "\n") 397 sys.stdout.flush() 398 399 def add_configurations(self): 400 board_dirs = set() 401 # Create a list of board roots as defined by the build system in general 402 # Note, internally in twister a board root includes the `boards` folder 403 # but in Zephyr build system, the board root is without the `boards` in folder path. 404 board_roots = [Path(os.path.dirname(root)) for root in self.env.board_roots] 405 lb_args = Namespace(arch_roots=[Path(ZEPHYR_BASE)], soc_roots=[Path(ZEPHYR_BASE), 406 Path(ZEPHYR_BASE) / 'subsys' / 'testsuite'], 407 board_roots=board_roots, board=None, board_dir=None) 408 v1_boards = list_boards.find_boards(lb_args) 409 v2_dirs = list_boards.find_v2_board_dirs(lb_args) 410 for b in v1_boards: 411 board_dirs.add(b.dir) 412 board_dirs.update(v2_dirs) 413 logger.debug("Reading platform configuration files under %s..." % self.env.board_roots) 414 415 platform_config = self.test_config.get('platforms', {}) 416 for folder in board_dirs: 417 for file in glob.glob(os.path.join(folder, "*.yaml")): 418 # If the user set a platform filter, we can, if no other option would increase 419 # the allowed platform pool, save on time by not loading YAMLs of any boards 420 # that do not start with the required names. 421 if self.options.platform and \ 422 not self.options.all and \ 423 not self.options.integration and \ 424 not any([ 425 os.path.basename(file).startswith( 426 re.split('[/@]', p)[0] 427 ) for p in self.options.platform 428 ]): 429 continue 430 try: 431 platform = Platform() 432 platform.load(file) 433 if platform.name in [p.name for p in self.platforms]: 434 logger.error(f"Duplicate platform {platform.name} in {file}") 435 raise Exception(f"Duplicate platform identifier {platform.name} found") 436 437 if not platform.twister: 438 continue 439 440 self.platforms.append(platform) 441 if not platform_config.get('override_default_platforms', False): 442 if platform.default: 443 self.default_platforms.append(platform.name) 444 else: 445 if platform.name in platform_config.get('default_platforms', []): 446 logger.debug(f"adding {platform.name} to default platforms") 447 self.default_platforms.append(platform.name) 448 449 # support board@revision 450 # if there is already an existed <board>_<revision>.yaml, then use it to 451 # load platform directly, otherwise, iterate the directory to 452 # get all valid board revision based on each <board>_<revision>.conf. 453 if '@' not in platform.name: 454 tmp_dir = os.listdir(os.path.dirname(file)) 455 for item in tmp_dir: 456 # Need to make sure the revision matches 457 # the permitted patterns as described in 458 # cmake/modules/extensions.cmake. 459 revision_patterns = ["[A-Z]", 460 "[0-9]+", 461 "(0|[1-9][0-9]*)(_[0-9]+){0,2}"] 462 463 for pattern in revision_patterns: 464 result = re.match(f"{platform.name}_(?P<revision>{pattern})\\.conf", item) 465 if result: 466 revision = result.group("revision") 467 yaml_file = f"{platform.name}_{revision}.yaml" 468 if yaml_file not in tmp_dir: 469 platform_revision = copy.deepcopy(platform) 470 revision = revision.replace("_", ".") 471 platform_revision.name = f"{platform.name}@{revision}" 472 platform_revision.normalized_name = platform_revision.name.replace("/", "_") 473 platform_revision.default = False 474 self.platforms.append(platform_revision) 475 476 break 477 478 479 except RuntimeError as e: 480 logger.error("E: %s: can't load: %s" % (file, e)) 481 self.load_errors += 1 482 483 self.platform_names = [p.name for p in self.platforms] 484 485 def get_all_tests(self): 486 testcases = [] 487 for _, ts in self.testsuites.items(): 488 for case in ts.testcases: 489 testcases.append(case.name) 490 491 return testcases 492 493 def get_tests_list(self): 494 testcases = [] 495 if tag_filter := self.options.tag: 496 for _, ts in self.testsuites.items(): 497 if ts.tags.intersection(tag_filter): 498 for case in ts.testcases: 499 testcases.append(case.name) 500 else: 501 for _, ts in self.testsuites.items(): 502 for case in ts.testcases: 503 testcases.append(case.name) 504 505 if exclude_tag := self.options.exclude_tag: 506 for _, ts in self.testsuites.items(): 507 if ts.tags.intersection(exclude_tag): 508 for case in ts.testcases: 509 if case.name in testcases: 510 testcases.remove(case.name) 511 return testcases 512 513 def add_testsuites(self, testsuite_filter=[]): 514 for root in self.env.test_roots: 515 root = os.path.abspath(root) 516 517 logger.debug("Reading test case configuration files under %s..." % root) 518 519 for dirpath, _, filenames in os.walk(root, topdown=True): 520 if self.SAMPLE_FILENAME in filenames: 521 filename = self.SAMPLE_FILENAME 522 elif self.TESTSUITE_FILENAME in filenames: 523 filename = self.TESTSUITE_FILENAME 524 else: 525 continue 526 527 logger.debug("Found possible testsuite in " + dirpath) 528 529 suite_yaml_path = os.path.join(dirpath, filename) 530 suite_path = os.path.dirname(suite_yaml_path) 531 532 for alt_config_root in self.env.alt_config_root: 533 alt_config = os.path.join(os.path.abspath(alt_config_root), 534 os.path.relpath(suite_path, root), 535 filename) 536 if os.path.exists(alt_config): 537 logger.info("Using alternative configuration from %s" % 538 os.path.normpath(alt_config)) 539 suite_yaml_path = alt_config 540 break 541 542 try: 543 parsed_data = TwisterConfigParser(suite_yaml_path, self.suite_schema) 544 parsed_data.load() 545 subcases = None 546 ztest_suite_names = None 547 548 for name in parsed_data.scenarios.keys(): 549 suite_dict = parsed_data.get_scenario(name) 550 suite = TestSuite(root, suite_path, name, data=suite_dict, detailed_test_id=self.options.detailed_test_id) 551 if suite.harness in ['ztest', 'test']: 552 if subcases is None: 553 # scan it only once per testsuite 554 subcases, ztest_suite_names = scan_testsuite_path(suite_path) 555 suite.add_subcases(suite_dict, subcases, ztest_suite_names) 556 else: 557 suite.add_subcases(suite_dict) 558 if testsuite_filter: 559 scenario = os.path.basename(suite.name) 560 if suite.name and (suite.name in testsuite_filter or scenario in testsuite_filter): 561 self.testsuites[suite.name] = suite 562 else: 563 self.testsuites[suite.name] = suite 564 565 except Exception as e: 566 logger.error(f"{suite_path}: can't load (skipping): {e!r}") 567 self.load_errors += 1 568 return len(self.testsuites) 569 570 def __str__(self): 571 return self.name 572 573 def get_platform(self, name): 574 selected_platform = None 575 for platform in self.platforms: 576 if platform.name == name: 577 selected_platform = platform 578 break 579 return selected_platform 580 581 def handle_quarantined_tests(self, instance: TestInstance, plat: Platform): 582 if self.quarantine: 583 matched_quarantine = self.quarantine.get_matched_quarantine( 584 instance.testsuite.id, plat.name, plat.arch, plat.simulation 585 ) 586 if matched_quarantine and not self.options.quarantine_verify: 587 instance.add_filter("Quarantine: " + matched_quarantine, Filters.QUARANTINE) 588 return 589 if not matched_quarantine and self.options.quarantine_verify: 590 instance.add_filter("Not under quarantine", Filters.QUARANTINE) 591 592 def load_from_file(self, file, filter_platform=[]): 593 try: 594 with open(file, "r") as json_test_plan: 595 jtp = json.load(json_test_plan) 596 instance_list = [] 597 for ts in jtp.get("testsuites", []): 598 logger.debug(f"loading {ts['name']}...") 599 testsuite = ts["name"] 600 601 platform = self.get_platform(ts["platform"]) 602 if filter_platform and platform.name not in filter_platform: 603 continue 604 instance = TestInstance(self.testsuites[testsuite], platform, self.env.outdir) 605 if ts.get("run_id"): 606 instance.run_id = ts.get("run_id") 607 608 if self.options.device_testing: 609 tfilter = 'runnable' 610 else: 611 tfilter = 'buildable' 612 instance.run = instance.check_runnable( 613 self.options.enable_slow, 614 tfilter, 615 self.options.fixture, 616 self.hwm 617 ) 618 619 instance.metrics['handler_time'] = ts.get('execution_time', 0) 620 instance.metrics['used_ram'] = ts.get("used_ram", 0) 621 instance.metrics['used_rom'] = ts.get("used_rom",0) 622 instance.metrics['available_ram'] = ts.get('available_ram', 0) 623 instance.metrics['available_rom'] = ts.get('available_rom', 0) 624 625 status = ts.get('status', None) 626 reason = ts.get("reason", "Unknown") 627 if status in ["error", "failed"]: 628 if self.options.report_summary is not None: 629 if status == "error": status = "ERROR" 630 elif status == "failed": status = "FAILED" 631 instance.status = Fore.RED + status + Fore.RESET 632 instance.reason = reason 633 self.instance_fail_count += 1 634 else: 635 instance.status = None 636 instance.reason = None 637 instance.retries += 1 638 # test marked as passed (built only) but can run when 639 # --test-only is used. Reset status to capture new results. 640 elif status == 'passed' and instance.run and self.options.test_only: 641 instance.status = None 642 instance.reason = None 643 else: 644 instance.status = status 645 instance.reason = reason 646 647 self.handle_quarantined_tests(instance, platform) 648 649 for tc in ts.get('testcases', []): 650 identifier = tc['identifier'] 651 tc_status = tc.get('status', None) 652 tc_reason = None 653 # we set reason only if status is valid, it might have been 654 # reset above... 655 if instance.status: 656 tc_reason = tc.get('reason') 657 if tc_status: 658 case = instance.set_case_status_by_name(identifier, tc_status, tc_reason) 659 case.duration = tc.get('execution_time', 0) 660 if tc.get('log'): 661 case.output = tc.get('log') 662 663 664 instance.create_overlay(platform, self.options.enable_asan, self.options.enable_ubsan, self.options.enable_coverage, self.options.coverage_platform) 665 instance_list.append(instance) 666 self.add_instances(instance_list) 667 except FileNotFoundError as e: 668 logger.error(f"{e}") 669 return 1 670 671 def apply_filters(self, **kwargs): 672 673 toolchain = self.env.toolchain 674 platform_filter = self.options.platform 675 vendor_filter = self.options.vendor 676 exclude_platform = self.options.exclude_platform 677 testsuite_filter = self.run_individual_testsuite 678 arch_filter = self.options.arch 679 tag_filter = self.options.tag 680 exclude_tag = self.options.exclude_tag 681 all_filter = self.options.all 682 runnable = (self.options.device_testing or self.options.filter == 'runnable') 683 force_toolchain = self.options.force_toolchain 684 force_platform = self.options.force_platform 685 slow_only = self.options.enable_slow_only 686 ignore_platform_key = self.options.ignore_platform_key 687 emu_filter = self.options.emulation_only 688 689 logger.debug("platform filter: " + str(platform_filter)) 690 logger.debug(" vendor filter: " + str(vendor_filter)) 691 logger.debug(" arch_filter: " + str(arch_filter)) 692 logger.debug(" tag_filter: " + str(tag_filter)) 693 logger.debug(" exclude_tag: " + str(exclude_tag)) 694 695 default_platforms = False 696 vendor_platforms = False 697 emulation_platforms = False 698 699 if all_filter: 700 logger.info("Selecting all possible platforms per test case") 701 # When --all used, any --platform arguments ignored 702 platform_filter = [] 703 elif not platform_filter and not emu_filter and not vendor_filter: 704 logger.info("Selecting default platforms per test case") 705 default_platforms = True 706 elif emu_filter: 707 logger.info("Selecting emulation platforms per test case") 708 emulation_platforms = True 709 elif vendor_filter: 710 vendor_platforms = True 711 712 if platform_filter: 713 self.verify_platforms_existence(platform_filter, f"platform_filter") 714 platforms = list(filter(lambda p: p.name in platform_filter, self.platforms)) 715 elif emu_filter: 716 platforms = list(filter(lambda p: p.simulation != 'na', self.platforms)) 717 elif vendor_filter: 718 platforms = list(filter(lambda p: p.vendor in vendor_filter, self.platforms)) 719 logger.info(f"Selecting platforms by vendors: {','.join(vendor_filter)}") 720 elif arch_filter: 721 platforms = list(filter(lambda p: p.arch in arch_filter, self.platforms)) 722 elif default_platforms: 723 _platforms = list(filter(lambda p: p.name in self.default_platforms, self.platforms)) 724 platforms = [] 725 # default platforms that can't be run are dropped from the list of 726 # the default platforms list. Default platforms should always be 727 # runnable. 728 for p in _platforms: 729 if p.simulation and p.simulation_exec: 730 if shutil.which(p.simulation_exec): 731 platforms.append(p) 732 else: 733 platforms.append(p) 734 else: 735 platforms = self.platforms 736 737 platform_config = self.test_config.get('platforms', {}) 738 logger.info("Building initial testsuite list...") 739 740 keyed_tests = {} 741 742 for ts_name, ts in self.testsuites.items(): 743 if ts.build_on_all and not platform_filter and platform_config.get('increased_platform_scope', True): 744 platform_scope = self.platforms 745 elif ts.integration_platforms: 746 integration_platforms = list(filter(lambda item: item.name in ts.integration_platforms, 747 self.platforms)) 748 if self.options.integration: 749 self.verify_platforms_existence( 750 ts.integration_platforms, f"{ts_name} - integration_platforms") 751 platform_scope = integration_platforms 752 else: 753 # if not in integration mode, still add integration platforms to the list 754 if not platform_filter: 755 self.verify_platforms_existence( 756 ts.integration_platforms, f"{ts_name} - integration_platforms") 757 platform_scope = platforms + integration_platforms 758 else: 759 platform_scope = platforms 760 else: 761 platform_scope = platforms 762 763 integration = self.options.integration and ts.integration_platforms 764 765 # If there isn't any overlap between the platform_allow list and the platform_scope 766 # we set the scope to the platform_allow list 767 if ts.platform_allow and not platform_filter and not integration and platform_config.get('increased_platform_scope', True): 768 self.verify_platforms_existence( 769 ts.platform_allow, f"{ts_name} - platform_allow") 770 a = set(platform_scope) 771 b = set(filter(lambda item: item.name in ts.platform_allow, self.platforms)) 772 c = a.intersection(b) 773 if not c: 774 platform_scope = list(filter(lambda item: item.name in ts.platform_allow, \ 775 self.platforms)) 776 # list of instances per testsuite, aka configurations. 777 instance_list = [] 778 for plat in platform_scope: 779 instance = TestInstance(ts, plat, self.env.outdir) 780 if runnable: 781 tfilter = 'runnable' 782 else: 783 tfilter = 'buildable' 784 785 instance.run = instance.check_runnable( 786 self.options.enable_slow, 787 tfilter, 788 self.options.fixture, 789 self.hwm 790 ) 791 792 if not force_platform and plat.name in exclude_platform: 793 instance.add_filter("Platform is excluded on command line.", Filters.CMD_LINE) 794 795 if (plat.arch == "unit") != (ts.type == "unit"): 796 # Discard silently 797 continue 798 799 if ts.modules and self.modules: 800 if not set(ts.modules).issubset(set(self.modules)): 801 instance.add_filter(f"one or more required modules not available: {','.join(ts.modules)}", Filters.MODULE) 802 803 if self.options.level: 804 tl = self.get_level(self.options.level) 805 if tl is None: 806 instance.add_filter(f"Unknown test level '{self.options.level}'", Filters.TESTPLAN) 807 else: 808 planned_scenarios = tl.scenarios 809 if ts.id not in planned_scenarios and not set(ts.levels).intersection(set(tl.levels)): 810 instance.add_filter("Not part of requested test plan", Filters.TESTPLAN) 811 812 if runnable and not instance.run: 813 instance.add_filter("Not runnable on device", Filters.CMD_LINE) 814 815 if self.options.integration and ts.integration_platforms and plat.name not in ts.integration_platforms: 816 instance.add_filter("Not part of integration platforms", Filters.TESTSUITE) 817 818 if ts.skip: 819 instance.add_filter("Skip filter", Filters.SKIP) 820 821 if tag_filter and not ts.tags.intersection(tag_filter): 822 instance.add_filter("Command line testsuite tag filter", Filters.CMD_LINE) 823 824 if slow_only and not ts.slow: 825 instance.add_filter("Not a slow test", Filters.CMD_LINE) 826 827 if exclude_tag and ts.tags.intersection(exclude_tag): 828 instance.add_filter("Command line testsuite exclude filter", Filters.CMD_LINE) 829 830 if testsuite_filter: 831 normalized_f = [os.path.basename(_ts) for _ts in testsuite_filter] 832 if ts.id not in normalized_f: 833 instance.add_filter("Testsuite name filter", Filters.CMD_LINE) 834 835 if arch_filter and plat.arch not in arch_filter: 836 instance.add_filter("Command line testsuite arch filter", Filters.CMD_LINE) 837 838 if not force_platform: 839 840 if ts.arch_allow and plat.arch not in ts.arch_allow: 841 instance.add_filter("Not in test case arch allow list", Filters.TESTSUITE) 842 843 if ts.arch_exclude and plat.arch in ts.arch_exclude: 844 instance.add_filter("In test case arch exclude", Filters.TESTSUITE) 845 846 if ts.platform_exclude and plat.name in ts.platform_exclude: 847 instance.add_filter("In test case platform exclude", Filters.TESTSUITE) 848 849 if ts.toolchain_exclude and toolchain in ts.toolchain_exclude: 850 instance.add_filter("In test case toolchain exclude", Filters.TOOLCHAIN) 851 852 if platform_filter and plat.name not in platform_filter: 853 instance.add_filter("Command line platform filter", Filters.CMD_LINE) 854 855 if ts.platform_allow \ 856 and plat.name not in ts.platform_allow \ 857 and not (platform_filter and force_platform): 858 instance.add_filter("Not in testsuite platform allow list", Filters.TESTSUITE) 859 860 if ts.platform_type and plat.type not in ts.platform_type: 861 instance.add_filter("Not in testsuite platform type list", Filters.TESTSUITE) 862 863 if ts.toolchain_allow and toolchain not in ts.toolchain_allow: 864 instance.add_filter("Not in testsuite toolchain allow list", Filters.TOOLCHAIN) 865 866 if not plat.env_satisfied: 867 instance.add_filter("Environment ({}) not satisfied".format(", ".join(plat.env)), Filters.PLATFORM) 868 869 if not force_toolchain \ 870 and toolchain and (toolchain not in plat.supported_toolchains) \ 871 and "host" not in plat.supported_toolchains \ 872 and ts.type != 'unit': 873 instance.add_filter("Not supported by the toolchain", Filters.PLATFORM) 874 875 if plat.ram < ts.min_ram: 876 instance.add_filter("Not enough RAM", Filters.PLATFORM) 877 878 if ts.harness: 879 if ts.harness == 'robot' and plat.simulation != 'renode': 880 instance.add_filter("No robot support for the selected platform", Filters.SKIP) 881 882 if ts.depends_on: 883 dep_intersection = ts.depends_on.intersection(set(plat.supported)) 884 if dep_intersection != set(ts.depends_on): 885 instance.add_filter("No hardware support", Filters.PLATFORM) 886 887 if plat.flash < ts.min_flash: 888 instance.add_filter("Not enough FLASH", Filters.PLATFORM) 889 890 if set(plat.ignore_tags) & ts.tags: 891 instance.add_filter("Excluded tags per platform (exclude_tags)", Filters.PLATFORM) 892 893 if plat.only_tags and not set(plat.only_tags) & ts.tags: 894 instance.add_filter("Excluded tags per platform (only_tags)", Filters.PLATFORM) 895 896 if ts.required_snippets: 897 missing_snippet = False 898 snippet_args = {"snippets": ts.required_snippets} 899 found_snippets = snippets.find_snippets_in_roots(snippet_args, [*self.env.snippet_roots, Path(ts.source_dir)]) 900 901 # Search and check that all required snippet files are found 902 for this_snippet in snippet_args['snippets']: 903 if this_snippet not in found_snippets: 904 logger.error(f"Can't find snippet '%s' for test '%s'", this_snippet, ts.name) 905 instance.status = "error" 906 instance.reason = f"Snippet {this_snippet} not found" 907 missing_snippet = True 908 break 909 910 if not missing_snippet: 911 # Look for required snippets and check that they are applicable for these 912 # platforms/boards 913 for this_snippet in snippet_args['snippets']: 914 matched_snippet_board = False 915 916 # If the "appends" key is present with at least one entry then this 917 # snippet applies to all boards and further platform-specific checks 918 # are not required 919 if found_snippets[this_snippet].appends: 920 continue 921 922 for this_board in found_snippets[this_snippet].board2appends: 923 if this_board.startswith('/'): 924 match = re.search(this_board[1:-1], plat.name) 925 if match is not None: 926 matched_snippet_board = True 927 break 928 elif this_board == plat.name: 929 matched_snippet_board = True 930 break 931 932 if matched_snippet_board is False: 933 instance.add_filter("Snippet not supported", Filters.PLATFORM) 934 break 935 936 # handle quarantined tests 937 self.handle_quarantined_tests(instance, plat) 938 939 # platform_key is a list of unique platform attributes that form a unique key a test 940 # will match against to determine if it should be scheduled to run. A key containing a 941 # field name that the platform does not have will filter the platform. 942 # 943 # A simple example is keying on arch and simulation 944 # to run a test once per unique (arch, simulation) platform. 945 if not ignore_platform_key and hasattr(ts, 'platform_key') and len(ts.platform_key) > 0: 946 key_fields = sorted(set(ts.platform_key)) 947 keys = [getattr(plat, key_field) for key_field in key_fields] 948 for key in keys: 949 if key is None or key == 'na': 950 instance.add_filter( 951 f"Excluded platform missing key fields demanded by test {key_fields}", 952 Filters.PLATFORM 953 ) 954 break 955 else: 956 test_keys = copy.deepcopy(keys) 957 test_keys.append(ts.name) 958 test_keys = tuple(test_keys) 959 keyed_test = keyed_tests.get(test_keys) 960 if keyed_test is not None: 961 plat_key = {key_field: getattr(keyed_test['plat'], key_field) for key_field in key_fields} 962 instance.add_filter(f"Already covered for key {tuple(key)} by platform {keyed_test['plat'].name} having key {plat_key}", Filters.PLATFORM_KEY) 963 else: 964 # do not add a platform to keyed tests if previously filtered 965 if not instance.filters: 966 keyed_tests[test_keys] = {'plat': plat, 'ts': ts} 967 else: 968 instance.add_filter(f"Excluded platform missing key fields demanded by test {key_fields}", Filters.PLATFORM) 969 970 # if nothing stopped us until now, it means this configuration 971 # needs to be added. 972 instance_list.append(instance) 973 974 # no configurations, so jump to next testsuite 975 if not instance_list: 976 continue 977 978 # if twister was launched with no platform options at all, we 979 # take all default platforms 980 if default_platforms and not ts.build_on_all and not integration: 981 if ts.platform_allow: 982 a = set(self.default_platforms) 983 b = set(ts.platform_allow) 984 c = a.intersection(b) 985 if c: 986 aa = list(filter(lambda ts: ts.platform.name in c, instance_list)) 987 self.add_instances(aa) 988 else: 989 self.add_instances(instance_list) 990 else: 991 # add integration platforms to the list of default 992 # platforms, even if we are not in integration mode 993 _platforms = self.default_platforms + ts.integration_platforms 994 instances = list(filter(lambda ts: ts.platform.name in _platforms, instance_list)) 995 self.add_instances(instances) 996 elif integration: 997 instances = list(filter(lambda item: item.platform.name in ts.integration_platforms, instance_list)) 998 self.add_instances(instances) 999 1000 elif emulation_platforms: 1001 self.add_instances(instance_list) 1002 for instance in list(filter(lambda inst: not inst.platform.simulation != 'na', instance_list)): 1003 instance.add_filter("Not an emulated platform", Filters.CMD_LINE) 1004 elif vendor_platforms: 1005 self.add_instances(instance_list) 1006 for instance in list(filter(lambda inst: not inst.platform.vendor in vendor_filter, instance_list)): 1007 instance.add_filter("Not a selected vendor platform", Filters.CMD_LINE) 1008 else: 1009 self.add_instances(instance_list) 1010 1011 for _, case in self.instances.items(): 1012 case.create_overlay(case.platform, self.options.enable_asan, self.options.enable_ubsan, self.options.enable_coverage, self.options.coverage_platform) 1013 1014 self.selected_platforms = set(p.platform.name for p in self.instances.values()) 1015 1016 filtered_instances = list(filter(lambda item: item.status == "filtered", self.instances.values())) 1017 for filtered_instance in filtered_instances: 1018 change_skip_to_error_if_integration(self.options, filtered_instance) 1019 1020 filtered_instance.add_missing_case_status(filtered_instance.status) 1021 1022 self.filtered_platforms = set(p.platform.name for p in self.instances.values() 1023 if p.status != "skipped" ) 1024 1025 def add_instances(self, instance_list): 1026 for instance in instance_list: 1027 self.instances[instance.name] = instance 1028 1029 1030 def get_testsuite(self, identifier): 1031 results = [] 1032 for _, ts in self.testsuites.items(): 1033 for case in ts.testcases: 1034 if case.name == identifier: 1035 results.append(ts) 1036 return results 1037 1038 def verify_platforms_existence(self, platform_names_to_verify, log_info=""): 1039 """ 1040 Verify if platform name (passed by --platform option, or in yaml file 1041 as platform_allow or integration_platforms options) is correct. If not - 1042 log and raise error. 1043 """ 1044 for platform in platform_names_to_verify: 1045 if platform in self.platform_names: 1046 continue 1047 else: 1048 logger.error(f"{log_info} - unrecognized platform - {platform}") 1049 sys.exit(2) 1050 1051 def create_build_dir_links(self): 1052 """ 1053 Iterate through all no-skipped instances in suite and create links 1054 for each one build directories. Those links will be passed in the next 1055 steps to the CMake command. 1056 """ 1057 1058 links_dir_name = "twister_links" # folder for all links 1059 links_dir_path = os.path.join(self.env.outdir, links_dir_name) 1060 if not os.path.exists(links_dir_path): 1061 os.mkdir(links_dir_path) 1062 1063 for instance in self.instances.values(): 1064 if instance.status != "skipped": 1065 self._create_build_dir_link(links_dir_path, instance) 1066 1067 def _create_build_dir_link(self, links_dir_path, instance): 1068 """ 1069 Create build directory with original "long" path. Next take shorter 1070 path and link them with original path - create link. At the end 1071 replace build_dir to created link. This link will be passed to CMake 1072 command. This action helps to limit path length which can be 1073 significant during building by CMake on Windows OS. 1074 """ 1075 1076 os.makedirs(instance.build_dir, exist_ok=True) 1077 1078 link_name = f"test_{self.link_dir_counter}" 1079 link_path = os.path.join(links_dir_path, link_name) 1080 1081 if os.name == "nt": # if OS is Windows 1082 command = ["mklink", "/J", f"{link_path}", os.path.normpath(instance.build_dir)] 1083 subprocess.call(command, shell=True) 1084 else: # for Linux and MAC OS 1085 os.symlink(instance.build_dir, link_path) 1086 1087 # Here original build directory is replaced with symbolic link. It will 1088 # be passed to CMake command 1089 instance.build_dir = link_path 1090 1091 self.link_dir_counter += 1 1092 1093 1094def change_skip_to_error_if_integration(options, instance): 1095 ''' All skips on integration_platforms are treated as errors.''' 1096 if instance.platform.name in instance.testsuite.integration_platforms: 1097 # Do not treat this as error if filter type is among ignore_filters 1098 filters = {t['type'] for t in instance.filters} 1099 ignore_filters ={Filters.CMD_LINE, Filters.SKIP, Filters.PLATFORM_KEY, 1100 Filters.TOOLCHAIN, Filters.MODULE, Filters.TESTPLAN, 1101 Filters.QUARANTINE} 1102 if filters.intersection(ignore_filters): 1103 return 1104 instance.status = "error" 1105 instance.reason += " but is one of the integration platforms" 1106