1#!/usr/bin/env python3 2# vim: set syntax=python ts=4 : 3# 4# Copyright (c) 2018-2024 Intel Corporation 5# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved. 6# 7# SPDX-License-Identifier: Apache-2.0 8import collections 9import copy 10import glob 11import json 12import logging 13import os 14import random 15import re 16import subprocess 17import sys 18from argparse import Namespace 19from collections import OrderedDict 20from itertools import islice 21from pathlib import Path 22 23import snippets 24 25try: 26 from anytree import Node, RenderTree, find 27except ImportError: 28 print("Install the anytree module to use the --test-tree option") 29 30import list_boards 31import scl 32from twisterlib.config_parser import TwisterConfigParser 33from twisterlib.error import TwisterRuntimeError 34from twisterlib.platform import Platform 35from twisterlib.quarantine import Quarantine 36from twisterlib.statuses import TwisterStatus 37from twisterlib.testinstance import TestInstance 38from twisterlib.testsuite import TestSuite, scan_testsuite_path 39from zephyr_module import parse_modules 40 41logger = logging.getLogger('twister') 42logger.setLevel(logging.DEBUG) 43 44ZEPHYR_BASE = os.getenv("ZEPHYR_BASE") 45if not ZEPHYR_BASE: 46 sys.exit("$ZEPHYR_BASE environment variable undefined") 47 48# This is needed to load edt.pickle files. 49sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts", "dts", 50 "python-devicetree", "src")) 51from devicetree import edtlib # pylint: disable=unused-import 52 53sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/")) 54 55class Filters: 56 # platform keys 57 PLATFORM_KEY = 'platform key filter' 58 # filters provided on command line by the user/tester 59 CMD_LINE = 'command line filter' 60 # filters in the testsuite yaml definition 61 TESTSUITE = 'testsuite filter' 62 # filters in the testplan yaml definition 63 TESTPLAN = 'testplan filter' 64 # filters related to platform definition 65 PLATFORM = 'Platform related filter' 66 # in case a test suite was quarantined. 67 QUARANTINE = 'Quarantine filter' 68 # in case a test suite is skipped intentionally . 69 SKIP = 'Skip filter' 70 # in case of incompatibility between selected and allowed toolchains. 71 TOOLCHAIN = 'Toolchain filter' 72 # in case where an optional module is not available 73 MODULE = 'Module filter' 74 # in case of missing env. variable required for a platform 75 ENVIRONMENT = 'Environment filter' 76 77 78class TestLevel: 79 name = None 80 levels = [] 81 scenarios = [] 82 83 84class TestPlan: 85 __test__ = False # for pytest to skip this class when collects tests 86 config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') 87 dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') 88 89 suite_schema = scl.yaml_load( 90 os.path.join(ZEPHYR_BASE, 91 "scripts", "schemas", "twister", "testsuite-schema.yaml")) 92 quarantine_schema = scl.yaml_load( 93 os.path.join(ZEPHYR_BASE, 94 "scripts", "schemas", "twister", "quarantine-schema.yaml")) 95 96 tc_schema_path = os.path.join( 97 ZEPHYR_BASE, 98 "scripts", 99 "schemas", 100 "twister", 101 "test-config-schema.yaml" 102 ) 103 104 SAMPLE_FILENAME = 'sample.yaml' 105 TESTSUITE_FILENAME = 'testcase.yaml' 106 107 def __init__(self, env: Namespace): 108 109 self.options = env.options 110 self.env = env 111 112 # Keep track of which test cases we've filtered out and why 113 self.testsuites = {} 114 self.quarantine = None 115 self.platforms = [] 116 self.platform_names = [] 117 self.selected_platforms = [] 118 self.default_platforms = [] 119 self.load_errors = 0 120 self.instances = dict() 121 self.instance_fail_count = 0 122 self.warnings = 0 123 124 self.scenarios = [] 125 126 self.hwm = env.hwm 127 # used during creating shorter build paths 128 self.link_dir_counter = 0 129 self.modules = [] 130 131 self.run_individual_testsuite = [] 132 self.levels = [] 133 self.test_config = {} 134 135 self.name = "unnamed" 136 137 def get_level(self, name): 138 level = next((lvl for lvl in self.levels if lvl.name == name), None) 139 return level 140 141 def parse_configuration(self, config_file): 142 if os.path.exists(config_file): 143 tc_schema = scl.yaml_load(self.tc_schema_path) 144 self.test_config = scl.yaml_load_verify(config_file, tc_schema) 145 else: 146 raise TwisterRuntimeError(f"File {config_file} not found.") 147 148 levels = self.test_config.get('levels', []) 149 150 # Do first pass on levels to get initial data. 151 for level in levels: 152 adds = [] 153 for s in level.get('adds', []): 154 r = re.compile(s) 155 adds.extend(list(filter(r.fullmatch, self.scenarios))) 156 157 tl = TestLevel() 158 tl.name = level['name'] 159 tl.scenarios = adds 160 tl.levels = level.get('inherits', []) 161 self.levels.append(tl) 162 163 # Go over levels again to resolve inheritance. 164 for level in levels: 165 inherit = level.get('inherits', []) 166 _level = self.get_level(level['name']) 167 if inherit: 168 for inherted_level in inherit: 169 _inherited = self.get_level(inherted_level) 170 assert _inherited, "Unknown inherited level {inherted_level}" 171 _inherited_scenarios = _inherited.scenarios 172 level_scenarios = _level.scenarios if _level else [] 173 level_scenarios.extend(_inherited_scenarios) 174 175 def find_subtests(self): 176 sub_tests = self.options.sub_test 177 if sub_tests: 178 for subtest in sub_tests: 179 _subtests = self.get_testcase(subtest) 180 for _subtest in _subtests: 181 self.run_individual_testsuite.append(_subtest.name) 182 183 if self.run_individual_testsuite: 184 logger.info("Running the following tests:") 185 for test in self.run_individual_testsuite: 186 print(f" - {test}") 187 else: 188 raise TwisterRuntimeError("Tests not found") 189 190 def discover(self): 191 self.handle_modules() 192 if self.options.test: 193 self.run_individual_testsuite = self.options.test 194 195 self.add_configurations() 196 num = self.add_testsuites(testsuite_filter=self.run_individual_testsuite) 197 if num == 0: 198 raise TwisterRuntimeError("No testsuites found at the specified location...") 199 if self.load_errors: 200 raise TwisterRuntimeError( 201 f"Found {self.load_errors} errors loading {num} test configurations." 202 ) 203 204 self.find_subtests() 205 # get list of scenarios we have parsed into one list 206 for _, ts in self.testsuites.items(): 207 self.scenarios.append(ts.id) 208 209 self.report_duplicates() 210 self.parse_configuration(config_file=self.env.test_config) 211 212 # handle quarantine 213 ql = self.options.quarantine_list 214 qv = self.options.quarantine_verify 215 if qv and not ql: 216 logger.error("No quarantine list given to be verified") 217 raise TwisterRuntimeError("No quarantine list given to be verified") 218 if ql: 219 for quarantine_file in ql: 220 try: 221 # validate quarantine yaml file against the provided schema 222 scl.yaml_load_verify(quarantine_file, self.quarantine_schema) 223 except scl.EmptyYamlFileException: 224 logger.debug(f'Quarantine file {quarantine_file} is empty') 225 self.quarantine = Quarantine(ql) 226 227 def load(self): 228 229 if self.options.report_suffix: 230 last_run = os.path.join( 231 self.options.outdir, 232 f"twister_{self.options.report_suffix}.json" 233 ) 234 else: 235 last_run = os.path.join(self.options.outdir, "twister.json") 236 237 if self.options.only_failed or self.options.report_summary is not None: 238 self.load_from_file(last_run) 239 self.selected_platforms = set(p.platform.name for p in self.instances.values()) 240 elif self.options.load_tests: 241 self.load_from_file(self.options.load_tests) 242 self.selected_platforms = set(p.platform.name for p in self.instances.values()) 243 elif self.options.test_only: 244 # Get list of connected hardware and filter tests to only be run on connected hardware. 245 # If the platform does not exist in the hardware map or was not specified by --platform, 246 # just skip it. 247 248 connected_list = [] 249 excluded_list = [] 250 for _cp in self.options.platform: 251 if _cp in self.platform_names: 252 connected_list.append(self.get_platform(_cp).name) 253 254 if self.options.exclude_platform: 255 for _p in self.options.exclude_platform: 256 if _p in self.platform_names: 257 excluded_list.append(self.get_platform(_p).name) 258 for excluded in excluded_list: 259 if excluded in connected_list: 260 connected_list.remove(excluded) 261 262 self.load_from_file(last_run, filter_platform=connected_list) 263 self.selected_platforms = set(p.platform.name for p in self.instances.values()) 264 else: 265 self.apply_filters() 266 267 if self.options.subset: 268 s = self.options.subset 269 try: 270 subset, sets = (int(x) for x in s.split("/")) 271 except ValueError as err: 272 raise TwisterRuntimeError("Bad subset value.") from err 273 274 if subset > sets: 275 raise TwisterRuntimeError("subset should not exceed the total number of sets") 276 277 if int(subset) > 0 and int(sets) >= int(subset): 278 logger.info(f"Running only a subset: {subset}/{sets}") 279 else: 280 raise TwisterRuntimeError( 281 f"You have provided a wrong subset value: {self.options.subset}." 282 ) 283 284 self.generate_subset(subset, int(sets)) 285 286 def generate_subset(self, subset, sets): 287 # Test instances are sorted depending on the context. For CI runs 288 # the execution order is: "plat1-testA, plat1-testB, ..., 289 # plat1-testZ, plat2-testA, ...". For hardware tests 290 # (device_testing), were multiple physical platforms can run the tests 291 # in parallel, it is more efficient to run in the order: 292 # "plat1-testA, plat2-testA, ..., plat1-testB, plat2-testB, ..." 293 if self.options.device_testing: 294 self.instances = OrderedDict(sorted(self.instances.items(), 295 key=lambda x: x[0][x[0].find("/") + 1:])) 296 else: 297 self.instances = OrderedDict(sorted(self.instances.items())) 298 299 if self.options.shuffle_tests: 300 seed_value = int.from_bytes(os.urandom(8), byteorder="big") 301 if self.options.shuffle_tests_seed is not None: 302 seed_value = self.options.shuffle_tests_seed 303 304 logger.info(f"Shuffle tests with seed: {seed_value}") 305 random.seed(seed_value) 306 temp_list = list(self.instances.items()) 307 random.shuffle(temp_list) 308 self.instances = OrderedDict(temp_list) 309 310 # Do calculation based on what is actually going to be run and evaluated 311 # at runtime, ignore the cases we already know going to be skipped. 312 # This fixes an issue where some sets would get majority of skips and 313 # basically run nothing beside filtering. 314 to_run = {k : v for k,v in self.instances.items() if v.status == TwisterStatus.NONE} 315 total = len(to_run) 316 per_set = int(total / sets) 317 num_extra_sets = total - (per_set * sets) 318 319 # Try and be more fair for rounding error with integer division 320 # so the last subset doesn't get overloaded, we add 1 extra to 321 # subsets 1..num_extra_sets. 322 if subset <= num_extra_sets: 323 start = (subset - 1) * (per_set + 1) 324 end = start + per_set + 1 325 else: 326 base = num_extra_sets * (per_set + 1) 327 start = ((subset - num_extra_sets - 1) * per_set) + base 328 end = start + per_set 329 330 sliced_instances = islice(to_run.items(), start, end) 331 skipped = {k : v for k,v in self.instances.items() if v.status == TwisterStatus.SKIP} 332 errors = {k : v for k,v in self.instances.items() if v.status == TwisterStatus.ERROR} 333 self.instances = OrderedDict(sliced_instances) 334 if subset == 1: 335 # add all pre-filtered tests that are skipped or got error status 336 # to the first set to allow for better distribution among all sets. 337 self.instances.update(skipped) 338 self.instances.update(errors) 339 340 341 def handle_modules(self): 342 # get all enabled west projects 343 modules_meta = parse_modules(ZEPHYR_BASE) 344 self.modules = [module.meta.get('name') for module in modules_meta] 345 346 347 def report(self): 348 if self.options.test_tree: 349 if not self.options.detailed_test_id: 350 logger.info("Test tree is always shown with detailed test-id.") 351 self.report_test_tree() 352 return 0 353 elif self.options.list_tests: 354 if not self.options.detailed_test_id: 355 logger.info("Test list is always shown with detailed test-id.") 356 self.report_test_list() 357 return 0 358 elif self.options.list_tags: 359 self.report_tag_list() 360 return 0 361 362 return 1 363 364 def report_duplicates(self): 365 dupes = [item for item, count in collections.Counter(self.scenarios).items() if count > 1] 366 if dupes: 367 msg = "Duplicated test scenarios found:\n" 368 for dupe in dupes: 369 msg += (f"- {dupe} found in:\n") 370 for dc in self.get_testsuite(dupe): 371 msg += (f" - {dc.yamlfile}\n") 372 raise TwisterRuntimeError(msg) 373 else: 374 logger.debug("No duplicates found.") 375 376 def report_tag_list(self): 377 tags = set() 378 for _, tc in self.testsuites.items(): 379 tags = tags.union(tc.tags) 380 381 for t in tags: 382 print(f"- {t}") 383 384 def report_test_tree(self): 385 tests_list = self.get_tests_list() 386 387 testsuite = Node("Testsuite") 388 samples = Node("Samples", parent=testsuite) 389 tests = Node("Tests", parent=testsuite) 390 391 for test in sorted(tests_list): 392 if test.startswith("sample."): 393 sec = test.split(".") 394 area = find( 395 samples, 396 lambda node, sname=sec[1]: node.name == sname and node.parent == samples 397 ) 398 if not area: 399 area = Node(sec[1], parent=samples) 400 401 Node(test, parent=area) 402 else: 403 sec = test.split(".") 404 area = find( 405 tests, 406 lambda node, sname=sec[0]: node.name == sname and node.parent == tests 407 ) 408 if not area: 409 area = Node(sec[0], parent=tests) 410 411 if area and len(sec) > 2: 412 subarea = find( 413 area, lambda node, sname=sec[1], sparent=area: node.name == sname 414 and node.parent == sparent 415 ) 416 if not subarea: 417 subarea = Node(sec[1], parent=area) 418 Node(test, parent=subarea) 419 420 for pre, _, node in RenderTree(testsuite): 421 print(f"{pre}{node.name}") 422 423 def report_test_list(self): 424 tests_list = self.get_tests_list() 425 426 cnt = 0 427 for test in sorted(tests_list): 428 cnt = cnt + 1 429 print(f" - {test}") 430 print(f"{cnt} total.") 431 432 433 # Debug Functions 434 @staticmethod 435 def info(what): 436 sys.stdout.write(what + "\n") 437 sys.stdout.flush() 438 439 def find_twister_data(self, board_data_list, board_aliases): 440 """Find the twister data for a board in the list of board data based on the aliases""" 441 for board_data in board_data_list: 442 if board_data.get('identifier') in board_aliases: 443 return board_data 444 445 def add_configurations(self): 446 # Create a list of board roots as defined by the build system in general 447 # Note, internally in twister a board root includes the `boards` folder 448 # but in Zephyr build system, the board root is without the `boards` in folder path. 449 board_roots = [Path(os.path.dirname(root)) for root in self.env.board_roots] 450 lb_args = Namespace(arch_roots=self.env.arch_roots, soc_roots=self.env.soc_roots, 451 board_roots=board_roots, board=None, board_dir=None) 452 453 known_boards = list_boards.find_v2_boards(lb_args) 454 bdirs = {} 455 platform_config = self.test_config.get('platforms', {}) 456 457 # helper function to initialize and add platforms 458 def init_and_add_platforms(data, board, target, qualifier, aliases): 459 platform = Platform() 460 if not new_config_found: 461 data = self.find_twister_data(bdirs[board.dir], aliases) 462 if not data: 463 return 464 platform.load(board, target, aliases, data) 465 platform.qualifier = qualifier 466 if platform.name in [p.name for p in self.platforms]: 467 logger.error(f"Duplicate platform {platform.name} in {board.dir}") 468 raise Exception(f"Duplicate platform identifier {platform.name} found") 469 if not platform.twister: 470 return 471 self.platforms.append(platform) 472 473 for board in known_boards.values(): 474 new_config_found = False 475 # don't load the same board data twice 476 if not bdirs.get(board.dir): 477 datas = [] 478 for file in glob.glob(os.path.join(board.dir, "*.yaml")): 479 if os.path.basename(file) == "twister.yaml": 480 continue 481 try: 482 scp = TwisterConfigParser(file, Platform.platform_schema) 483 sdata = scp.load() 484 datas.append(sdata) 485 except Exception as e: 486 logger.error(f"Error loading {file}: {e!r}") 487 self.load_errors += 1 488 continue 489 bdirs[board.dir] = datas 490 data = {} 491 if os.path.exists(board.dir / 'twister.yaml'): 492 try: 493 scp = TwisterConfigParser(board.dir / 'twister.yaml', Platform.platform_schema) 494 data = scp.load() 495 except Exception as e: 496 logger.error(f"Error loading {board.dir / 'twister.yaml'}: {e!r}") 497 self.load_errors += 1 498 continue 499 new_config_found = True 500 501 502 503 for qual in list_boards.board_v2_qualifiers(board): 504 505 if board.revisions: 506 for rev in board.revisions: 507 if rev.name: 508 target = f"{board.name}@{rev.name}/{qual}" 509 aliases = [target] 510 if rev.name == board.revision_default: 511 aliases.append(f"{board.name}/{qual}") 512 if '/' not in qual and len(board.socs) == 1: 513 if rev.name == board.revision_default: 514 aliases.append(f"{board.name}") 515 aliases.append(f"{board.name}@{rev.name}") 516 else: 517 target = f"{board.name}/{qual}" 518 aliases = [target] 519 if '/' not in qual and len(board.socs) == 1 \ 520 and rev.name == board.revision_default: 521 aliases.append(f"{board.name}") 522 523 init_and_add_platforms(data, board, target, qual, aliases) 524 else: 525 target = f"{board.name}/{qual}" 526 aliases = [target] 527 if '/' not in qual and len(board.socs) == 1: 528 aliases.append(board.name) 529 init_and_add_platforms(data, board, target, qual, aliases) 530 531 for platform in self.platforms: 532 if not platform_config.get('override_default_platforms', False): 533 if platform.default: 534 self.default_platforms.append(platform.name) 535 #logger.debug(f"adding {platform.name} to default platforms") 536 continue 537 for pp in platform_config.get('default_platforms', []): 538 if pp in platform.aliases: 539 logger.debug(f"adding {platform.name} to default platforms (override mode)") 540 self.default_platforms.append(platform.name) 541 542 self.platform_names = [a for p in self.platforms for a in p.aliases] 543 544 def get_all_tests(self): 545 testcases = [] 546 for _, ts in self.testsuites.items(): 547 for case in ts.testcases: 548 testcases.append(case.name) 549 550 return testcases 551 552 def get_tests_list(self): 553 testcases = [] 554 if tag_filter := self.options.tag: 555 for _, ts in self.testsuites.items(): 556 if ts.tags.intersection(tag_filter): 557 for case in ts.testcases: 558 testcases.append(case.detailed_name) 559 else: 560 for _, ts in self.testsuites.items(): 561 for case in ts.testcases: 562 testcases.append(case.detailed_name) 563 564 if exclude_tag := self.options.exclude_tag: 565 for _, ts in self.testsuites.items(): 566 if ts.tags.intersection(exclude_tag): 567 for case in ts.testcases: 568 if case.detailed_name in testcases: 569 testcases.remove(case.detailed_name) 570 return testcases 571 572 def add_testsuites(self, testsuite_filter=None): 573 if testsuite_filter is None: 574 testsuite_filter = [] 575 for root in self.env.test_roots: 576 root = os.path.abspath(root) 577 578 logger.debug(f"Reading testsuite configuration files under {root}...") 579 580 for dirpath, _, filenames in os.walk(root, topdown=True): 581 if self.SAMPLE_FILENAME in filenames: 582 filename = self.SAMPLE_FILENAME 583 elif self.TESTSUITE_FILENAME in filenames: 584 filename = self.TESTSUITE_FILENAME 585 else: 586 continue 587 588 logger.debug("Found possible testsuite in " + dirpath) 589 590 suite_yaml_path = os.path.join(dirpath, filename) 591 suite_path = os.path.dirname(suite_yaml_path) 592 593 for alt_config_root in self.env.alt_config_root: 594 alt_config = os.path.join(os.path.abspath(alt_config_root), 595 os.path.relpath(suite_path, root), 596 filename) 597 if os.path.exists(alt_config): 598 logger.info( 599 f"Using alternative configuration from {os.path.normpath(alt_config)}" 600 ) 601 suite_yaml_path = alt_config 602 break 603 604 try: 605 parsed_data = TwisterConfigParser(suite_yaml_path, self.suite_schema) 606 parsed_data.load() 607 subcases = None 608 ztest_suite_names = None 609 610 for name in parsed_data.scenarios: 611 suite_dict = parsed_data.get_scenario(name) 612 suite = TestSuite( 613 root, 614 suite_path, 615 name, 616 data=suite_dict, 617 detailed_test_id=self.options.detailed_test_id 618 ) 619 620 # convert to fully qualified names 621 suite.integration_platforms = self.verify_platforms_existence( 622 suite.integration_platforms, 623 f"integration_platforms in {suite.name}") 624 suite.platform_exclude = self.verify_platforms_existence( 625 suite.platform_exclude, 626 f"platform_exclude in {suite.name}") 627 suite.platform_allow = self.verify_platforms_existence( 628 suite.platform_allow, 629 f"platform_allow in {suite.name}") 630 631 if suite.harness in ['ztest', 'test']: 632 if subcases is None: 633 # scan it only once per testsuite 634 subcases, ztest_suite_names = scan_testsuite_path(suite_path) 635 suite.add_subcases(suite_dict, subcases, ztest_suite_names) 636 else: 637 suite.add_subcases(suite_dict) 638 639 if testsuite_filter: 640 scenario = os.path.basename(suite.name) 641 if ( 642 suite.name 643 and (suite.name in testsuite_filter or scenario in testsuite_filter) 644 ): 645 self.testsuites[suite.name] = suite 646 elif suite.name in self.testsuites: 647 msg = ( 648 f"test suite '{suite.name}' in '{suite.yamlfile}' is already added" 649 ) 650 if suite.yamlfile == self.testsuites[suite.name].yamlfile: 651 logger.debug(f"Skip - {msg}") 652 else: 653 msg = ( 654 f"Duplicate {msg} from '{self.testsuites[suite.name].yamlfile}'" 655 ) 656 raise TwisterRuntimeError(msg) 657 else: 658 self.testsuites[suite.name] = suite 659 660 except Exception as e: 661 logger.error(f"{suite_path}: can't load (skipping): {e!r}") 662 self.load_errors += 1 663 return len(self.testsuites) 664 665 def __str__(self): 666 return self.name 667 668 def get_platform(self, name): 669 selected_platform = None 670 for platform in self.platforms: 671 if name in platform.aliases: 672 selected_platform = platform 673 break 674 return selected_platform 675 676 def handle_quarantined_tests(self, instance: TestInstance, plat: Platform): 677 if self.quarantine: 678 simulator = plat.simulator_by_name(self.options) 679 matched_quarantine = self.quarantine.get_matched_quarantine( 680 instance.testsuite.id, 681 plat.name, 682 plat.arch, 683 simulator.name if simulator is not None else 'na' 684 ) 685 if matched_quarantine and not self.options.quarantine_verify: 686 instance.add_filter("Quarantine: " + matched_quarantine, Filters.QUARANTINE) 687 return 688 if not matched_quarantine and self.options.quarantine_verify: 689 instance.add_filter("Not under quarantine", Filters.QUARANTINE) 690 691 def load_from_file(self, file, filter_platform=None): 692 if filter_platform is None: 693 filter_platform = [] 694 try: 695 with open(file) as json_test_plan: 696 jtp = json.load(json_test_plan) 697 instance_list = [] 698 for ts in jtp.get("testsuites", []): 699 logger.debug(f"loading {ts['name']}...") 700 testsuite = ts["name"] 701 702 platform = self.get_platform(ts["platform"]) 703 if filter_platform and platform.name not in filter_platform: 704 continue 705 instance = TestInstance(self.testsuites[testsuite], platform, self.env.outdir) 706 if ts.get("run_id"): 707 instance.run_id = ts.get("run_id") 708 709 instance.run = instance.check_runnable( 710 self.options, 711 self.hwm 712 ) 713 714 if self.options.test_only and not instance.run: 715 continue 716 717 instance.metrics['handler_time'] = ts.get('execution_time', 0) 718 instance.metrics['used_ram'] = ts.get("used_ram", 0) 719 instance.metrics['used_rom'] = ts.get("used_rom",0) 720 instance.metrics['available_ram'] = ts.get('available_ram', 0) 721 instance.metrics['available_rom'] = ts.get('available_rom', 0) 722 723 status = TwisterStatus(ts.get('status')) 724 reason = ts.get("reason", "Unknown") 725 if status in [TwisterStatus.ERROR, TwisterStatus.FAIL]: 726 if self.options.report_summary is not None: 727 instance.status = status 728 instance.reason = reason 729 self.instance_fail_count += 1 730 else: 731 instance.status = TwisterStatus.NONE 732 instance.reason = None 733 instance.retries += 1 734 # test marked as built only can run when --test-only is used. 735 # Reset status to capture new results. 736 elif status == TwisterStatus.NOTRUN and instance.run and self.options.test_only: 737 instance.status = TwisterStatus.NONE 738 instance.reason = None 739 else: 740 instance.status = status 741 instance.reason = reason 742 743 self.handle_quarantined_tests(instance, platform) 744 745 for tc in ts.get('testcases', []): 746 identifier = tc['identifier'] 747 tc_status = TwisterStatus(tc.get('status')) 748 tc_reason = None 749 # we set reason only if status is valid, it might have been 750 # reset above... 751 if instance.status != TwisterStatus.NONE: 752 tc_reason = tc.get('reason') 753 if tc_status != TwisterStatus.NONE: 754 case = instance.set_case_status_by_name( 755 identifier, 756 tc_status, 757 tc_reason 758 ) 759 case.duration = tc.get('execution_time', 0) 760 if tc.get('log'): 761 case.output = tc.get('log') 762 763 instance.create_overlay(platform, 764 self.options.enable_asan, 765 self.options.enable_ubsan, 766 self.options.enable_coverage, 767 self.options.coverage_platform 768 ) 769 instance_list.append(instance) 770 self.add_instances(instance_list) 771 except FileNotFoundError as e: 772 logger.error(f"{e}") 773 return 1 774 775 def check_platform(self, platform, platform_list): 776 return any(p in platform.aliases for p in platform_list) 777 778 def apply_filters(self, **kwargs): 779 780 toolchain = self.env.toolchain 781 platform_filter = self.options.platform 782 vendor_filter = self.options.vendor 783 exclude_platform = self.options.exclude_platform 784 testsuite_filter = self.run_individual_testsuite 785 arch_filter = self.options.arch 786 tag_filter = self.options.tag 787 exclude_tag = self.options.exclude_tag 788 all_filter = self.options.all 789 runnable = (self.options.device_testing or self.options.filter == 'runnable') 790 force_toolchain = self.options.force_toolchain 791 force_platform = self.options.force_platform 792 slow_only = self.options.enable_slow_only 793 ignore_platform_key = self.options.ignore_platform_key 794 emu_filter = self.options.emulation_only 795 796 logger.debug("platform filter: " + str(platform_filter)) 797 logger.debug(" vendor filter: " + str(vendor_filter)) 798 logger.debug(" arch_filter: " + str(arch_filter)) 799 logger.debug(" tag_filter: " + str(tag_filter)) 800 logger.debug(" exclude_tag: " + str(exclude_tag)) 801 802 default_platforms = False 803 vendor_platforms = False 804 emulation_platforms = False 805 806 if all_filter: 807 logger.info("Selecting all possible platforms per testsuite scenario") 808 # When --all used, any --platform arguments ignored 809 platform_filter = [] 810 elif not platform_filter and not emu_filter and not vendor_filter: 811 logger.info("Selecting default platforms per testsuite scenario") 812 default_platforms = True 813 elif emu_filter: 814 logger.info("Selecting emulation platforms per testsuite scenraio") 815 emulation_platforms = True 816 elif vendor_filter: 817 vendor_platforms = True 818 819 _platforms = [] 820 if platform_filter: 821 logger.debug(f"Checking platform filter: {platform_filter}") 822 # find in aliases and rename 823 platform_filter = self.verify_platforms_existence(platform_filter, "platform_filter") 824 platforms = list(filter(lambda p: p.name in platform_filter, self.platforms)) 825 elif emu_filter: 826 platforms = list( 827 filter(lambda p: bool(p.simulator_by_name(self.options.sim_name)), self.platforms) 828 ) 829 elif vendor_filter: 830 platforms = list(filter(lambda p: p.vendor in vendor_filter, self.platforms)) 831 logger.info(f"Selecting platforms by vendors: {','.join(vendor_filter)}") 832 elif arch_filter: 833 platforms = list(filter(lambda p: p.arch in arch_filter, self.platforms)) 834 elif default_platforms: 835 _platforms = list(filter(lambda p: p.name in self.default_platforms, self.platforms)) 836 platforms = [] 837 # default platforms that can't be run are dropped from the list of 838 # the default platforms list. Default platforms should always be 839 # runnable. 840 for p in _platforms: 841 sim = p.simulator_by_name(self.options.sim_name) 842 if (not sim) or sim.is_runnable(): 843 platforms.append(p) 844 else: 845 platforms = self.platforms 846 847 platform_config = self.test_config.get('platforms', {}) 848 logger.info("Building initial testsuite list...") 849 850 keyed_tests = {} 851 852 for _, ts in self.testsuites.items(): 853 if ( 854 ts.build_on_all 855 and not platform_filter 856 and platform_config.get('increased_platform_scope', True) 857 ): 858 platform_scope = self.platforms 859 elif ts.integration_platforms: 860 integration_platforms = list( 861 filter(lambda item: item.name in ts.integration_platforms, self.platforms) 862 ) 863 if self.options.integration: 864 platform_scope = integration_platforms 865 else: 866 # if not in integration mode, still add integration platforms to the list 867 if not platform_filter: 868 platform_scope = platforms + integration_platforms 869 else: 870 platform_scope = platforms 871 else: 872 platform_scope = platforms 873 874 integration = self.options.integration and ts.integration_platforms 875 876 # If there isn't any overlap between the platform_allow list and the platform_scope 877 # we set the scope to the platform_allow list 878 if ( 879 ts.platform_allow 880 and not platform_filter 881 and not integration 882 and platform_config.get('increased_platform_scope', True) 883 ): 884 a = set(platform_scope) 885 b = set(filter(lambda item: item.name in ts.platform_allow, self.platforms)) 886 c = a.intersection(b) 887 if not c: 888 platform_scope = list( 889 filter(lambda item: item.name in ts.platform_allow, self.platforms) 890 ) 891 # list of instances per testsuite, aka configurations. 892 instance_list = [] 893 for plat in platform_scope: 894 instance = TestInstance(ts, plat, self.env.outdir) 895 instance.run = instance.check_runnable( 896 self.options, 897 self.hwm 898 ) 899 900 if not force_platform and self.check_platform(plat,exclude_platform): 901 instance.add_filter("Platform is excluded on command line.", Filters.CMD_LINE) 902 903 if (plat.arch == "unit") != (ts.type == "unit"): 904 # Discard silently 905 continue 906 907 if ts.modules and self.modules and not set(ts.modules).issubset(set(self.modules)): 908 instance.add_filter( 909 f"one or more required modules not available: {','.join(ts.modules)}", 910 Filters.MODULE 911 ) 912 913 if self.options.level: 914 tl = self.get_level(self.options.level) 915 if tl is None: 916 instance.add_filter( 917 f"Unknown test level '{self.options.level}'", 918 Filters.TESTPLAN 919 ) 920 else: 921 planned_scenarios = tl.scenarios 922 if ( 923 ts.id not in planned_scenarios 924 and not set(ts.levels).intersection(set(tl.levels)) 925 ): 926 instance.add_filter("Not part of requested test plan", Filters.TESTPLAN) 927 928 if runnable and not instance.run: 929 instance.add_filter("Not runnable on device", Filters.CMD_LINE) 930 931 if ( 932 self.options.integration 933 and ts.integration_platforms 934 and plat.name not in ts.integration_platforms 935 ): 936 instance.add_filter("Not part of integration platforms", Filters.TESTSUITE) 937 938 if ts.skip: 939 instance.add_filter("Skip filter", Filters.SKIP) 940 941 if tag_filter and not ts.tags.intersection(tag_filter): 942 instance.add_filter("Command line testsuite tag filter", Filters.CMD_LINE) 943 944 if slow_only and not ts.slow: 945 instance.add_filter("Not a slow test", Filters.CMD_LINE) 946 947 if exclude_tag and ts.tags.intersection(exclude_tag): 948 instance.add_filter("Command line testsuite exclude filter", Filters.CMD_LINE) 949 950 if testsuite_filter: 951 normalized_f = [os.path.basename(_ts) for _ts in testsuite_filter] 952 if ts.id not in normalized_f: 953 instance.add_filter("Testsuite name filter", Filters.CMD_LINE) 954 955 if arch_filter and plat.arch not in arch_filter: 956 instance.add_filter("Command line testsuite arch filter", Filters.CMD_LINE) 957 958 if not force_platform: 959 960 if ts.arch_allow and plat.arch not in ts.arch_allow: 961 instance.add_filter("Not in testsuite arch allow list", Filters.TESTSUITE) 962 963 if ts.arch_exclude and plat.arch in ts.arch_exclude: 964 instance.add_filter("In testsuite arch exclude", Filters.TESTSUITE) 965 966 if ts.vendor_allow and plat.vendor not in ts.vendor_allow: 967 instance.add_filter( 968 "Not in testsuite vendor allow list", 969 Filters.TESTSUITE 970 ) 971 972 if ts.vendor_exclude and plat.vendor in ts.vendor_exclude: 973 instance.add_filter("In testsuite vendor exclude", Filters.TESTSUITE) 974 975 if ts.platform_exclude and plat.name in ts.platform_exclude: 976 instance.add_filter("In testsuite platform exclude", Filters.TESTSUITE) 977 978 if ts.toolchain_exclude and toolchain in ts.toolchain_exclude: 979 instance.add_filter("In testsuite toolchain exclude", Filters.TOOLCHAIN) 980 981 if platform_filter and plat.name not in platform_filter: 982 instance.add_filter("Command line platform filter", Filters.CMD_LINE) 983 984 if ts.platform_allow \ 985 and plat.name not in ts.platform_allow \ 986 and not (platform_filter and force_platform): 987 instance.add_filter("Not in testsuite platform allow list", Filters.TESTSUITE) 988 989 if ts.platform_type and plat.type not in ts.platform_type: 990 instance.add_filter("Not in testsuite platform type list", Filters.TESTSUITE) 991 992 if ts.toolchain_allow and toolchain not in ts.toolchain_allow: 993 instance.add_filter("Not in testsuite toolchain allow list", Filters.TOOLCHAIN) 994 995 if not plat.env_satisfied: 996 instance.add_filter( 997 "Environment ({}) not satisfied".format(", ".join(plat.env)), 998 Filters.ENVIRONMENT 999 ) 1000 1001 if not force_toolchain \ 1002 and toolchain and (toolchain not in plat.supported_toolchains) \ 1003 and "host" not in plat.supported_toolchains \ 1004 and ts.type != 'unit': 1005 instance.add_filter( 1006 f"Not supported by the toolchain: {toolchain}", 1007 Filters.PLATFORM 1008 ) 1009 1010 if plat.ram < ts.min_ram: 1011 instance.add_filter("Not enough RAM", Filters.PLATFORM) 1012 1013 if ts.harness: 1014 sim = plat.simulator_by_name(self.options.sim_name) 1015 if ts.harness == 'robot' and not (sim and sim.name == 'renode'): 1016 instance.add_filter( 1017 "No robot support for the selected platform", 1018 Filters.SKIP 1019 ) 1020 1021 if ts.depends_on: 1022 dep_intersection = ts.depends_on.intersection(set(plat.supported)) 1023 if dep_intersection != set(ts.depends_on): 1024 instance.add_filter("No hardware support", Filters.PLATFORM) 1025 1026 if plat.flash < ts.min_flash: 1027 instance.add_filter("Not enough FLASH", Filters.PLATFORM) 1028 1029 if set(plat.ignore_tags) & ts.tags: 1030 instance.add_filter( 1031 "Excluded tags per platform (exclude_tags)", 1032 Filters.PLATFORM 1033 ) 1034 1035 if plat.only_tags and not set(plat.only_tags) & ts.tags: 1036 instance.add_filter("Excluded tags per platform (only_tags)", Filters.PLATFORM) 1037 1038 if ts.required_snippets: 1039 missing_snippet = False 1040 snippet_args = {"snippets": ts.required_snippets} 1041 found_snippets = snippets.find_snippets_in_roots( 1042 snippet_args, 1043 [*self.env.snippet_roots, Path(ts.source_dir)] 1044 ) 1045 1046 # Search and check that all required snippet files are found 1047 for this_snippet in snippet_args['snippets']: 1048 if this_snippet not in found_snippets: 1049 logger.error( 1050 f"Can't find snippet '{this_snippet}' for test '{ts.name}'" 1051 ) 1052 instance.status = TwisterStatus.ERROR 1053 instance.reason = f"Snippet {this_snippet} not found" 1054 missing_snippet = True 1055 break 1056 1057 if not missing_snippet: 1058 # Look for required snippets and check that they are applicable for these 1059 # platforms/boards 1060 for this_snippet in snippet_args['snippets']: 1061 matched_snippet_board = False 1062 1063 # If the "appends" key is present with at least one entry then this 1064 # snippet applies to all boards and further platform-specific checks 1065 # are not required 1066 if found_snippets[this_snippet].appends: 1067 continue 1068 1069 for this_board in found_snippets[this_snippet].board2appends: 1070 if this_board.startswith('/'): 1071 match = re.search(this_board[1:-1], plat.name) 1072 if match is not None: 1073 matched_snippet_board = True 1074 break 1075 elif this_board == plat.name: 1076 matched_snippet_board = True 1077 break 1078 1079 if matched_snippet_board is False: 1080 instance.add_filter("Snippet not supported", Filters.PLATFORM) 1081 break 1082 1083 # handle quarantined tests 1084 self.handle_quarantined_tests(instance, plat) 1085 1086 # platform_key is a list of unique platform attributes that form a unique key 1087 # a test will match against to determine if it should be scheduled to run. 1088 # A key containing a field name that the platform does not have 1089 # will filter the platform. 1090 # 1091 # A simple example is keying on arch and simulation 1092 # to run a test once per unique (arch, simulation) platform. 1093 if ( 1094 not ignore_platform_key 1095 and hasattr(ts, 'platform_key') 1096 and len(ts.platform_key) > 0 1097 ): 1098 key_fields = sorted(set(ts.platform_key)) 1099 keys = [getattr(plat, key_field, None) for key_field in key_fields] 1100 for key in keys: 1101 if key is None or key == 'na': 1102 instance.add_filter( 1103 "Excluded platform missing key fields" 1104 f" demanded by test {key_fields}", 1105 Filters.PLATFORM 1106 ) 1107 break 1108 else: 1109 test_keys = copy.deepcopy(keys) 1110 test_keys.append(ts.name) 1111 test_keys = tuple(test_keys) 1112 keyed_test = keyed_tests.get(test_keys) 1113 if keyed_test is not None: 1114 plat_key = { 1115 key_field: getattr( 1116 keyed_test['plat'], 1117 key_field 1118 ) for key_field in key_fields 1119 } 1120 instance.add_filter( 1121 f"Already covered for key {key}" 1122 f" by platform {keyed_test['plat'].name} having key {plat_key}", 1123 Filters.PLATFORM_KEY 1124 ) 1125 else: 1126 # do not add a platform to keyed tests if previously 1127 # filtered 1128 1129 if not instance.filters: 1130 keyed_tests[test_keys] = {'plat': plat, 'ts': ts} 1131 1132 # if nothing stopped us until now, it means this configuration 1133 # needs to be added. 1134 instance_list.append(instance) 1135 1136 # no configurations, so jump to next testsuite 1137 if not instance_list: 1138 continue 1139 1140 # if twister was launched with no platform options at all, we 1141 # take all default platforms 1142 if default_platforms and not ts.build_on_all and not integration: 1143 if ts.platform_allow: 1144 _default_p = set(self.default_platforms) 1145 _platform_allow = set(ts.platform_allow) 1146 _intersection = _default_p.intersection(_platform_allow) 1147 if _intersection: 1148 aa = list( 1149 filter( 1150 lambda _scenario: _scenario.platform.name in _intersection, 1151 instance_list 1152 ) 1153 ) 1154 self.add_instances(aa) 1155 else: 1156 self.add_instances(instance_list) 1157 else: 1158 # add integration platforms to the list of default 1159 # platforms, even if we are not in integration mode 1160 _platforms = self.default_platforms + ts.integration_platforms 1161 instances = list( 1162 filter(lambda ts: ts.platform.name in _platforms, instance_list) 1163 ) 1164 self.add_instances(instances) 1165 elif integration: 1166 instances = list( 1167 filter( 1168 lambda item: item.platform.name in ts.integration_platforms, 1169 instance_list 1170 ) 1171 ) 1172 self.add_instances(instances) 1173 1174 elif emulation_platforms: 1175 self.add_instances(instance_list) 1176 for instance in list( 1177 filter( 1178 lambda inst: not inst.platform.simulator_by_name(self.options.sim_name), 1179 instance_list 1180 ) 1181 ): 1182 instance.add_filter("Not an emulated platform", Filters.CMD_LINE) 1183 elif vendor_platforms: 1184 self.add_instances(instance_list) 1185 for instance in list( 1186 filter( 1187 lambda inst: inst.platform.vendor not in vendor_filter, 1188 instance_list 1189 ) 1190 ): 1191 instance.add_filter("Not a selected vendor platform", Filters.CMD_LINE) 1192 else: 1193 self.add_instances(instance_list) 1194 1195 for _, case in self.instances.items(): 1196 # Do not create files for filtered instances 1197 if case.status == TwisterStatus.FILTER: 1198 continue 1199 # set run_id for each unfiltered instance 1200 case.setup_run_id() 1201 case.create_overlay(case.platform, 1202 self.options.enable_asan, 1203 self.options.enable_ubsan, 1204 self.options.enable_coverage, 1205 self.options.coverage_platform) 1206 1207 self.selected_platforms = set(p.platform.name for p in self.instances.values()) 1208 1209 filtered_instances = list( 1210 filter(lambda item: item.status == TwisterStatus.FILTER, self.instances.values()) 1211 ) 1212 for filtered_instance in filtered_instances: 1213 change_skip_to_error_if_integration(self.options, filtered_instance) 1214 1215 filtered_instance.add_missing_case_status(filtered_instance.status) 1216 1217 def add_instances(self, instance_list): 1218 for instance in instance_list: 1219 self.instances[instance.name] = instance 1220 1221 1222 def get_testsuite(self, identifier): 1223 results = [] 1224 for _, ts in self.testsuites.items(): 1225 if ts.id == identifier: 1226 results.append(ts) 1227 return results 1228 1229 def get_testcase(self, identifier): 1230 results = [] 1231 for _, ts in self.testsuites.items(): 1232 for case in ts.testcases: 1233 if case.name == identifier: 1234 results.append(ts) 1235 return results 1236 1237 def verify_platforms_existence(self, platform_names_to_verify, log_info=""): 1238 """ 1239 Verify if platform name (passed by --platform option, or in yaml file 1240 as platform_allow or integration_platforms options) is correct. If not - 1241 log and raise error. 1242 """ 1243 _platforms = [] 1244 for platform in platform_names_to_verify: 1245 if platform in self.platform_names: 1246 p = self.get_platform(platform) 1247 if p: 1248 _platforms.append(p.name) 1249 else: 1250 logger.error(f"{log_info} - unrecognized platform - {platform}") 1251 sys.exit(2) 1252 return _platforms 1253 1254 def create_build_dir_links(self): 1255 """ 1256 Iterate through all no-skipped instances in suite and create links 1257 for each one build directories. Those links will be passed in the next 1258 steps to the CMake command. 1259 """ 1260 1261 links_dir_name = "twister_links" # folder for all links 1262 links_dir_path = os.path.join(self.env.outdir, links_dir_name) 1263 if not os.path.exists(links_dir_path): 1264 os.mkdir(links_dir_path) 1265 1266 for instance in self.instances.values(): 1267 if instance.status != TwisterStatus.SKIP: 1268 self._create_build_dir_link(links_dir_path, instance) 1269 1270 def _create_build_dir_link(self, links_dir_path, instance): 1271 """ 1272 Create build directory with original "long" path. Next take shorter 1273 path and link them with original path - create link. At the end 1274 replace build_dir to created link. This link will be passed to CMake 1275 command. This action helps to limit path length which can be 1276 significant during building by CMake on Windows OS. 1277 """ 1278 1279 os.makedirs(instance.build_dir, exist_ok=True) 1280 1281 link_name = f"test_{self.link_dir_counter}" 1282 link_path = os.path.join(links_dir_path, link_name) 1283 1284 if os.name == "nt": # if OS is Windows 1285 command = ["mklink", "/J", f"{link_path}", os.path.normpath(instance.build_dir)] 1286 subprocess.call(command, shell=True) 1287 else: # for Linux and MAC OS 1288 os.symlink(instance.build_dir, link_path) 1289 1290 # Here original build directory is replaced with symbolic link. It will 1291 # be passed to CMake command 1292 instance.build_dir = link_path 1293 1294 self.link_dir_counter += 1 1295 1296 1297def change_skip_to_error_if_integration(options, instance): 1298 ''' All skips on integration_platforms are treated as errors.''' 1299 if instance.platform.name in instance.testsuite.integration_platforms: 1300 # Do not treat this as error if filter type is among ignore_filters 1301 filters = {t['type'] for t in instance.filters} 1302 ignore_filters ={Filters.CMD_LINE, Filters.SKIP, Filters.PLATFORM_KEY, 1303 Filters.TOOLCHAIN, Filters.MODULE, Filters.TESTPLAN, 1304 Filters.QUARANTINE, Filters.ENVIRONMENT} 1305 if filters.intersection(ignore_filters): 1306 return 1307 instance.status = TwisterStatus.ERROR 1308 instance.reason += " but is one of the integration platforms" 1309 logger.debug( 1310 f"Changing status of {instance.name} to ERROR because it is an integration platform" 1311 ) 1312