#!/usr/bin/env python3
# vim: set syntax=python ts=4 :
#
# Copyright (c) 2018 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import os
import sys
import re
import subprocess
import glob
import json
import collections
from collections import OrderedDict
from itertools import islice
import logging
import copy
import shutil
import random
import snippets
from pathlib import Path

logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)

try:
    from anytree import RenderTree, Node, find
except ImportError:
    print("Install the anytree module to use the --test-tree option")

from twisterlib.testsuite import TestSuite, scan_testsuite_path
from twisterlib.error import TwisterRuntimeError
from twisterlib.platform import Platform
from twisterlib.config_parser import TwisterConfigParser
from twisterlib.testinstance import TestInstance
from twisterlib.quarantine import Quarantine


from zephyr_module import parse_modules

ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
if not ZEPHYR_BASE:
    sys.exit("$ZEPHYR_BASE environment variable undefined")

# This is needed to load edt.pickle files.
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts", "dts",
                                "python-devicetree", "src"))
from devicetree import edtlib  # pylint: disable=unused-import

sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/"))

import scl


class Filters:
    # platform keys
    PLATFORM_KEY = 'platform key filter'
    # filters provided on the command line by the user/tester
    CMD_LINE = 'command line filter'
    # filters in the testsuite yaml definition
    TESTSUITE = 'testsuite filter'
    # filters in the testplan yaml definition
    TESTPLAN = 'testplan filter'
    # filters related to the platform definition
    PLATFORM = 'Platform related filter'
    # in case a test suite was quarantined
    QUARANTINE = 'Quarantine filter'
    # in case a test suite is skipped intentionally
    SKIP = 'Skip filter'
    # in case of incompatibility between selected and allowed toolchains
    TOOLCHAIN = 'Toolchain filter'
    # in case an optional module is not available
    MODULE = 'Module filter'
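

# Filter categories above are attached to test instances via
# TestInstance.add_filter(<reason string>, <Filters category>), e.g.:
#
#     instance.add_filter("Not enough RAM", Filters.PLATFORM)
#
# The category later drives reporting decisions; for example,
# change_skip_to_error_if_integration() at the bottom of this file ignores
# some categories when promoting skips to errors on integration platforms.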


class TestLevel:
    name = None
    levels = []
    scenarios = []


class TestPlan:
    config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
    dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')

    suite_schema = scl.yaml_load(
        os.path.join(ZEPHYR_BASE,
                     "scripts", "schemas", "twister", "testsuite-schema.yaml"))
    quarantine_schema = scl.yaml_load(
        os.path.join(ZEPHYR_BASE,
                     "scripts", "schemas", "twister", "quarantine-schema.yaml"))

    tc_schema_path = os.path.join(ZEPHYR_BASE, "scripts", "schemas", "twister", "test-config-schema.yaml")

    SAMPLE_FILENAME = 'sample.yaml'
    TESTSUITE_FILENAME = 'testcase.yaml'

    def __init__(self, env=None):

        self.options = env.options
        self.env = env

        # Keep track of which test cases we've filtered out and why
        self.testsuites = {}
        self.quarantine = None
        self.platforms = []
        self.platform_names = []
        self.selected_platforms = []
        self.filtered_platforms = []
        self.default_platforms = []
        self.load_errors = 0
        self.instances = dict()
        self.warnings = 0

        self.scenarios = []

        self.hwm = env.hwm
        # used when creating shorter build paths
        self.link_dir_counter = 0
        self.modules = []

        self.run_individual_testsuite = []
        self.levels = []
        self.test_config = {}

    def get_level(self, name):
        level = next((l for l in self.levels if l.name == name), None)
        return level

    def parse_configuration(self, config_file):
        if os.path.exists(config_file):
            tc_schema = scl.yaml_load(self.tc_schema_path)
            self.test_config = scl.yaml_load_verify(config_file, tc_schema)
        else:
            raise TwisterRuntimeError(f"File {config_file} not found.")

        levels = self.test_config.get('levels', [])

        # Do a first pass on levels to get the initial data.
        for level in levels:
            adds = []
            for s in level.get('adds', []):
                r = re.compile(s)
                adds.extend(list(filter(r.fullmatch, self.scenarios)))

            tl = TestLevel()
            tl.name = level['name']
            tl.scenarios = adds
            tl.levels = level.get('inherits', [])
            self.levels.append(tl)

        # Go over the levels again to resolve inheritance.
        for level in levels:
            inherit = level.get('inherits', [])
            _level = self.get_level(level['name'])
            if inherit:
                for inherited_level in inherit:
                    _inherited = self.get_level(inherited_level)
                    _inherited_scenarios = _inherited.scenarios
                    level_scenarios = _level.scenarios
                    level_scenarios.extend(_inherited_scenarios)
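
    # A minimal sketch of the 'levels' section parse_configuration() expects
    # in the test configuration file. Field names come from the code above;
    # the concrete level names and regexes are illustrative only:
    #
    #   levels:
    #     - name: smoke
    #       adds:
    #         - kernel.threads.*
    #     - name: acceptance
    #       inherits:
    #         - smoke
    #       adds:
    #         - kernel.timer.*
    #
    # 'adds' entries are regular expressions full-matched against the list
    # of discovered scenario ids.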

    def find_subtests(self):
        sub_tests = self.options.sub_test
        if sub_tests:
            for subtest in sub_tests:
                _subtests = self.get_testsuite(subtest)
                for _subtest in _subtests:
                    self.run_individual_testsuite.append(_subtest.name)

            if self.run_individual_testsuite:
                logger.info("Running the following tests:")
                for test in self.run_individual_testsuite:
                    print(" - {}".format(test))
            else:
                raise TwisterRuntimeError("Tests not found")

    def discover(self):
        self.handle_modules()
        if self.options.test:
            self.run_individual_testsuite = self.options.test

        num = self.add_testsuites(testsuite_filter=self.run_individual_testsuite)
        if num == 0:
            raise TwisterRuntimeError("No test cases found at the specified location...")

        self.find_subtests()
        # collect all parsed scenarios into one list
        for _, ts in self.testsuites.items():
            self.scenarios.append(ts.id)

        self.report_duplicates()

        self.parse_configuration(config_file=self.env.test_config)
        self.add_configurations()

        if self.load_errors:
            raise TwisterRuntimeError("Errors while loading configurations")

        # handle quarantine
        ql = self.options.quarantine_list
        qv = self.options.quarantine_verify
        if qv and not ql:
            logger.error("No quarantine list given to be verified")
            raise TwisterRuntimeError("No quarantine list given to be verified")
        if ql:
            for quarantine_file in ql:
                try:
                    # validate the quarantine yaml file against the provided schema
                    scl.yaml_load_verify(quarantine_file, self.quarantine_schema)
                except scl.EmptyYamlFileException:
                    logger.debug(f'Quarantine file {quarantine_file} is empty')
            self.quarantine = Quarantine(ql)

    def load(self):

        if self.options.report_suffix:
            last_run = os.path.join(self.options.outdir, "twister_{}.json".format(self.options.report_suffix))
        else:
            last_run = os.path.join(self.options.outdir, "twister.json")

        if self.options.only_failed:
            self.load_from_file(last_run)
            self.selected_platforms = set(p.platform.name for p in self.instances.values())
        elif self.options.load_tests:
            self.load_from_file(self.options.load_tests)
            self.selected_platforms = set(p.platform.name for p in self.instances.values())
        elif self.options.test_only:
            # Get the list of connected hardware and filter tests to only be
            # run on connected hardware. If the platform does not exist in
            # the hardware map or was not specified by --platform, skip it.
            connected_list = self.options.platform
            if self.options.exclude_platform:
                for excluded in self.options.exclude_platform:
                    if excluded in connected_list:
                        connected_list.remove(excluded)
            self.load_from_file(last_run, filter_platform=connected_list)
            self.selected_platforms = set(p.platform.name for p in self.instances.values())
        else:
            self.apply_filters()

        if self.options.subset:
            s = self.options.subset
            try:
                subset, sets = (int(x) for x in s.split("/"))
            except ValueError:
                raise TwisterRuntimeError("Bad subset value.")

            if subset > sets:
                raise TwisterRuntimeError("subset should not exceed the total number of sets")

            if subset > 0 and sets >= subset:
                logger.info("Running only a subset: %s/%s" % (subset, sets))
            else:
                raise TwisterRuntimeError(f"You have provided a wrong subset value: {self.options.subset}.")

            self.generate_subset(subset, sets)
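
    # Note on the --subset syntax handled above (illustrative): a value of
    # "2/5" means "run the second of five roughly equal slices of the test
    # plan"; generate_subset() below computes the slice boundaries.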

    def generate_subset(self, subset, sets):
        # Test instances are sorted depending on the context. For CI runs
        # the execution order is: "plat1-testA, plat1-testB, ...,
        # plat1-testZ, plat2-testA, ...". For hardware tests
        # (device_testing), where multiple physical platforms can run the
        # tests in parallel, it is more efficient to run in the order:
        # "plat1-testA, plat2-testA, ..., plat1-testB, plat2-testB, ..."
        if self.options.device_testing:
            self.instances = OrderedDict(sorted(self.instances.items(),
                                                key=lambda x: x[0][x[0].find("/") + 1:]))
        else:
            self.instances = OrderedDict(sorted(self.instances.items()))

        if self.options.shuffle_tests:
            seed_value = int.from_bytes(os.urandom(8), byteorder="big")
            if self.options.shuffle_tests_seed is not None:
                seed_value = self.options.shuffle_tests_seed

            logger.info(f"Shuffle tests with seed: {seed_value}")
            random.seed(seed_value)
            temp_list = list(self.instances.items())
            random.shuffle(temp_list)
            self.instances = OrderedDict(temp_list)

        # Do the calculation based on what is actually going to be run and
        # evaluated at runtime; ignore the cases we already know will be
        # skipped. This fixes an issue where some sets would get a majority
        # of skips and basically run nothing besides filtering.
        to_run = {k: v for k, v in self.instances.items() if v.status is None}
        total = len(to_run)
        per_set = int(total / sets)
        num_extra_sets = total - (per_set * sets)

        # Try to be fair about the rounding error from integer division so
        # the last subset doesn't get overloaded: add 1 extra instance to
        # each of subsets 1..num_extra_sets.
        if subset <= num_extra_sets:
            start = (subset - 1) * (per_set + 1)
            end = start + per_set + 1
        else:
            base = num_extra_sets * (per_set + 1)
            start = ((subset - num_extra_sets - 1) * per_set) + base
            end = start + per_set

        sliced_instances = islice(to_run.items(), start, end)
        skipped = {k: v for k, v in self.instances.items() if v.status == 'skipped'}
        errors = {k: v for k, v in self.instances.items() if v.status == 'error'}
        self.instances = OrderedDict(sliced_instances)
        if subset == 1:
            # add all pre-filtered tests that were skipped or got an error
            # status to the first set to allow for better distribution
            # among all sets.
            self.instances.update(skipped)
            self.instances.update(errors)
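
    # A worked example of the slicing arithmetic above (illustrative): with
    # total = 10 runnable instances and sets = 3, per_set = 3 and
    # num_extra_sets = 1, so subset 1 covers indices 0..3 (4 instances) and
    # subsets 2 and 3 cover 3 instances each: 4 + 3 + 3 == 10, with the
    # extra instance going to the first subset.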

    def handle_modules(self):
        # get all enabled west projects
        modules_meta = parse_modules(ZEPHYR_BASE)
        self.modules = [module.meta.get('name') for module in modules_meta]

    def report(self):
        if self.options.test_tree:
            self.report_test_tree()
            return 0
        elif self.options.list_tests:
            self.report_test_list()
            return 0
        elif self.options.list_tags:
            self.report_tag_list()
            return 0

        return 1

    def report_duplicates(self):
        dupes = [item for item, count in collections.Counter(self.scenarios).items() if count > 1]
        if dupes:
            msg = "Duplicated test scenarios found:\n"
            for dupe in dupes:
                msg += "- {} found in:\n".format(dupe)
                for dc in self.get_testsuite(dupe):
                    msg += "  - {}\n".format(dc.yamlfile)
            raise TwisterRuntimeError(msg)
        else:
            logger.debug("No duplicates found.")

    def report_tag_list(self):
        tags = set()
        for _, tc in self.testsuites.items():
            tags = tags.union(tc.tags)

        for t in tags:
            print("- {}".format(t))

    def report_test_tree(self):
        all_tests = self.get_all_tests()

        testsuite = Node("Testsuite")
        samples = Node("Samples", parent=testsuite)
        tests = Node("Tests", parent=testsuite)

        for test in sorted(all_tests):
            if test.startswith("sample."):
                sec = test.split(".")
                area = find(samples, lambda node: node.name == sec[1] and node.parent == samples)
                if not area:
                    area = Node(sec[1], parent=samples)

                Node(test, parent=area)
            else:
                sec = test.split(".")
                area = find(tests, lambda node: node.name == sec[0] and node.parent == tests)
                if not area:
                    area = Node(sec[0], parent=tests)

                if area and len(sec) > 2:
                    subarea = find(area, lambda node: node.name == sec[1] and node.parent == area)
                    if not subarea:
                        subarea = Node(sec[1], parent=area)
                    Node(test, parent=subarea)

        for pre, _, node in RenderTree(testsuite):
            print("%s%s" % (pre, node.name))

    def report_test_list(self):
        cnt = 0
        all_tests = self.get_all_tests()

        for test in sorted(all_tests):
            cnt = cnt + 1
            print(" - {}".format(test))

        print("{} total.".format(cnt))

    def config(self):
        logger.info("coverage platform: {}".format(self.coverage_platform))

    # Debug Functions
    @staticmethod
    def info(what):
        sys.stdout.write(what + "\n")
        sys.stdout.flush()

    def add_configurations(self):
        for board_root in self.env.board_roots:
            board_root = os.path.abspath(board_root)
            logger.debug("Reading platform configuration files under %s..." %
                         board_root)

            platform_config = self.test_config.get('platforms', {})
            for file in glob.glob(os.path.join(board_root, "*", "*", "*.yaml")):
                try:
                    platform = Platform()
                    platform.load(file)
                    if platform.name in [p.name for p in self.platforms]:
                        logger.error(f"Duplicate platform {platform.name} in {file}")
                        raise Exception(f"Duplicate platform identifier {platform.name} found")

                    if not platform.twister:
                        continue

                    self.platforms.append(platform)
                    if not platform_config.get('override_default_platforms', False):
                        if platform.default:
                            self.default_platforms.append(platform.name)
                    else:
                        if platform.name in platform_config.get('default_platforms', []):
                            logger.debug(f"adding {platform.name} to default platforms")
                            self.default_platforms.append(platform.name)

                    # support board@revision
                    # if a <board>_<revision>.yaml already exists, use it to
                    # load the platform directly; otherwise, iterate over the
                    # directory to collect every valid board revision based
                    # on each <board>_<revision>.conf.
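                    # For example (illustrative): a board directory holding
                    # myboard.yaml and myboard_1_0_0.conf, but no
                    # myboard_1_0_0.yaml, yields an extra platform named
                    # "myboard@1.0.0" (underscores in the revision become
                    # dots, and the revision variant is not a default
                    # platform).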
                    if '@' not in platform.name:
                        tmp_dir = os.listdir(os.path.dirname(file))
                        for item in tmp_dir:
                            # Need to make sure the revision matches
                            # the permitted patterns as described in
                            # cmake/modules/extensions.cmake.
                            revision_patterns = ["[A-Z]",
                                                 "[0-9]+",
                                                 "(0|[1-9][0-9]*)(_[0-9]+)*(_[0-9]+)*"]

                            for pattern in revision_patterns:
                                result = re.match(f"{platform.name}_(?P<revision>{pattern})\\.conf", item)
                                if result:
                                    revision = result.group("revision")
                                    yaml_file = f"{platform.name}_{revision}.yaml"
                                    if yaml_file not in tmp_dir:
                                        platform_revision = copy.deepcopy(platform)
                                        revision = revision.replace("_", ".")
                                        platform_revision.name = f"{platform.name}@{revision}"
                                        platform_revision.default = False
                                        self.platforms.append(platform_revision)

                                    break

                except RuntimeError as e:
                    logger.error("E: %s: can't load: %s" % (file, e))
                    self.load_errors += 1

        self.platform_names = [p.name for p in self.platforms]

    def get_all_tests(self):
        testcases = []
        for _, ts in self.testsuites.items():
            for case in ts.testcases:
                testcases.append(case.name)

        return testcases

    def add_testsuites(self, testsuite_filter=[]):
        for root in self.env.test_roots:
            root = os.path.abspath(root)

            logger.debug("Reading test case configuration files under %s..." % root)
            for dirpath, _, filenames in os.walk(root, topdown=True):
                if self.SAMPLE_FILENAME in filenames:
                    filename = self.SAMPLE_FILENAME
                elif self.TESTSUITE_FILENAME in filenames:
                    filename = self.TESTSUITE_FILENAME
                else:
                    continue

                logger.debug("Found possible testsuite in " + dirpath)

                suite_yaml_path = os.path.join(dirpath, filename)
                suite_path = os.path.dirname(suite_yaml_path)

                for alt_config_root in self.env.alt_config_root:
                    alt_config = os.path.join(os.path.abspath(alt_config_root),
                                              os.path.relpath(suite_path, root),
                                              filename)
                    if os.path.exists(alt_config):
                        logger.info("Using alternative configuration from %s" %
                                    os.path.normpath(alt_config))
                        suite_yaml_path = alt_config
                        break

                try:
                    parsed_data = TwisterConfigParser(suite_yaml_path, self.suite_schema)
                    parsed_data.load()

                    subcases, ztest_suite_names = scan_testsuite_path(suite_path)

                    for name in parsed_data.scenarios.keys():
                        suite_dict = parsed_data.get_scenario(name)
                        suite = TestSuite(root, suite_path, name, data=suite_dict, detailed_test_id=self.options.detailed_test_id)
                        suite.add_subcases(suite_dict, subcases, ztest_suite_names)
                        if testsuite_filter:
                            scenario = os.path.basename(suite.name)
                            if suite.name and (suite.name in testsuite_filter or scenario in testsuite_filter):
                                self.testsuites[suite.name] = suite
                        else:
                            self.testsuites[suite.name] = suite

                except Exception as e:
                    logger.error(f"{suite_path}: can't load (skipping): {e!r}")
                    self.load_errors += 1
        return len(self.testsuites)

    def __str__(self):
        return self.name

    def get_platform(self, name):
        selected_platform = None
        for platform in self.platforms:
            if platform.name == name:
                selected_platform = platform
                break
        return selected_platform
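
    # load_from_file() below re-creates test instances from a previously
    # written twister.json report. A minimal sketch of the fields it reads
    # (reconstructed from the accesses in the code; the real report contains
    # more):
    #
    #   {"testsuites": [
    #       {"name": ..., "platform": ..., "run_id": ...,
    #        "status": ..., "reason": ..., "execution_time": ...,
    #        "used_ram": ..., "used_rom": ...,
    #        "available_ram": ..., "available_rom": ...,
    #        "testcases": [
    #            {"identifier": ..., "status": ..., "reason": ...,
    #             "execution_time": ..., "log": ...}]}]}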

    def load_from_file(self, file, filter_platform=[]):
        with open(file, "r") as json_test_plan:
            jtp = json.load(json_test_plan)
            instance_list = []
            for ts in jtp.get("testsuites", []):
                logger.debug(f"loading {ts['name']}...")
                testsuite = ts["name"]

                platform = self.get_platform(ts["platform"])
                if filter_platform and platform.name not in filter_platform:
                    continue
                instance = TestInstance(self.testsuites[testsuite], platform, self.env.outdir)
                if ts.get("run_id"):
                    instance.run_id = ts.get("run_id")

                if self.options.device_testing:
                    tfilter = 'runnable'
                else:
                    tfilter = 'buildable'
                instance.run = instance.check_runnable(
                    self.options.enable_slow,
                    tfilter,
                    self.options.fixture,
                    self.hwm
                )

                instance.metrics['handler_time'] = ts.get('execution_time', 0)
                instance.metrics['used_ram'] = ts.get("used_ram", 0)
                instance.metrics['used_rom'] = ts.get("used_rom", 0)
                instance.metrics['available_ram'] = ts.get('available_ram', 0)
                instance.metrics['available_rom'] = ts.get('available_rom', 0)

                status = ts.get('status', None)
                reason = ts.get("reason", "Unknown")
                if status in ["error", "failed"]:
                    instance.status = None
                    instance.reason = None
                    instance.retries += 1
                # test marked as passed (built only) but can run when
                # --test-only is used. Reset status to capture new results.
                elif status == 'passed' and instance.run and self.options.test_only:
                    instance.status = None
                    instance.reason = None
                else:
                    instance.status = status
                    instance.reason = reason

                for tc in ts.get('testcases', []):
                    identifier = tc['identifier']
                    tc_status = tc.get('status', None)
                    tc_reason = None
                    # we set the reason only if the status is valid; it
                    # might have been reset above...
                    if instance.status:
                        tc_reason = tc.get('reason')
                    if tc_status:
                        case = instance.set_case_status_by_name(identifier, tc_status, tc_reason)
                        case.duration = tc.get('execution_time', 0)
                        if tc.get('log'):
                            case.output = tc.get('log')

                instance.create_overlay(platform, self.options.enable_asan, self.options.enable_ubsan, self.options.enable_coverage, self.options.coverage_platform)
                instance_list.append(instance)
            self.add_instances(instance_list)

    def apply_filters(self, **kwargs):

        toolchain = self.env.toolchain
        platform_filter = self.options.platform
        vendor_filter = self.options.vendor
        exclude_platform = self.options.exclude_platform
        testsuite_filter = self.run_individual_testsuite
        arch_filter = self.options.arch
        tag_filter = self.options.tag
        exclude_tag = self.options.exclude_tag
        all_filter = self.options.all
        runnable = (self.options.device_testing or self.options.filter == 'runnable')
        force_toolchain = self.options.force_toolchain
        force_platform = self.options.force_platform
        slow_only = self.options.enable_slow_only
        ignore_platform_key = self.options.ignore_platform_key
        emu_filter = self.options.emulation_only

        logger.debug("platform filter: " + str(platform_filter))
        logger.debug("  vendor filter: " + str(vendor_filter))
        logger.debug("    arch_filter: " + str(arch_filter))
        logger.debug("     tag_filter: " + str(tag_filter))
        logger.debug("    exclude_tag: " + str(exclude_tag))

        default_platforms = False
        vendor_platforms = False
        emulation_platforms = False

        if all_filter:
            logger.info("Selecting all possible platforms per test case")
            # When --all is used, any --platform arguments are ignored
            platform_filter = []
        elif not platform_filter and not emu_filter and not vendor_filter:
            logger.info("Selecting default platforms per test case")
            default_platforms = True
        elif emu_filter:
            logger.info("Selecting emulation platforms per test case")
            emulation_platforms = True
        elif vendor_filter:
            vendor_platforms = True

        if platform_filter:
            self.verify_platforms_existence(platform_filter, "platform_filter")
            platforms = list(filter(lambda p: p.name in platform_filter, self.platforms))
        elif emu_filter:
            platforms = list(filter(lambda p: p.simulation != 'na', self.platforms))
        elif vendor_filter:
            platforms = list(filter(lambda p: p.vendor in vendor_filter, self.platforms))
            logger.info(f"Selecting platforms by vendors: {','.join(vendor_filter)}")
        elif arch_filter:
            platforms = list(filter(lambda p: p.arch in arch_filter, self.platforms))
        elif default_platforms:
            _platforms = list(filter(lambda p: p.name in self.default_platforms, self.platforms))
            platforms = []
            # Default platforms that can't be run are dropped from the list
            # of default platforms. Default platforms should always be
            # runnable.
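            # For example (illustrative): a simulated platform whose
            # simulation_exec binary is not found on PATH by shutil.which()
            # below is dropped here.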
            for p in _platforms:
                if p.simulation and p.simulation_exec:
                    if shutil.which(p.simulation_exec):
                        platforms.append(p)
                else:
                    platforms.append(p)
        else:
            platforms = self.platforms

        platform_config = self.test_config.get('platforms', {})
        logger.info("Building initial testsuite list...")

        keyed_tests = {}

        for ts_name, ts in self.testsuites.items():
            if ts.build_on_all and not platform_filter and platform_config.get('increased_platform_scope', True):
                platform_scope = self.platforms
            elif ts.integration_platforms:
                self.verify_platforms_existence(
                    ts.integration_platforms, f"{ts_name} - integration_platforms")
                integration_platforms = list(filter(lambda item: item.name in ts.integration_platforms,
                                                    self.platforms))
                if self.options.integration:
                    platform_scope = integration_platforms
                else:
                    # if not in integration mode, still add integration
                    # platforms to the list
                    if not platform_filter:
                        platform_scope = platforms + integration_platforms
                    else:
                        platform_scope = platforms
            else:
                platform_scope = platforms

            integration = self.options.integration and ts.integration_platforms

            # If there isn't any overlap between the platform_allow list and
            # the platform_scope, we set the scope to the platform_allow list.
            if ts.platform_allow and not platform_filter and not integration and platform_config.get('increased_platform_scope', True):
                self.verify_platforms_existence(
                    ts.platform_allow, f"{ts_name} - platform_allow")
                a = set(platform_scope)
                b = set(filter(lambda item: item.name in ts.platform_allow, self.platforms))
                c = a.intersection(b)
                if not c:
                    platform_scope = list(filter(lambda item: item.name in ts.platform_allow, \
                                         self.platforms))
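
            # Example (illustrative): if platform_allow names only hardware
            # boards while the current scope holds only the default qemu
            # targets, the intersection is empty and the scope collapses to
            # the platform_allow list, so the suite still gets a platform.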

            # list of instances per testsuite, aka configurations.
            instance_list = []
            for plat in platform_scope:
                instance = TestInstance(ts, plat, self.env.outdir)
                if runnable:
                    tfilter = 'runnable'
                else:
                    tfilter = 'buildable'

                instance.run = instance.check_runnable(
                    self.options.enable_slow,
                    tfilter,
                    self.options.fixture,
                    self.hwm
                )

                if not force_platform and plat.name in exclude_platform:
                    instance.add_filter("Platform is excluded on command line.", Filters.CMD_LINE)

                if (plat.arch == "unit") != (ts.type == "unit"):
                    # Discard silently
                    continue

                if ts.modules and self.modules:
                    if not set(ts.modules).issubset(set(self.modules)):
                        instance.add_filter(f"one or more required modules not available: {','.join(ts.modules)}", Filters.MODULE)

                if self.options.level:
                    tl = self.get_level(self.options.level)
                    planned_scenarios = tl.scenarios
                    if ts.id not in planned_scenarios and not set(ts.levels).intersection(set(tl.levels)):
                        instance.add_filter("Not part of requested test plan", Filters.TESTPLAN)

                if runnable and not instance.run:
                    instance.add_filter("Not runnable on device", Filters.CMD_LINE)

                if self.options.integration and ts.integration_platforms and plat.name not in ts.integration_platforms:
                    instance.add_filter("Not part of integration platforms", Filters.TESTSUITE)

                if ts.skip:
                    instance.add_filter("Skip filter", Filters.SKIP)

                if tag_filter and not ts.tags.intersection(tag_filter):
                    instance.add_filter("Command line testsuite tag filter", Filters.CMD_LINE)

                if slow_only and not ts.slow:
                    instance.add_filter("Not a slow test", Filters.CMD_LINE)

                if exclude_tag and ts.tags.intersection(exclude_tag):
                    instance.add_filter("Command line testsuite exclude filter", Filters.CMD_LINE)

                if testsuite_filter:
                    normalized_f = [os.path.basename(_ts) for _ts in testsuite_filter]
                    if ts.id not in normalized_f:
                        instance.add_filter("Testsuite name filter", Filters.CMD_LINE)

                if arch_filter and plat.arch not in arch_filter:
                    instance.add_filter("Command line testsuite arch filter", Filters.CMD_LINE)

                if not force_platform:

                    if ts.arch_allow and plat.arch not in ts.arch_allow:
                        instance.add_filter("Not in test case arch allow list", Filters.TESTSUITE)

                    if ts.arch_exclude and plat.arch in ts.arch_exclude:
                        instance.add_filter("In test case arch exclude", Filters.TESTSUITE)

                    if ts.platform_exclude and plat.name in ts.platform_exclude:
                        instance.add_filter("In test case platform exclude", Filters.TESTSUITE)

                if ts.toolchain_exclude and toolchain in ts.toolchain_exclude:
                    instance.add_filter("In test case toolchain exclude", Filters.TOOLCHAIN)

                if platform_filter and plat.name not in platform_filter:
                    instance.add_filter("Command line platform filter", Filters.CMD_LINE)

                if ts.platform_allow \
                        and plat.name not in ts.platform_allow \
                        and not (platform_filter and force_platform):
                    instance.add_filter("Not in testsuite platform allow list", Filters.TESTSUITE)

                if ts.platform_type and plat.type not in ts.platform_type:
                    instance.add_filter("Not in testsuite platform type list", Filters.TESTSUITE)

                if ts.toolchain_allow and toolchain not in ts.toolchain_allow:
                    instance.add_filter("Not in testsuite toolchain allow list", Filters.TOOLCHAIN)

                if not plat.env_satisfied:
                    instance.add_filter("Environment ({}) not satisfied".format(", ".join(plat.env)), Filters.PLATFORM)
                if not force_toolchain \
                        and toolchain and (toolchain not in plat.supported_toolchains) \
                        and "host" not in plat.supported_toolchains \
                        and ts.type != 'unit':
                    instance.add_filter("Not supported by the toolchain", Filters.PLATFORM)

                if plat.ram < ts.min_ram:
                    instance.add_filter("Not enough RAM", Filters.PLATFORM)

                if ts.harness:
                    if ts.harness == 'robot' and plat.simulation != 'renode':
                        instance.add_filter("No robot support for the selected platform", Filters.SKIP)

                if ts.depends_on:
                    dep_intersection = ts.depends_on.intersection(set(plat.supported))
                    if dep_intersection != set(ts.depends_on):
                        instance.add_filter("No hardware support", Filters.PLATFORM)

                if plat.flash < ts.min_flash:
                    instance.add_filter("Not enough FLASH", Filters.PLATFORM)

                if set(plat.ignore_tags) & ts.tags:
                    instance.add_filter("Excluded tags per platform (exclude_tags)", Filters.PLATFORM)

                if plat.only_tags and not set(plat.only_tags) & ts.tags:
                    instance.add_filter("Excluded tags per platform (only_tags)", Filters.PLATFORM)

                if ts.required_snippets:
                    missing_snippet = False
                    snippet_args = {"snippets": ts.required_snippets}
                    found_snippets = snippets.find_snippets_in_roots(snippet_args, [*self.env.snippet_roots, Path(ts.source_dir)])

                    # Search and check that all required snippet files are found
                    for this_snippet in snippet_args['snippets']:
                        if this_snippet not in found_snippets:
                            logger.error("Can't find snippet '%s' for test '%s'", this_snippet, ts.name)
                            instance.status = "error"
                            instance.reason = f"Snippet {this_snippet} not found"
                            missing_snippet = True
                            break

                    if not missing_snippet:
                        # Check that the found snippets are applicable to
                        # these platforms/boards
                        for this_snippet in snippet_args['snippets']:
                            matched_snippet_board = False

                            # If the "appends" key is present with at least
                            # one entry then this snippet applies to all
                            # boards and further platform-specific checks
                            # are not required.
                            if found_snippets[this_snippet].appends:
                                continue

                            for this_board in found_snippets[this_snippet].board2appends:
                                if this_board.startswith('/'):
                                    match = re.search(this_board[1:-1], plat.name)
                                    if match is not None:
                                        matched_snippet_board = True
                                        break
                                elif this_board == plat.name:
                                    matched_snippet_board = True
                                    break

                            if matched_snippet_board is False:
                                instance.add_filter("Snippet not supported", Filters.PLATFORM)
                                break

                # handle quarantined tests
                if self.quarantine:
                    matched_quarantine = self.quarantine.get_matched_quarantine(
                        instance.testsuite.id, plat.name, plat.arch, plat.simulation
                    )
                    if matched_quarantine and not self.options.quarantine_verify:
                        instance.add_filter("Quarantine: " + matched_quarantine, Filters.QUARANTINE)
                    if not matched_quarantine and self.options.quarantine_verify:
                        instance.add_filter("Not under quarantine", Filters.QUARANTINE)

                # platform_key is a list of unique platform attributes that
                # form a unique key a test will match against to determine
                # if it should be scheduled to run. A key containing a field
                # name that the platform does not have will filter the
                # platform.
                #
                # A simple example is keying on arch and simulation to run
                # a test once per unique (arch, simulation) platform.
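                # Worked example (illustrative, not from upstream docs):
                # with platform_key: [arch, simulation], the first platform
                # seen with key (arm, qemu) schedules the test; any later
                # platform with the same (arch, simulation) pair is filtered
                # below with Filters.PLATFORM_KEY.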
                if not ignore_platform_key and hasattr(ts, 'platform_key') and len(ts.platform_key) > 0:
                    key_fields = sorted(set(ts.platform_key))
                    keys = [getattr(plat, key_field) for key_field in key_fields]
                    for key in keys:
                        if key is None or key == 'na':
                            instance.add_filter(
                                f"Excluded platform missing key fields demanded by test {key_fields}",
                                Filters.PLATFORM
                            )
                            break
                    else:
                        test_keys = copy.deepcopy(keys)
                        test_keys.append(ts.name)
                        test_keys = tuple(test_keys)
                        keyed_test = keyed_tests.get(test_keys)
                        if keyed_test is not None:
                            plat_key = {key_field: getattr(keyed_test['plat'], key_field) for key_field in key_fields}
                            instance.add_filter(f"Already covered for key {tuple(keys)} by platform {keyed_test['plat'].name} having key {plat_key}", Filters.PLATFORM_KEY)
                        else:
                            # do not add a platform to keyed tests if
                            # previously filtered
                            if not instance.filters:
                                keyed_tests[test_keys] = {'plat': plat, 'ts': ts}

                # if nothing stopped us until now, it means this
                # configuration needs to be added.
                instance_list.append(instance)

            # no configurations, so jump to the next testsuite
            if not instance_list:
                continue

            # if twister was launched with no platform options at all, we
            # take all default platforms
            if default_platforms and not ts.build_on_all and not integration:
                if ts.platform_allow:
                    a = set(self.default_platforms)
                    b = set(ts.platform_allow)
                    c = a.intersection(b)
                    if c:
                        aa = list(filter(lambda ts: ts.platform.name in c, instance_list))
                        self.add_instances(aa)
                    else:
                        self.add_instances(instance_list)
                else:
                    # add integration platforms to the list of default
                    # platforms, even if we are not in integration mode
                    _platforms = self.default_platforms + ts.integration_platforms
                    instances = list(filter(lambda ts: ts.platform.name in _platforms, instance_list))
                    self.add_instances(instances)
            elif integration:
                instances = list(filter(lambda item: item.platform.name in ts.integration_platforms, instance_list))
                self.add_instances(instances)

            elif emulation_platforms:
                self.add_instances(instance_list)
                for instance in list(filter(lambda inst: inst.platform.simulation == 'na', instance_list)):
                    instance.add_filter("Not an emulated platform", Filters.CMD_LINE)
            elif vendor_platforms:
                self.add_instances(instance_list)
                for instance in list(filter(lambda inst: inst.platform.vendor not in vendor_filter, instance_list)):
                    instance.add_filter("Not a selected vendor platform", Filters.CMD_LINE)
            else:
                self.add_instances(instance_list)

        for _, case in self.instances.items():
            case.create_overlay(case.platform, self.options.enable_asan, self.options.enable_ubsan, self.options.enable_coverage, self.options.coverage_platform)

        self.selected_platforms = set(p.platform.name for p in self.instances.values())

        filtered_instances = list(filter(lambda item: item.status == "filtered", self.instances.values()))
        for filtered_instance in filtered_instances:
            change_skip_to_error_if_integration(self.options, filtered_instance)

            filtered_instance.add_missing_case_status(filtered_instance.status)

        self.filtered_platforms = set(p.platform.name for p in self.instances.values()
                                      if p.status != "skipped")

    def add_instances(self, instance_list):
        for instance in instance_list:
            self.instances[instance.name] = instance

    def get_testsuite(self, identifier):
        results = []
        for _, ts in self.testsuites.items():
            for case in ts.testcases:
                if case.name == identifier:
                    results.append(ts)
        return results

    def verify_platforms_existence(self, platform_names_to_verify, log_info=""):
        """
        Verify that each platform name (passed by the --platform option, or
        listed in a yaml file under platform_allow or integration_platforms)
        is valid. If not, log an error and exit.
        """
        for platform in platform_names_to_verify:
            if platform in self.platform_names:
                continue
            else:
                logger.error(f"{log_info} - unrecognized platform - {platform}")
                sys.exit(2)

    def create_build_dir_links(self):
        """
        Iterate through all non-skipped instances in the suite and create a
        link for each one's build directory. Those links will be passed to
        the CMake command in the next steps.
        """

        links_dir_name = "twister_links"  # folder for all links
        links_dir_path = os.path.join(self.env.outdir, links_dir_name)
        if not os.path.exists(links_dir_path):
            os.mkdir(links_dir_path)

        for instance in self.instances.values():
            if instance.status != "skipped":
                self._create_build_dir_link(links_dir_path, instance)

    def _create_build_dir_link(self, links_dir_path, instance):
        """
        Create the build directory with its original "long" path, then
        create a link with a shorter path pointing to it and replace
        build_dir with that link. The link is what gets passed to the CMake
        command. This helps to limit path length, which can be significant
        when building with CMake on Windows.
        """

        os.makedirs(instance.build_dir, exist_ok=True)

        link_name = f"test_{self.link_dir_counter}"
        link_path = os.path.join(links_dir_path, link_name)

        if os.name == "nt":  # if OS is Windows
            command = ["mklink", "/J", f"{link_path}", os.path.normpath(instance.build_dir)]
            subprocess.call(command, shell=True)
        else:  # for Linux and macOS
            os.symlink(instance.build_dir, link_path)

        # Here the original build directory is replaced with the symbolic
        # link, which will be passed to the CMake command.
        instance.build_dir = link_path

        self.link_dir_counter += 1


def change_skip_to_error_if_integration(options, instance):
    ''' All skips on integration_platforms are treated as errors.'''
    if instance.platform.name in instance.testsuite.integration_platforms:
        # Do not treat this as an error if the filter type is among
        # ignore_filters
        filters = {t['type'] for t in instance.filters}
        ignore_filters = {Filters.CMD_LINE, Filters.SKIP, Filters.PLATFORM_KEY,
                          Filters.TOOLCHAIN, Filters.MODULE, Filters.TESTPLAN,
                          Filters.QUARANTINE}
        if filters.intersection(ignore_filters):
            return
        instance.status = "error"
        instance.reason += " but is one of the integration platforms"
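

# A minimal sketch of how TestPlan is typically driven (illustrative; the
# real sequencing lives in twister's main entry point, not in this file):
#
#   plan = TestPlan(env)
#   plan.discover()   # find suites and platforms, parse levels/quarantine
#   plan.load()       # build and filter test instances, apply --subset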