1# vim: set syntax=python ts=4 : 2# 3# Copyright (c) 2018-2025 Intel Corporation 4# Copyright 2022 NXP 5# SPDX-License-Identifier: Apache-2.0 6 7import logging 8import multiprocessing 9import os 10import pathlib 11import pickle 12import queue 13import re 14import shutil 15import subprocess 16import sys 17import time 18import traceback 19from math import log10 20from multiprocessing import Lock, Process, Value 21from multiprocessing.managers import BaseManager 22 23import elftools 24import yaml 25from colorama import Fore 26from elftools.elf.elffile import ELFFile 27from elftools.elf.sections import SymbolTableSection 28from packaging import version 29from twisterlib.cmakecache import CMakeCache 30from twisterlib.environment import canonical_zephyr_base 31from twisterlib.error import BuildError, ConfigurationError, StatusAttributeError 32from twisterlib.log_helper import setup_logging 33from twisterlib.statuses import TwisterStatus 34 35if version.parse(elftools.__version__) < version.parse('0.24'): 36 sys.exit("pyelftools is out of date, need version 0.24 or later") 37 38# Job server only works on Linux for now. 39if sys.platform == 'linux': 40 from twisterlib.jobserver import GNUMakeJobClient, GNUMakeJobServer, JobClient 41 42from twisterlib.environment import ZEPHYR_BASE 43 44sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/build_helpers")) 45from domains import Domains 46from twisterlib.coverage import run_coverage_instance 47from twisterlib.environment import TwisterEnv 48from twisterlib.harness import Ctest, HarnessImporter, Pytest 49from twisterlib.log_helper import log_command 50from twisterlib.platform import Platform 51from twisterlib.testinstance import TestInstance 52from twisterlib.testplan import change_skip_to_error_if_integration 53from twisterlib.testsuite import TestSuite 54 55try: 56 from yaml import CSafeLoader as SafeLoader 57except ImportError: 58 from yaml import SafeLoader 59 60import expr_parser 61from anytree import Node, RenderTree 62 63logger = logging.getLogger('twister') 64logger.setLevel(logging.DEBUG) 65 66 67class ExecutionCounter: 68 def __init__(self, total=0): 69 ''' 70 Most of the stats are at test instance level 71 Except that case statistics are for cases of ALL test instances 72 73 total = yaml test scenarios * applicable platforms 74 done := instances that reached report_out stage of the pipeline 75 done = filtered_configs + passed + failed + error 76 completed = done - filtered_static 77 filtered_configs = filtered_runtime + filtered_static 78 79 pass rate = passed / (total - filtered_configs) 80 case pass rate = passed_cases / (cases - filtered_cases - skipped_cases) 81 ''' 82 # instances that go through the pipeline 83 # updated by report_out() 84 self._done = Value('i', 0) 85 86 # iteration 87 self._iteration = Value('i', 0) 88 89 # instances that actually executed and passed 90 # updated by report_out() 91 self._passed = Value('i', 0) 92 93 # instances that are built but not runnable 94 # updated by report_out() 95 self._notrun = Value('i', 0) 96 97 # static filter + runtime filter + build skipped 98 # updated by update_counting_before_pipeline() and report_out() 99 self._filtered_configs = Value('i', 0) 100 101 # cmake filter + build skipped 102 # updated by report_out() 103 self._filtered_runtime = Value('i', 0) 104 105 # static filtered at yaml parsing time 106 # updated by update_counting_before_pipeline() 107 self._filtered_static = Value('i', 0) 108 109 # updated by report_out() in pipeline 110 self._error = Value('i', 0) 111 self._failed = Value('i', 0) 112 self._skipped = Value('i', 0) 113 114 # initialized to number of test instances 115 self._total = Value('i', total) 116 117 ####################################### 118 # TestCase counters for all instances # 119 ####################################### 120 # updated in report_out 121 self._cases = Value('i', 0) 122 123 # updated by update_counting_before_pipeline() and report_out() 124 self._skipped_cases = Value('i', 0) 125 self._filtered_cases = Value('i', 0) 126 127 # updated by report_out() in pipeline 128 self._passed_cases = Value('i', 0) 129 self._notrun_cases = Value('i', 0) 130 self._failed_cases = Value('i', 0) 131 self._error_cases = Value('i', 0) 132 self._blocked_cases = Value('i', 0) 133 134 # Incorrect statuses 135 self._none_cases = Value('i', 0) 136 self._started_cases = Value('i', 0) 137 138 self._warnings = Value('i', 0) 139 140 self.lock = Lock() 141 142 @staticmethod 143 def _find_number_length(n): 144 if n > 0: 145 length = int(log10(n))+1 146 elif n == 0: 147 length = 1 148 else: 149 length = int(log10(-n))+2 150 return length 151 152 def summary(self): 153 selected_cases = self.cases - self.filtered_cases 154 selected_configs = self.done - self.filtered_static - self.filtered_runtime 155 156 157 root = Node("Summary") 158 159 Node(f"Total test suites: {self.total}", parent=root) 160 processed_suites = Node(f"Processed test suites: {self.done}", parent=root) 161 filtered_suites = Node( 162 f"Filtered test suites: {self.filtered_configs}", 163 parent=processed_suites 164 ) 165 Node(f"Filtered test suites (static): {self.filtered_static}", parent=filtered_suites) 166 Node(f"Filtered test suites (at runtime): {self.filtered_runtime}", parent=filtered_suites) 167 selected_suites = Node(f"Selected test suites: {selected_configs}", parent=processed_suites) 168 Node(f"Skipped test suites: {self.skipped}", parent=selected_suites) 169 Node(f"Passed test suites: {self.passed}", parent=selected_suites) 170 Node(f"Built only test suites: {self.notrun}", parent=selected_suites) 171 Node(f"Failed test suites: {self.failed}", parent=selected_suites) 172 Node(f"Errors in test suites: {self.error}", parent=selected_suites) 173 174 total_cases = Node(f"Total test cases: {self.cases}", parent=root) 175 Node(f"Filtered test cases: {self.filtered_cases}", parent=total_cases) 176 selected_cases_node = Node(f"Selected test cases: {selected_cases}", parent=total_cases) 177 Node(f"Passed test cases: {self.passed_cases}", parent=selected_cases_node) 178 Node(f"Skipped test cases: {self.skipped_cases}", parent=selected_cases_node) 179 Node(f"Built only test cases: {self.notrun_cases}", parent=selected_cases_node) 180 Node(f"Blocked test cases: {self.blocked_cases}", parent=selected_cases_node) 181 Node(f"Failed test cases: {self.failed_cases}", parent=selected_cases_node) 182 error_cases_node = Node( 183 f"Errors in test cases: {self.error_cases}", 184 parent=selected_cases_node 185 ) 186 187 if self.none_cases or self.started_cases: 188 Node( 189 "The following test case statuses should not appear in a proper execution", 190 parent=error_cases_node 191 ) 192 if self.none_cases: 193 Node(f"Statusless test cases: {self.none_cases}", parent=error_cases_node) 194 if self.started_cases: 195 Node(f"Test cases only started: {self.started_cases}", parent=error_cases_node) 196 197 for pre, _, node in RenderTree(root): 198 print(f"{pre}{node.name}") 199 200 @property 201 def warnings(self): 202 with self._warnings.get_lock(): 203 return self._warnings.value 204 205 @warnings.setter 206 def warnings(self, value): 207 with self._warnings.get_lock(): 208 self._warnings.value = value 209 210 def warnings_increment(self, value=1): 211 with self._warnings.get_lock(): 212 self._warnings.value += value 213 214 @property 215 def cases(self): 216 with self._cases.get_lock(): 217 return self._cases.value 218 219 @cases.setter 220 def cases(self, value): 221 with self._cases.get_lock(): 222 self._cases.value = value 223 224 def cases_increment(self, value=1): 225 with self._cases.get_lock(): 226 self._cases.value += value 227 228 @property 229 def skipped_cases(self): 230 with self._skipped_cases.get_lock(): 231 return self._skipped_cases.value 232 233 @skipped_cases.setter 234 def skipped_cases(self, value): 235 with self._skipped_cases.get_lock(): 236 self._skipped_cases.value = value 237 238 def skipped_cases_increment(self, value=1): 239 with self._skipped_cases.get_lock(): 240 self._skipped_cases.value += value 241 242 @property 243 def filtered_cases(self): 244 with self._filtered_cases.get_lock(): 245 return self._filtered_cases.value 246 247 @filtered_cases.setter 248 def filtered_cases(self, value): 249 with self._filtered_cases.get_lock(): 250 self._filtered_cases.value = value 251 252 def filtered_cases_increment(self, value=1): 253 with self._filtered_cases.get_lock(): 254 self._filtered_cases.value += value 255 256 @property 257 def passed_cases(self): 258 with self._passed_cases.get_lock(): 259 return self._passed_cases.value 260 261 @passed_cases.setter 262 def passed_cases(self, value): 263 with self._passed_cases.get_lock(): 264 self._passed_cases.value = value 265 266 def passed_cases_increment(self, value=1): 267 with self._passed_cases.get_lock(): 268 self._passed_cases.value += value 269 270 @property 271 def notrun_cases(self): 272 with self._notrun_cases.get_lock(): 273 return self._notrun_cases.value 274 275 @notrun_cases.setter 276 def notrun_cases(self, value): 277 with self._notrun.get_lock(): 278 self._notrun.value = value 279 280 def notrun_cases_increment(self, value=1): 281 with self._notrun_cases.get_lock(): 282 self._notrun_cases.value += value 283 284 @property 285 def failed_cases(self): 286 with self._failed_cases.get_lock(): 287 return self._failed_cases.value 288 289 @failed_cases.setter 290 def failed_cases(self, value): 291 with self._failed_cases.get_lock(): 292 self._failed_cases.value = value 293 294 def failed_cases_increment(self, value=1): 295 with self._failed_cases.get_lock(): 296 self._failed_cases.value += value 297 298 @property 299 def error_cases(self): 300 with self._error_cases.get_lock(): 301 return self._error_cases.value 302 303 @error_cases.setter 304 def error_cases(self, value): 305 with self._error_cases.get_lock(): 306 self._error_cases.value = value 307 308 def error_cases_increment(self, value=1): 309 with self._error_cases.get_lock(): 310 self._error_cases.value += value 311 312 @property 313 def blocked_cases(self): 314 with self._blocked_cases.get_lock(): 315 return self._blocked_cases.value 316 317 @blocked_cases.setter 318 def blocked_cases(self, value): 319 with self._blocked_cases.get_lock(): 320 self._blocked_cases.value = value 321 322 def blocked_cases_increment(self, value=1): 323 with self._blocked_cases.get_lock(): 324 self._blocked_cases.value += value 325 326 @property 327 def none_cases(self): 328 with self._none_cases.get_lock(): 329 return self._none_cases.value 330 331 @none_cases.setter 332 def none_cases(self, value): 333 with self._none_cases.get_lock(): 334 self._none_cases.value = value 335 336 def none_cases_increment(self, value=1): 337 with self._none_cases.get_lock(): 338 self._none_cases.value += value 339 340 @property 341 def started_cases(self): 342 with self._started_cases.get_lock(): 343 return self._started_cases.value 344 345 @started_cases.setter 346 def started_cases(self, value): 347 with self._started_cases.get_lock(): 348 self._started_cases.value = value 349 350 def started_cases_increment(self, value=1): 351 with self._started_cases.get_lock(): 352 self._started_cases.value += value 353 354 @property 355 def skipped(self): 356 with self._skipped.get_lock(): 357 return self._skipped.value 358 359 @skipped.setter 360 def skipped(self, value): 361 with self._skipped.get_lock(): 362 self._skipped.value = value 363 364 def skipped_increment(self, value=1): 365 with self._skipped.get_lock(): 366 self._skipped.value += value 367 368 @property 369 def error(self): 370 with self._error.get_lock(): 371 return self._error.value 372 373 @error.setter 374 def error(self, value): 375 with self._error.get_lock(): 376 self._error.value = value 377 378 def error_increment(self, value=1): 379 with self._error.get_lock(): 380 self._error.value += value 381 382 @property 383 def iteration(self): 384 with self._iteration.get_lock(): 385 return self._iteration.value 386 387 @iteration.setter 388 def iteration(self, value): 389 with self._iteration.get_lock(): 390 self._iteration.value = value 391 392 def iteration_increment(self, value=1): 393 with self._iteration.get_lock(): 394 self._iteration.value += value 395 396 @property 397 def done(self): 398 with self._done.get_lock(): 399 return self._done.value 400 401 @done.setter 402 def done(self, value): 403 with self._done.get_lock(): 404 self._done.value = value 405 406 def done_increment(self, value=1): 407 with self._done.get_lock(): 408 self._done.value += value 409 410 @property 411 def passed(self): 412 with self._passed.get_lock(): 413 return self._passed.value 414 415 @passed.setter 416 def passed(self, value): 417 with self._passed.get_lock(): 418 self._passed.value = value 419 420 def passed_increment(self, value=1): 421 with self._passed.get_lock(): 422 self._passed.value += value 423 424 @property 425 def notrun(self): 426 with self._notrun.get_lock(): 427 return self._notrun.value 428 429 @notrun.setter 430 def notrun(self, value): 431 with self._notrun.get_lock(): 432 self._notrun.value = value 433 434 def notrun_increment(self, value=1): 435 with self._notrun.get_lock(): 436 self._notrun.value += value 437 438 @property 439 def filtered_configs(self): 440 with self._filtered_configs.get_lock(): 441 return self._filtered_configs.value 442 443 @filtered_configs.setter 444 def filtered_configs(self, value): 445 with self._filtered_configs.get_lock(): 446 self._filtered_configs.value = value 447 448 def filtered_configs_increment(self, value=1): 449 with self._filtered_configs.get_lock(): 450 self._filtered_configs.value += value 451 452 @property 453 def filtered_static(self): 454 with self._filtered_static.get_lock(): 455 return self._filtered_static.value 456 457 @filtered_static.setter 458 def filtered_static(self, value): 459 with self._filtered_static.get_lock(): 460 self._filtered_static.value = value 461 462 def filtered_static_increment(self, value=1): 463 with self._filtered_static.get_lock(): 464 self._filtered_static.value += value 465 466 @property 467 def filtered_runtime(self): 468 with self._filtered_runtime.get_lock(): 469 return self._filtered_runtime.value 470 471 @filtered_runtime.setter 472 def filtered_runtime(self, value): 473 with self._filtered_runtime.get_lock(): 474 self._filtered_runtime.value = value 475 476 def filtered_runtime_increment(self, value=1): 477 with self._filtered_runtime.get_lock(): 478 self._filtered_runtime.value += value 479 480 @property 481 def failed(self): 482 with self._failed.get_lock(): 483 return self._failed.value 484 485 @failed.setter 486 def failed(self, value): 487 with self._failed.get_lock(): 488 self._failed.value = value 489 490 def failed_increment(self, value=1): 491 with self._failed.get_lock(): 492 self._failed.value += value 493 494 @property 495 def total(self): 496 with self._total.get_lock(): 497 return self._total.value 498 499 @total.setter 500 def total(self, value): 501 with self._total.get_lock(): 502 self._total.value = value 503 504 def total_increment(self, value=1): 505 with self._total.get_lock(): 506 self._total.value += value 507 508class CMake: 509 config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') 510 dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') 511 512 def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver): 513 514 self.cwd = None 515 self.capture_output = True 516 517 self.defconfig = {} 518 self.cmake_cache = {} 519 520 self.instance = None 521 self.testsuite = testsuite 522 self.platform = platform 523 self.source_dir = source_dir 524 self.build_dir = build_dir 525 self.log = "build.log" 526 527 self.default_encoding = sys.getdefaultencoding() 528 self.jobserver = jobserver 529 530 def parse_generated(self, filter_stages=None): 531 self.defconfig = {} 532 return {} 533 534 def run_build(self, args=None): 535 if args is None: 536 args = [] 537 538 logger.debug(f"Building {self.source_dir} for {self.platform.name}") 539 540 cmake_args = [] 541 cmake_args.extend(args) 542 cmake = shutil.which('cmake') 543 cmd = [cmake] + cmake_args 544 kwargs = dict() 545 546 if self.capture_output: 547 kwargs['stdout'] = subprocess.PIPE 548 # CMake sends the output of message() to stderr unless it's STATUS 549 kwargs['stderr'] = subprocess.STDOUT 550 551 if self.cwd: 552 kwargs['cwd'] = self.cwd 553 554 start_time = time.time() 555 if sys.platform == 'linux': 556 p = self.jobserver.popen(cmd, **kwargs) 557 else: 558 p = subprocess.Popen(cmd, **kwargs) 559 logger.debug(f'Running {" ".join(cmd)}') 560 561 out, _ = p.communicate() 562 563 ret = {} 564 duration = time.time() - start_time 565 self.instance.build_time += duration 566 if p.returncode == 0: 567 msg = ( 568 f"Finished building {self.source_dir} for {self.platform.name}" 569 f" in {duration:.2f} seconds" 570 ) 571 logger.debug(msg) 572 573 if not self.instance.run: 574 self.instance.status = TwisterStatus.NOTRUN 575 self.instance.add_missing_case_status(TwisterStatus.NOTRUN, "Test was built only") 576 else: 577 self.instance.status = TwisterStatus.PASS 578 ret = {"returncode": p.returncode} 579 580 if out: 581 log_msg = out.decode(self.default_encoding) 582 with open( 583 os.path.join(self.build_dir, self.log), 584 "a", 585 encoding=self.default_encoding 586 ) as log: 587 log.write(log_msg) 588 else: 589 return None 590 else: 591 # A real error occurred, raise an exception 592 log_msg = "" 593 if out: 594 log_msg = out.decode(self.default_encoding) 595 with open( 596 os.path.join(self.build_dir, self.log), 597 "a", 598 encoding=self.default_encoding 599 ) as log: 600 log.write(log_msg) 601 602 if log_msg: 603 pattern = ( 604 r"region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM|" 605 r"dram\d_\d_seg|iram\d_\d_seg)' " 606 "overflowed by" 607 ) 608 overflow_found = re.findall(pattern, log_msg) 609 610 imgtool_overflow_found = re.findall( 611 r"Error: Image size \(.*\) \+ trailer \(.*\) exceeds requested size", 612 log_msg 613 ) 614 if overflow_found and not self.options.overflow_as_errors: 615 logger.debug(f"Test skipped due to {overflow_found[0]} Overflow") 616 self.instance.status = TwisterStatus.SKIP 617 self.instance.reason = f"{overflow_found[0]} overflow" 618 change_skip_to_error_if_integration(self.options, self.instance) 619 elif imgtool_overflow_found and not self.options.overflow_as_errors: 620 self.instance.status = TwisterStatus.SKIP 621 self.instance.reason = "imgtool overflow" 622 change_skip_to_error_if_integration(self.options, self.instance) 623 else: 624 self.instance.status = TwisterStatus.ERROR 625 self.instance.reason = "Build failure" 626 627 ret = { 628 "returncode": p.returncode 629 } 630 631 return ret 632 633 def run_cmake(self, args="", filter_stages=None): 634 if filter_stages is None: 635 filter_stages = [] 636 637 if not self.options.disable_warnings_as_errors: 638 warnings_as_errors = 'y' 639 gen_edt_args = "--edtlib-Werror" 640 else: 641 warnings_as_errors = 'n' 642 gen_edt_args = "" 643 644 warning_command = 'CONFIG_COMPILER_WARNINGS_AS_ERRORS' 645 if self.instance.sysbuild: 646 warning_command = 'SB_' + warning_command 647 648 logger.debug(f"Running cmake on {self.source_dir} for {self.platform.name}") 649 cmake_args = [ 650 f'-B{self.build_dir}', 651 f'-DTC_RUNID={self.instance.run_id}', 652 f'-DTC_NAME={self.instance.testsuite.name}', 653 f'-D{warning_command}={warnings_as_errors}', 654 f'-DEXTRA_GEN_EDT_ARGS={gen_edt_args}', 655 f'-G{self.env.generator}', 656 f'-DPython3_EXECUTABLE={pathlib.Path(sys.executable).as_posix()}' 657 ] 658 659 if self.instance.testsuite.harness == 'bsim': 660 cmake_args.extend([ 661 '-DCMAKE_EXPORT_COMPILE_COMMANDS=ON', 662 '-DCONFIG_ASSERT=y', 663 '-DCONFIG_COVERAGE=y' 664 ]) 665 666 if self.instance.toolchain: 667 cmake_args.append(f'-DZEPHYR_TOOLCHAIN_VARIANT={self.instance.toolchain}') 668 669 # If needed, run CMake using the package_helper script first, to only run 670 # a subset of all cmake modules. This output will be used to filter 671 # testcases, and the full CMake configuration will be run for 672 # testcases that should be built 673 if filter_stages: 674 cmake_filter_args = [ 675 f'-DMODULES={",".join(filter_stages)}', 676 f'-P{canonical_zephyr_base}/cmake/package_helper.cmake', 677 ] 678 679 if self.instance.sysbuild and not filter_stages: 680 logger.debug(f"Building {self.source_dir} using sysbuild") 681 source_args = [ 682 f'-S{canonical_zephyr_base}/share/sysbuild', 683 f'-DAPP_DIR={self.source_dir}' 684 ] 685 else: 686 source_args = [ 687 f'-S{self.source_dir}' 688 ] 689 cmake_args.extend(source_args) 690 691 cmake_args.extend(args) 692 693 cmake_opts = [f'-DBOARD={self.platform.name}'] 694 cmake_args.extend(cmake_opts) 695 696 if self.instance.testsuite.required_snippets: 697 cmake_opts = [ 698 '-DSNIPPET={}'.format(';'.join(self.instance.testsuite.required_snippets)) 699 ] 700 cmake_args.extend(cmake_opts) 701 702 cmake = shutil.which('cmake') 703 cmd = [cmake] + cmake_args 704 705 if filter_stages: 706 cmd += cmake_filter_args 707 708 kwargs = dict() 709 710 log_command(logger, "Calling cmake", cmd) 711 712 if self.capture_output: 713 kwargs['stdout'] = subprocess.PIPE 714 # CMake sends the output of message() to stderr unless it's STATUS 715 kwargs['stderr'] = subprocess.STDOUT 716 717 if self.cwd: 718 kwargs['cwd'] = self.cwd 719 720 start_time = time.time() 721 if sys.platform == 'linux': 722 p = self.jobserver.popen(cmd, **kwargs) 723 else: 724 p = subprocess.Popen(cmd, **kwargs) 725 out, _ = p.communicate() 726 727 duration = time.time() - start_time 728 self.instance.build_time += duration 729 730 if p.returncode == 0: 731 filter_results = self.parse_generated(filter_stages) 732 msg = ( 733 f"Finished running cmake {self.source_dir} for {self.platform.name}" 734 f" in {duration:.2f} seconds" 735 ) 736 logger.debug(msg) 737 ret = { 738 'returncode': p.returncode, 739 'filter': filter_results 740 } 741 else: 742 self.instance.status = TwisterStatus.ERROR 743 self.instance.reason = "CMake build failure" 744 745 for tc in self.instance.testcases: 746 tc.status = self.instance.status 747 748 logger.error(f"CMake build failure: {self.source_dir} for {self.platform.name}") 749 ret = {"returncode": p.returncode} 750 751 if out: 752 os.makedirs(self.build_dir, exist_ok=True) 753 with open( 754 os.path.join(self.build_dir, self.log), 755 "a", 756 encoding=self.default_encoding 757 ) as log: 758 log_msg = out.decode(self.default_encoding) 759 log.write(log_msg) 760 761 return ret 762 763 764class FilterBuilder(CMake): 765 766 def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver): 767 super().__init__(testsuite, platform, source_dir, build_dir, jobserver) 768 769 self.log = "config-twister.log" 770 771 def parse_generated(self, filter_stages=None): 772 if filter_stages is None: 773 filter_stages = [] 774 775 if self.platform.name == "unit_testing": 776 return {} 777 778 if self.instance.sysbuild and not filter_stages: 779 # Load domain yaml to get default domain build directory 780 domain_path = os.path.join(self.build_dir, "domains.yaml") 781 domains = Domains.from_file(domain_path) 782 logger.debug(f"Loaded sysbuild domain data from {domain_path}") 783 self.instance.domains = domains 784 domain_build = domains.get_default_domain().build_dir 785 cmake_cache_path = os.path.join(domain_build, "CMakeCache.txt") 786 defconfig_path = os.path.join(domain_build, "zephyr", ".config") 787 edt_pickle = os.path.join(domain_build, "zephyr", "edt.pickle") 788 else: 789 cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt") 790 # .config is only available after kconfig stage in cmake. 791 # If only dt based filtration is required package helper call won't produce .config 792 if not filter_stages or "kconfig" in filter_stages: 793 defconfig_path = os.path.join(self.build_dir, "zephyr", ".config") 794 # dt is compiled before kconfig, 795 # so edt_pickle is available regardless of choice of filter stages 796 edt_pickle = os.path.join(self.build_dir, "zephyr", "edt.pickle") 797 798 799 if not filter_stages or "kconfig" in filter_stages: 800 with open(defconfig_path) as fp: 801 defconfig = {} 802 for line in fp.readlines(): 803 m = self.config_re.match(line) 804 if not m: 805 if line.strip() and not line.startswith("#"): 806 sys.stderr.write(f"Unrecognized line {line}\n") 807 continue 808 defconfig[m.group(1)] = m.group(2).strip() 809 810 self.defconfig = defconfig 811 812 cmake_conf = {} 813 try: 814 cache = CMakeCache.from_file(cmake_cache_path) 815 except FileNotFoundError: 816 cache = {} 817 818 for k in iter(cache): 819 cmake_conf[k.name] = k.value 820 821 self.cmake_cache = cmake_conf 822 823 filter_data = { 824 "ARCH": self.platform.arch, 825 "PLATFORM": self.platform.name 826 } 827 filter_data.update(os.environ) 828 if not filter_stages or "kconfig" in filter_stages: 829 filter_data.update(self.defconfig) 830 filter_data.update(self.cmake_cache) 831 832 # Verify that twister's arguments support sysbuild. 833 # Twister sysbuild flashing currently only works with west, 834 # so --west-flash must be passed. 835 if ( 836 self.instance.sysbuild 837 and self.env.options.device_testing 838 and self.env.options.west_flash is None 839 ): 840 logger.warning("Sysbuild test will be skipped. West must be used for flashing.") 841 return { 842 os.path.join( 843 self.platform.name, 844 self.instance.toolchain, 845 self.testsuite.name 846 ): True 847 } 848 849 if self.testsuite and self.testsuite.filter: 850 try: 851 if os.path.exists(edt_pickle): 852 with open(edt_pickle, 'rb') as f: 853 edt = pickle.load(f) 854 else: 855 edt = None 856 ret = expr_parser.parse(self.testsuite.filter, filter_data, edt) 857 858 except (ValueError, SyntaxError) as se: 859 sys.stderr.write(f"Failed processing {self.testsuite.yamlfile}\n") 860 raise se 861 862 if not ret: 863 return { 864 os.path.join( 865 self.platform.name, 866 self.instance.toolchain, 867 self.testsuite.name 868 ): True 869 } 870 else: 871 return { 872 os.path.join( 873 self.platform.name, 874 self.instance.toolchain, 875 self.testsuite.name 876 ): False 877 } 878 else: 879 self.platform.filter_data = filter_data 880 return filter_data 881 882 883class ProjectBuilder(FilterBuilder): 884 885 def __init__(self, instance: TestInstance, env: TwisterEnv, jobserver, **kwargs): 886 super().__init__( 887 instance.testsuite, 888 instance.platform, 889 instance.testsuite.source_dir, 890 instance.build_dir, 891 jobserver 892 ) 893 894 self.log = "build.log" 895 self.instance = instance 896 self.filtered_tests = 0 897 self.options = env.options 898 self.env = env 899 self.duts = None 900 901 @property 902 def trace(self) -> bool: 903 return self.options.verbose > 2 904 905 def log_info(self, filename, inline_logs, log_testcases=False): 906 filename = os.path.abspath(os.path.realpath(filename)) 907 if inline_logs: 908 logger.info(f"{filename:-^100}") 909 910 try: 911 with open(filename) as fp: 912 data = fp.read() 913 except Exception as e: 914 data = f"Unable to read log data ({e!s})\n" 915 916 # Remove any coverage data from the dumped logs 917 data = re.sub( 918 r"GCOV_COVERAGE_DUMP_START.*GCOV_COVERAGE_DUMP_END", 919 "GCOV_COVERAGE_DUMP_START\n...\nGCOV_COVERAGE_DUMP_END", 920 data, 921 flags=re.DOTALL, 922 ) 923 logger.error(data) 924 925 logger.info(f"{filename:-^100}") 926 927 if log_testcases: 928 for tc in self.instance.testcases: 929 if not tc.reason: 930 continue 931 logger.info( 932 f"\n{str(tc.name).center(100, '_')}\n" 933 f"{tc.reason}\n" 934 f"{100*'_'}\n" 935 f"{tc.output}" 936 ) 937 else: 938 logger.error("see: " + Fore.YELLOW + filename + Fore.RESET) 939 940 def log_info_file(self, inline_logs): 941 build_dir = self.instance.build_dir 942 h_log = f"{build_dir}/handler.log" 943 he_log = f"{build_dir}/handler_stderr.log" 944 b_log = f"{build_dir}/build.log" 945 v_log = f"{build_dir}/valgrind.log" 946 d_log = f"{build_dir}/device.log" 947 pytest_log = f"{build_dir}/twister_harness.log" 948 949 if os.path.exists(v_log) and "Valgrind" in self.instance.reason: 950 self.log_info(f"{v_log}", inline_logs) 951 elif os.path.exists(pytest_log) and os.path.getsize(pytest_log) > 0: 952 self.log_info(f"{pytest_log}", inline_logs, log_testcases=True) 953 elif os.path.exists(h_log) and os.path.getsize(h_log) > 0: 954 self.log_info(f"{h_log}", inline_logs) 955 elif os.path.exists(he_log) and os.path.getsize(he_log) > 0: 956 self.log_info(f"{he_log}", inline_logs) 957 elif os.path.exists(d_log) and os.path.getsize(d_log) > 0: 958 self.log_info(f"{d_log}", inline_logs) 959 else: 960 self.log_info(f"{b_log}", inline_logs) 961 962 963 def _add_to_pipeline(self, pipeline, op: str, additionals: dict=None): 964 if additionals is None: 965 additionals = {} 966 try: 967 if op: 968 task = dict({'op': op, 'test': self.instance}, **additionals) 969 pipeline.put(task) 970 # Only possible RuntimeError source here is a mutation of the pipeline during iteration. 971 # If that happens, we ought to consider the whole pipeline corrupted. 972 except RuntimeError as e: 973 logger.error(f"RuntimeError: {e}") 974 traceback.print_exc() 975 976 977 def process(self, pipeline, done, message, lock, results): 978 next_op = None 979 additionals = {} 980 981 op = message.get('op') 982 options = self.options 983 if not logger.handlers: 984 setup_logging(options.outdir, options.log_file, options.log_level, options.timestamps) 985 self.instance.setup_handler(self.env) 986 987 if op == "filter": 988 try: 989 ret = self.cmake(filter_stages=self.instance.filter_stages) 990 if self.instance.status in [TwisterStatus.FAIL, TwisterStatus.ERROR]: 991 next_op = 'report' 992 else: 993 # Here we check the dt/kconfig filter results coming from running cmake 994 if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]: 995 logger.debug(f"filtering {self.instance.name}") 996 self.instance.status = TwisterStatus.FILTER 997 self.instance.reason = "runtime filter" 998 results.filtered_runtime_increment() 999 self.instance.add_missing_case_status(TwisterStatus.FILTER) 1000 next_op = 'report' 1001 else: 1002 next_op = 'cmake' 1003 except StatusAttributeError as sae: 1004 logger.error(str(sae)) 1005 self.instance.status = TwisterStatus.ERROR 1006 reason = 'Incorrect status assignment' 1007 self.instance.reason = reason 1008 self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason) 1009 next_op = 'report' 1010 finally: 1011 self._add_to_pipeline(pipeline, next_op) 1012 1013 # The build process, call cmake and build with configured generator 1014 elif op == "cmake": 1015 try: 1016 ret = self.cmake() 1017 if self.instance.status in [TwisterStatus.FAIL, TwisterStatus.ERROR]: 1018 next_op = 'report' 1019 elif self.options.cmake_only: 1020 if self.instance.status == TwisterStatus.NONE: 1021 logger.debug(f"CMake only: PASS {self.instance.name}") 1022 self.instance.status = TwisterStatus.NOTRUN 1023 self.instance.add_missing_case_status(TwisterStatus.NOTRUN, 'CMake only') 1024 next_op = 'report' 1025 else: 1026 # Here we check the runtime filter results coming from running cmake 1027 if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]: 1028 logger.debug(f"filtering {self.instance.name}") 1029 self.instance.status = TwisterStatus.FILTER 1030 self.instance.reason = "runtime filter" 1031 results.filtered_runtime_increment() 1032 self.instance.add_missing_case_status(TwisterStatus.FILTER) 1033 next_op = 'report' 1034 else: 1035 next_op = 'build' 1036 except StatusAttributeError as sae: 1037 logger.error(str(sae)) 1038 self.instance.status = TwisterStatus.ERROR 1039 reason = 'Incorrect status assignment' 1040 self.instance.reason = reason 1041 self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason) 1042 next_op = 'report' 1043 finally: 1044 self._add_to_pipeline(pipeline, next_op) 1045 1046 elif op == "build": 1047 try: 1048 logger.debug(f"build test: {self.instance.name}") 1049 ret = self.build() 1050 if not ret: 1051 self.instance.status = TwisterStatus.ERROR 1052 self.instance.reason = "Build Failure" 1053 next_op = 'report' 1054 else: 1055 # Count skipped cases during build, for example 1056 # due to ram/rom overflow. 1057 if self.instance.status == TwisterStatus.SKIP: 1058 results.skipped_increment() 1059 self.instance.add_missing_case_status( 1060 TwisterStatus.SKIP, 1061 self.instance.reason 1062 ) 1063 1064 if ret.get('returncode', 1) > 0: 1065 self.instance.add_missing_case_status( 1066 TwisterStatus.BLOCK, 1067 self.instance.reason 1068 ) 1069 next_op = 'report' 1070 else: 1071 if self.instance.testsuite.harness in ['ztest', 'test']: 1072 logger.debug( 1073 f"Determine test cases for test instance: {self.instance.name}" 1074 ) 1075 try: 1076 self.determine_testcases(results) 1077 next_op = 'gather_metrics' 1078 except BuildError as e: 1079 logger.error(str(e)) 1080 self.instance.status = TwisterStatus.ERROR 1081 self.instance.reason = str(e) 1082 next_op = 'report' 1083 else: 1084 next_op = 'gather_metrics' 1085 except StatusAttributeError as sae: 1086 logger.error(str(sae)) 1087 self.instance.status = TwisterStatus.ERROR 1088 reason = 'Incorrect status assignment' 1089 self.instance.reason = reason 1090 self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason) 1091 next_op = 'report' 1092 finally: 1093 self._add_to_pipeline(pipeline, next_op) 1094 1095 elif op == "gather_metrics": 1096 try: 1097 ret = self.gather_metrics(self.instance) 1098 if not ret or ret.get('returncode', 1) > 0: 1099 self.instance.status = TwisterStatus.ERROR 1100 self.instance.reason = "Build Failure at gather_metrics." 1101 next_op = 'report' 1102 elif self.instance.run and self.instance.handler.ready: 1103 next_op = 'run' 1104 else: 1105 if self.instance.status == TwisterStatus.NOTRUN: 1106 run_conditions = ( 1107 f"(run:{self.instance.run}," 1108 f" handler.ready:{self.instance.handler.ready})" 1109 ) 1110 logger.debug(f"Instance {self.instance.name} can't run {run_conditions}") 1111 self.instance.add_missing_case_status( 1112 TwisterStatus.NOTRUN, 1113 "Nowhere to run" 1114 ) 1115 next_op = 'report' 1116 except StatusAttributeError as sae: 1117 logger.error(str(sae)) 1118 self.instance.status = TwisterStatus.ERROR 1119 reason = 'Incorrect status assignment' 1120 self.instance.reason = reason 1121 self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason) 1122 next_op = 'report' 1123 finally: 1124 self._add_to_pipeline(pipeline, next_op) 1125 1126 # Run the generated binary using one of the supported handlers 1127 elif op == "run": 1128 try: 1129 logger.debug(f"run test: {self.instance.name}") 1130 self.run() 1131 logger.debug(f"run status: {self.instance.name} {self.instance.status}") 1132 1133 # to make it work with pickle 1134 self.instance.handler.thread = None 1135 self.instance.handler.duts = None 1136 1137 next_op = "coverage" if self.options.coverage else "report" 1138 additionals = { 1139 "status": self.instance.status, 1140 "reason": self.instance.reason 1141 } 1142 except StatusAttributeError as sae: 1143 logger.error(str(sae)) 1144 self.instance.status = TwisterStatus.ERROR 1145 reason = 'Incorrect status assignment' 1146 self.instance.reason = reason 1147 self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason) 1148 next_op = 'report' 1149 additionals = {} 1150 finally: 1151 self._add_to_pipeline(pipeline, next_op, additionals) 1152 1153 # Run per-instance code coverage 1154 elif op == "coverage": 1155 try: 1156 logger.debug(f"Run coverage for '{self.instance.name}'") 1157 self.instance.coverage_status, self.instance.coverage = \ 1158 run_coverage_instance(self.options, self.instance) 1159 next_op = 'report' 1160 additionals = { 1161 "status": self.instance.status, 1162 "reason": self.instance.reason 1163 } 1164 except StatusAttributeError as sae: 1165 logger.error(str(sae)) 1166 self.instance.status = TwisterStatus.ERROR 1167 reason = f"Incorrect status assignment on {op}" 1168 self.instance.reason = reason 1169 self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason) 1170 next_op = 'report' 1171 additionals = {} 1172 finally: 1173 self._add_to_pipeline(pipeline, next_op, additionals) 1174 1175 # Report results and output progress to screen 1176 elif op == "report": 1177 try: 1178 with lock: 1179 done.put(self.instance) 1180 self.report_out(results) 1181 1182 if not self.options.coverage: 1183 if self.options.prep_artifacts_for_testing: 1184 next_op = 'cleanup' 1185 additionals = {"mode": "device"} 1186 elif self.options.runtime_artifact_cleanup == "pass" and \ 1187 self.instance.status in [TwisterStatus.PASS, TwisterStatus.NOTRUN]: 1188 next_op = 'cleanup' 1189 additionals = {"mode": "passed"} 1190 elif self.options.runtime_artifact_cleanup == "all": 1191 next_op = 'cleanup' 1192 additionals = {"mode": "all"} 1193 except StatusAttributeError as sae: 1194 logger.error(str(sae)) 1195 self.instance.status = TwisterStatus.ERROR 1196 reason = 'Incorrect status assignment' 1197 self.instance.reason = reason 1198 self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason) 1199 next_op = None 1200 additionals = {} 1201 finally: 1202 self._add_to_pipeline(pipeline, next_op, additionals) 1203 1204 elif op == "cleanup": 1205 try: 1206 mode = message.get("mode") 1207 if mode == "device": 1208 self.cleanup_device_testing_artifacts() 1209 elif ( 1210 mode == "passed" 1211 or (mode == "all" and self.instance.reason != "CMake build failure") 1212 ): 1213 self.cleanup_artifacts(self.options.keep_artifacts) 1214 except StatusAttributeError as sae: 1215 logger.error(str(sae)) 1216 self.instance.status = TwisterStatus.ERROR 1217 reason = 'Incorrect status assignment' 1218 self.instance.reason = reason 1219 self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason) 1220 1221 def demangle(self, symbol_name): 1222 if symbol_name[:2] == '_Z': 1223 try: 1224 cpp_filt = subprocess.run( 1225 'c++filt', 1226 input=symbol_name, 1227 text=True, 1228 check=True, 1229 capture_output=True 1230 ) 1231 if self.trace: 1232 logger.debug(f"Demangle: '{symbol_name}'==>'{cpp_filt.stdout}'") 1233 return cpp_filt.stdout.strip() 1234 except Exception as e: 1235 logger.error(f"Failed to demangle '{symbol_name}': {e}") 1236 return symbol_name 1237 1238 def determine_testcases(self, results): 1239 logger.debug(f"Determine test cases for test suite: {self.instance.testsuite.id}") 1240 1241 new_ztest_unit_test_regex = re.compile(r"z_ztest_unit_test__([^\s]+?)__([^\s]*)") 1242 detected_cases = [] 1243 1244 elf_file = self.instance.get_elf_file() 1245 with open(elf_file, "rb") as elf_fp: 1246 elf = ELFFile(elf_fp) 1247 1248 for section in elf.iter_sections(): 1249 if isinstance(section, SymbolTableSection): 1250 for sym in section.iter_symbols(): 1251 # It is only meant for new ztest fx 1252 # because only new ztest fx exposes test functions precisely. 1253 m_ = new_ztest_unit_test_regex.search(sym.name) 1254 if not m_: 1255 continue 1256 # Demangle C++ symbols 1257 m_ = new_ztest_unit_test_regex.search(self.demangle(sym.name)) 1258 if not m_: 1259 continue 1260 # The 1st capture group is new ztest suite name. 1261 # The 2nd capture group is new ztest unit test name. 1262 new_ztest_suite = m_[1] 1263 if self.trace and \ 1264 new_ztest_suite not in self.instance.testsuite.ztest_suite_names: 1265 # This can happen if a ZTEST_SUITE name is macro-generated 1266 # in the test source files, e.g. based on DT information. 1267 logger.debug( 1268 f"Unexpected Ztest suite '{new_ztest_suite}' is " 1269 f"not present in: {self.instance.testsuite.ztest_suite_names}" 1270 ) 1271 test_func_name = m_[2].replace("test_", "", 1) 1272 testcase_id = self.instance.compose_case_name( 1273 f"{new_ztest_suite}.{test_func_name}" 1274 ) 1275 detected_cases.append(testcase_id) 1276 1277 logger.debug( 1278 f"Test instance {self.instance.name} already has {len(self.instance.testcases)} " 1279 f"testcase(s) known: {self.instance.testcases}" 1280 ) 1281 if detected_cases: 1282 logger.debug(f"Detected {len(detected_cases)} Ztest case(s): " 1283 f"[{', '.join(detected_cases)}] in {elf_file}") 1284 tc_keeper = { 1285 tc.name: {'status': tc.status, 'reason': tc.reason} 1286 for tc in self.instance.testcases 1287 } 1288 self.instance.testcases.clear() 1289 self.instance.testsuite.testcases.clear() 1290 1291 for testcase_id in detected_cases: 1292 testcase = self.instance.add_testcase(name=testcase_id) 1293 self.instance.testsuite.add_testcase(name=testcase_id) 1294 1295 # Keep previous statuses and reasons 1296 tc_info = tc_keeper.get(testcase_id, {}) 1297 if not tc_info and self.trace: 1298 # Also happens when Ztest uses macroses, eg. DEFINE_TEST_VARIANT 1299 logger.debug(f"Ztest case '{testcase_id}' discovered for " 1300 f"'{self.instance.testsuite.source_dir_rel}' " 1301 f"with {list(tc_keeper)}") 1302 testcase.status = tc_info.get('status', TwisterStatus.NONE) 1303 testcase.reason = tc_info.get('reason') 1304 1305 1306 def cleanup_artifacts(self, additional_keep: list[str] = None): 1307 if additional_keep is None: 1308 additional_keep = [] 1309 logger.debug(f"Cleaning up {self.instance.build_dir}") 1310 allow = [ 1311 os.path.join('zephyr', '.config'), 1312 'handler.log', 1313 'handler_stderr.log', 1314 'build.log', 1315 'device.log', 1316 'recording.csv', 1317 'rom.json', 1318 'ram.json', 1319 'build_info.yml', 1320 'zephyr/zephyr.dts', 1321 # below ones are needed to make --test-only work as well 1322 'Makefile', 1323 'CMakeCache.txt', 1324 'build.ninja', 1325 os.path.join('CMakeFiles', 'rules.ninja') 1326 ] 1327 1328 allow += additional_keep 1329 1330 if self.options.runtime_artifact_cleanup == 'all': 1331 allow += [os.path.join('twister', 'testsuite_extra.conf')] 1332 1333 allow = [os.path.join(self.instance.build_dir, file) for file in allow] 1334 1335 for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False): 1336 for name in filenames: 1337 path = os.path.join(dirpath, name) 1338 if path not in allow: 1339 os.remove(path) 1340 # Remove empty directories and symbolic links to directories 1341 for dir in dirnames: 1342 path = os.path.join(dirpath, dir) 1343 if os.path.islink(path): 1344 os.remove(path) 1345 elif not os.listdir(path): 1346 os.rmdir(path) 1347 1348 def cleanup_device_testing_artifacts(self): 1349 logger.debug(f"Cleaning up for Device Testing {self.instance.build_dir}") 1350 1351 files_to_keep = self._get_binaries() 1352 files_to_keep.append(os.path.join('zephyr', 'runners.yaml')) 1353 1354 if self.instance.sysbuild: 1355 files_to_keep.append('domains.yaml') 1356 for domain in self.instance.domains.get_domains(): 1357 files_to_keep += self._get_artifact_allow_list_for_domain(domain.name) 1358 1359 self.cleanup_artifacts(files_to_keep) 1360 1361 self._sanitize_files() 1362 1363 def _get_artifact_allow_list_for_domain(self, domain: str) -> list[str]: 1364 """ 1365 Return a list of files needed to test a given domain. 1366 """ 1367 allow = [ 1368 os.path.join(domain, 'build.ninja'), 1369 os.path.join(domain, 'CMakeCache.txt'), 1370 os.path.join(domain, 'CMakeFiles', 'rules.ninja'), 1371 os.path.join(domain, 'Makefile'), 1372 os.path.join(domain, 'zephyr', '.config'), 1373 os.path.join(domain, 'zephyr', 'runners.yaml') 1374 ] 1375 return allow 1376 1377 def _get_binaries(self) -> list[str]: 1378 """ 1379 Get list of binaries paths (absolute or relative to the 1380 self.instance.build_dir), basing on information from platform.binaries 1381 or runners.yaml. If they are not found take default binaries like 1382 "zephyr/zephyr.hex" etc. 1383 """ 1384 binaries: list[str] = [] 1385 1386 platform = self.instance.platform 1387 if platform.binaries: 1388 for binary in platform.binaries: 1389 binaries.append(os.path.join('zephyr', binary)) 1390 1391 # Get binaries for a single-domain build 1392 binaries += self._get_binaries_from_runners() 1393 # Get binaries in the case of a multiple-domain build 1394 if self.instance.sysbuild: 1395 for domain in self.instance.domains.get_domains(): 1396 binaries += self._get_binaries_from_runners(domain.name) 1397 1398 # if binaries was not found in platform.binaries and runners.yaml take default ones 1399 if len(binaries) == 0: 1400 binaries = [ 1401 os.path.join('zephyr', 'zephyr.hex'), 1402 os.path.join('zephyr', 'zephyr.bin'), 1403 os.path.join('zephyr', 'zephyr.elf'), 1404 os.path.join('zephyr', 'zephyr.exe'), 1405 ] 1406 return binaries 1407 1408 def _get_binaries_from_runners(self, domain='') -> list[str]: 1409 """ 1410 Get list of binaries paths (absolute or relative to the 1411 self.instance.build_dir) from runners.yaml file. May be used for 1412 multiple-domain builds by passing in one domain at a time. 1413 """ 1414 1415 runners_file_path: str = os.path.join(self.instance.build_dir, 1416 domain, 'zephyr', 'runners.yaml') 1417 if not os.path.exists(runners_file_path): 1418 return [] 1419 1420 with open(runners_file_path) as file: 1421 runners_content: dict = yaml.load(file, Loader=SafeLoader) 1422 1423 if 'config' not in runners_content: 1424 return [] 1425 1426 runners_config: dict = runners_content['config'] 1427 binary_keys: list[str] = ['elf_file', 'hex_file', 'bin_file'] 1428 1429 binaries: list[str] = [] 1430 for binary_key in binary_keys: 1431 binary_path = runners_config.get(binary_key) 1432 if binary_path is None: 1433 continue 1434 if os.path.isabs(binary_path): 1435 binaries.append(binary_path) 1436 else: 1437 binaries.append(os.path.join(domain, 'zephyr', binary_path)) 1438 1439 return binaries 1440 1441 def _sanitize_files(self): 1442 """ 1443 Sanitize files to make it possible to flash those file on different 1444 computer/system. 1445 """ 1446 self._sanitize_runners_file() 1447 self._sanitize_zephyr_base_from_files() 1448 1449 def _sanitize_runners_file(self): 1450 """ 1451 Replace absolute paths of binary files for relative ones. The base 1452 directory for those files is f"{self.instance.build_dir}/zephyr" 1453 """ 1454 runners_dir_path: str = os.path.join(self.instance.build_dir, 'zephyr') 1455 runners_file_path: str = os.path.join(runners_dir_path, 'runners.yaml') 1456 if not os.path.exists(runners_file_path): 1457 return 1458 1459 with open(runners_file_path) as file: 1460 runners_content_text = file.read() 1461 runners_content_yaml: dict = yaml.load(runners_content_text, Loader=SafeLoader) 1462 1463 if 'config' not in runners_content_yaml: 1464 return 1465 1466 runners_config: dict = runners_content_yaml['config'] 1467 binary_keys: list[str] = ['elf_file', 'hex_file', 'bin_file'] 1468 1469 for binary_key in binary_keys: 1470 binary_path = runners_config.get(binary_key) 1471 # sanitize only paths which exist and are absolute 1472 if binary_path is None or not os.path.isabs(binary_path): 1473 continue 1474 binary_path_relative = os.path.relpath(binary_path, start=runners_dir_path) 1475 runners_content_text = runners_content_text.replace(binary_path, binary_path_relative) 1476 1477 with open(runners_file_path, 'w') as file: 1478 file.write(runners_content_text) 1479 1480 def _sanitize_zephyr_base_from_files(self): 1481 """ 1482 Remove Zephyr base paths from selected files. 1483 """ 1484 files_to_sanitize = [ 1485 'CMakeCache.txt', 1486 os.path.join('zephyr', 'runners.yaml'), 1487 ] 1488 for file_path in files_to_sanitize: 1489 file_path = os.path.join(self.instance.build_dir, file_path) 1490 if not os.path.exists(file_path): 1491 continue 1492 1493 with open(file_path) as file: 1494 data = file.read() 1495 1496 # add trailing slash at the end of canonical_zephyr_base if it does not exist: 1497 path_to_remove = os.path.join(canonical_zephyr_base, "") 1498 data = data.replace(path_to_remove, "") 1499 1500 with open(file_path, 'w') as file: 1501 file.write(data) 1502 1503 @staticmethod 1504 def _add_instance_testcases_to_status_counts(instance, results, decrement=False): 1505 increment_value = -1 if decrement else 1 1506 for tc in instance.testcases: 1507 match tc.status: 1508 case TwisterStatus.PASS: 1509 results.passed_cases_increment(increment_value) 1510 case TwisterStatus.NOTRUN: 1511 results.notrun_cases_increment(increment_value) 1512 case TwisterStatus.BLOCK: 1513 results.blocked_cases_increment(increment_value) 1514 case TwisterStatus.SKIP: 1515 results.skipped_cases_increment(increment_value) 1516 case TwisterStatus.FILTER: 1517 results.filtered_cases_increment(increment_value) 1518 case TwisterStatus.ERROR: 1519 results.error_cases_increment(increment_value) 1520 case TwisterStatus.FAIL: 1521 results.failed_cases_increment(increment_value) 1522 # Statuses that should not appear. 1523 # Crashing Twister at this point would be counterproductive, 1524 # but having those statuses in this part of processing is an error. 1525 case TwisterStatus.NONE: 1526 results.none_cases_increment(increment_value) 1527 logger.warning(f'A None status detected in instance {instance.name},' 1528 f' test case {tc.name}.') 1529 results.warnings_increment(1) 1530 case TwisterStatus.STARTED: 1531 results.started_cases_increment(increment_value) 1532 logger.warning(f'A started status detected in instance {instance.name},' 1533 f' test case {tc.name}.') 1534 results.warnings_increment(1) 1535 case _: 1536 logger.warning( 1537 f'An unknown status "{tc.status}" detected in instance {instance.name},' 1538 f' test case {tc.name}.' 1539 ) 1540 results.warnings_increment(1) 1541 1542 1543 def report_out(self, results): 1544 total_to_do = results.total - results.filtered_static 1545 total_tests_width = len(str(total_to_do)) 1546 results.done_increment() 1547 instance = self.instance 1548 if results.iteration == 1: 1549 results.cases_increment(len(instance.testcases)) 1550 1551 self._add_instance_testcases_to_status_counts(instance, results) 1552 1553 status = ( 1554 f'{TwisterStatus.get_color(instance.status)}{str.upper(instance.status)}{Fore.RESET}' 1555 ) 1556 1557 if instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]: 1558 if instance.status == TwisterStatus.ERROR: 1559 results.error_increment() 1560 else: 1561 results.failed_increment() 1562 if self.options.verbose: 1563 status += " " + instance.reason 1564 else: 1565 logger.error( 1566 f"{instance.platform.name:<25} {instance.testsuite.name:<50}" 1567 f" {status}: {instance.reason}" 1568 ) 1569 if not self.options.verbose: 1570 self.log_info_file(self.options.inline_logs) 1571 elif instance.status == TwisterStatus.SKIP: 1572 results.skipped_increment() 1573 elif instance.status == TwisterStatus.FILTER: 1574 results.filtered_configs_increment() 1575 elif instance.status == TwisterStatus.PASS: 1576 results.passed_increment() 1577 elif instance.status == TwisterStatus.NOTRUN: 1578 results.notrun_increment() 1579 else: 1580 logger.debug(f"Unknown status = {instance.status}") 1581 status = Fore.YELLOW + "UNKNOWN" + Fore.RESET 1582 1583 if self.options.verbose: 1584 if self.options.cmake_only: 1585 more_info = "cmake" 1586 elif instance.status in [TwisterStatus.SKIP, TwisterStatus.FILTER]: 1587 more_info = instance.reason 1588 else: 1589 if instance.handler.ready and instance.run: 1590 more_info = instance.handler.type_str 1591 htime = instance.execution_time 1592 if instance.dut: 1593 more_info += f": {instance.dut}," 1594 if htime: 1595 more_info += f" {htime:.3f}s" 1596 else: 1597 more_info = "build" 1598 1599 if ( instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL] 1600 and hasattr(self.instance.handler, 'seed') 1601 and self.instance.handler.seed is not None ): 1602 more_info += "/seed: " + str(self.options.seed) 1603 if instance.toolchain: 1604 more_info += f" <{instance.toolchain}>" 1605 logger.info( 1606 f"{results.done - results.filtered_static:>{total_tests_width}}/{total_to_do}" 1607 f" {instance.platform.name:<25} {instance.testsuite.name:<50}" 1608 f" {status} ({more_info})" 1609 ) 1610 1611 if self.options.verbose > 1: 1612 for tc in self.instance.testcases: 1613 color = TwisterStatus.get_color(tc.status) 1614 logger.info(f' {" ":<{total_tests_width+25+4}} {tc.name:<75} ' 1615 f'{color}{str.upper(tc.status.value):<12}{Fore.RESET}' 1616 f'{" " + tc.reason if tc.reason else ""}') 1617 1618 if instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]: 1619 self.log_info_file(self.options.inline_logs) 1620 else: 1621 completed_perc = 0 1622 if total_to_do > 0: 1623 completed_perc = int( 1624 (float(results.done - results.filtered_static) / total_to_do) * 100 1625 ) 1626 1627 unfiltered = results.done - results.filtered_static 1628 complete_section = ( 1629 f"{TwisterStatus.get_color(TwisterStatus.PASS)}" 1630 f"{unfiltered:>4}/{total_to_do:>4}" 1631 f"{Fore.RESET} {completed_perc:>2}%" 1632 ) 1633 notrun_section = ( 1634 f"{TwisterStatus.get_color(TwisterStatus.NOTRUN)}{results.notrun:>4}{Fore.RESET}" 1635 ) 1636 filtered_section_color = ( 1637 TwisterStatus.get_color(TwisterStatus.SKIP) 1638 if results.filtered_configs > 0 1639 else Fore.RESET 1640 ) 1641 filtered_section = ( 1642 f"{filtered_section_color}{results.filtered_configs:>4}{Fore.RESET}" 1643 ) 1644 failed_section_color = ( 1645 TwisterStatus.get_color(TwisterStatus.FAIL) if results.failed > 0 else Fore.RESET 1646 ) 1647 failed_section = ( 1648 f"{failed_section_color}{results.failed:>4}{Fore.RESET}" 1649 ) 1650 error_section_color = ( 1651 TwisterStatus.get_color(TwisterStatus.ERROR) if results.error > 0 else Fore.RESET 1652 ) 1653 error_section = ( 1654 f"{error_section_color}{results.error:>4}{Fore.RESET}" 1655 ) 1656 sys.stdout.write( 1657 f"INFO - Total complete: {complete_section}" 1658 f" built (not run): {notrun_section}," 1659 f" filtered: {filtered_section}," 1660 f" failed: {failed_section}," 1661 f" error: {error_section}\r" 1662 ) 1663 sys.stdout.flush() 1664 1665 @staticmethod 1666 def cmake_assemble_args(extra_args, handler, extra_conf_files, extra_overlay_confs, 1667 extra_dtc_overlay_files, cmake_extra_args, 1668 build_dir): 1669 # Retain quotes around config options 1670 config_options = [arg for arg in extra_args if arg.startswith("CONFIG_")] 1671 args = [arg for arg in extra_args if not arg.startswith("CONFIG_")] 1672 1673 args_expanded = ["-D{}".format(a.replace('"', '\"')) for a in config_options] 1674 1675 if handler.ready: 1676 args.extend(handler.args) 1677 1678 if extra_conf_files: 1679 args.append(f"CONF_FILE=\"{';'.join(extra_conf_files)}\"") 1680 1681 if extra_dtc_overlay_files: 1682 args.append(f"DTC_OVERLAY_FILE=\"{';'.join(extra_dtc_overlay_files)}\"") 1683 1684 # merge overlay files into one variable 1685 overlays = extra_overlay_confs.copy() 1686 1687 additional_overlay_path = os.path.join( 1688 build_dir, "twister", "testsuite_extra.conf" 1689 ) 1690 if os.path.exists(additional_overlay_path): 1691 overlays.append(additional_overlay_path) 1692 1693 if overlays: 1694 args.append(f"OVERLAY_CONFIG=\"{' '.join(overlays)}\"") 1695 1696 # Build the final argument list 1697 args_expanded.extend(["-D{}".format(a.replace('"', '\"')) for a in cmake_extra_args]) 1698 args_expanded.extend(["-D{}".format(a.replace('"', '')) for a in args]) 1699 1700 return args_expanded 1701 1702 def cmake(self, filter_stages=None): 1703 if filter_stages is None: 1704 filter_stages = [] 1705 args = [] 1706 for va in self.testsuite.extra_args.copy(): 1707 cond_args = va.split(":") 1708 if cond_args[0] == "arch" and len(cond_args) == 3: 1709 if self.instance.platform.arch == cond_args[1]: 1710 args.append(cond_args[2]) 1711 elif cond_args[0] == "platform" and len(cond_args) == 3: 1712 if self.instance.platform.name == cond_args[1]: 1713 args.append(cond_args[2]) 1714 elif cond_args[0] == "simulation" and len(cond_args) == 3: 1715 if self.instance.platform.simulation == cond_args[1]: 1716 args.append(cond_args[2]) 1717 else: 1718 if cond_args[0] in ["arch", "platform", "simulation"]: 1719 logger.warning(f"Unexpected extra_args: {va}") 1720 args.append(va) 1721 1722 1723 args = self.cmake_assemble_args( 1724 args, 1725 self.instance.handler, 1726 self.testsuite.extra_conf_files, 1727 self.testsuite.extra_overlay_confs, 1728 self.testsuite.extra_dtc_overlay_files, 1729 self.options.extra_args, # CMake extra args 1730 self.instance.build_dir, 1731 ) 1732 return self.run_cmake(args,filter_stages) 1733 1734 def build(self): 1735 harness = HarnessImporter.get_harness(self.instance.testsuite.harness.capitalize()) 1736 build_result = self.run_build(['--build', self.build_dir]) 1737 try: 1738 if harness: 1739 harness.instance = self.instance 1740 harness.build() 1741 except ConfigurationError as error: 1742 self.instance.status = TwisterStatus.ERROR 1743 self.instance.reason = str(error) 1744 logger.error(self.instance.reason) 1745 return 1746 return build_result 1747 1748 def run(self): 1749 1750 instance = self.instance 1751 1752 if instance.handler.ready: 1753 logger.debug(f"Reset instance status from '{instance.status}' to None before run.") 1754 instance.status = TwisterStatus.NONE 1755 1756 if instance.handler.type_str == "device": 1757 instance.handler.duts = self.duts 1758 1759 if(self.options.seed is not None and instance.platform.name.startswith("native_")): 1760 self.parse_generated() 1761 if('CONFIG_FAKE_ENTROPY_NATIVE_SIM' in self.defconfig and 1762 self.defconfig['CONFIG_FAKE_ENTROPY_NATIVE_SIM'] == 'y'): 1763 instance.handler.seed = self.options.seed 1764 1765 if self.options.extra_test_args and instance.platform.arch == "posix": 1766 instance.handler.extra_test_args = self.options.extra_test_args 1767 1768 harness = HarnessImporter.get_harness(instance.testsuite.harness.capitalize()) 1769 try: 1770 harness.configure(instance) 1771 except ConfigurationError as error: 1772 instance.status = TwisterStatus.ERROR 1773 instance.reason = str(error) 1774 logger.error(instance.reason) 1775 return 1776 # 1777 if isinstance(harness, Pytest): 1778 harness.pytest_run(instance.handler.get_test_timeout()) 1779 elif isinstance(harness, Ctest): 1780 harness.ctest_run(instance.handler.get_test_timeout()) 1781 else: 1782 instance.handler.handle(harness) 1783 1784 sys.stdout.flush() 1785 1786 def gather_metrics(self, instance: TestInstance): 1787 build_result = {"returncode": 0} 1788 if self.options.create_rom_ram_report: 1789 build_result = self.run_build(['--build', self.build_dir, "--target", "footprint"]) 1790 if self.options.enable_size_report and not self.options.cmake_only: 1791 self.calc_size(instance=instance, from_buildlog=self.options.footprint_from_buildlog) 1792 else: 1793 instance.metrics["used_ram"] = 0 1794 instance.metrics["used_rom"] = 0 1795 instance.metrics["available_rom"] = 0 1796 instance.metrics["available_ram"] = 0 1797 instance.metrics["unrecognized"] = [] 1798 return build_result 1799 1800 @staticmethod 1801 def calc_size(instance: TestInstance, from_buildlog: bool): 1802 if instance.status not in [TwisterStatus.ERROR, TwisterStatus.FAIL, TwisterStatus.SKIP]: 1803 if instance.platform.type not in ["native", "qemu", "unit"]: 1804 generate_warning = bool(instance.platform.type == "mcu") 1805 size_calc = instance.calculate_sizes( 1806 from_buildlog=from_buildlog, 1807 generate_warning=generate_warning 1808 ) 1809 instance.metrics["used_ram"] = size_calc.get_used_ram() 1810 instance.metrics["used_rom"] = size_calc.get_used_rom() 1811 instance.metrics["available_rom"] = size_calc.get_available_rom() 1812 instance.metrics["available_ram"] = size_calc.get_available_ram() 1813 instance.metrics["unrecognized"] = size_calc.unrecognized_sections() 1814 else: 1815 instance.metrics["used_ram"] = 0 1816 instance.metrics["used_rom"] = 0 1817 instance.metrics["available_rom"] = 0 1818 instance.metrics["available_ram"] = 0 1819 instance.metrics["unrecognized"] = [] 1820 instance.metrics["handler_time"] = instance.execution_time 1821 1822class TwisterRunner: 1823 1824 def __init__(self, instances, suites, env=None) -> None: 1825 self.pipeline = None 1826 self.options = env.options 1827 self.env = env 1828 self.instances = instances 1829 self.suites = suites 1830 self.duts = None 1831 self.jobs = 1 1832 self.results = None 1833 self.jobserver = None 1834 1835 def run(self): 1836 1837 retries = self.options.retry_failed + 1 1838 1839 BaseManager.register('LifoQueue', queue.LifoQueue) 1840 manager = BaseManager() 1841 manager.start() 1842 1843 self.results = ExecutionCounter(total=len(self.instances)) 1844 self.iteration = 0 1845 pipeline = manager.LifoQueue() 1846 done_queue = manager.LifoQueue() 1847 1848 # Set number of jobs 1849 if self.options.jobs: 1850 self.jobs = self.options.jobs 1851 elif self.options.build_only: 1852 self.jobs = multiprocessing.cpu_count() * 2 1853 else: 1854 self.jobs = multiprocessing.cpu_count() 1855 1856 if sys.platform == "linux": 1857 if os.name == 'posix': 1858 self.jobserver = GNUMakeJobClient.from_environ(jobs=self.options.jobs) 1859 if not self.jobserver: 1860 self.jobserver = GNUMakeJobServer(self.jobs) 1861 elif self.jobserver.jobs: 1862 self.jobs = self.jobserver.jobs 1863 # TODO: Implement this on windows/mac also 1864 else: 1865 self.jobserver = JobClient() 1866 1867 logger.info(f"JOBS: {self.jobs}") 1868 1869 self.update_counting_before_pipeline() 1870 1871 while True: 1872 self.results.iteration_increment() 1873 1874 if self.results.iteration > 1: 1875 logger.info(f"{self.results.iteration} Iteration:") 1876 time.sleep(self.options.retry_interval) # waiting for the system to settle down 1877 self.results.done = self.results.total - self.results.failed 1878 self.results.failed = 0 1879 if self.options.retry_build_errors: 1880 self.results.done -= self.results.error 1881 self.results.error = 0 1882 else: 1883 self.results.done = self.results.filtered_static 1884 1885 self.execute(pipeline, done_queue) 1886 1887 while True: 1888 try: 1889 inst = done_queue.get_nowait() 1890 except queue.Empty: 1891 break 1892 else: 1893 inst.metrics.update(self.instances[inst.name].metrics) 1894 inst.metrics["handler_time"] = inst.execution_time 1895 inst.metrics["unrecognized"] = [] 1896 self.instances[inst.name] = inst 1897 1898 print("") 1899 1900 retry_errors = False 1901 if self.results.error and self.options.retry_build_errors: 1902 retry_errors = True 1903 1904 retries = retries - 1 1905 if retries == 0 or ( self.results.failed == 0 and not retry_errors): 1906 break 1907 1908 self.show_brief() 1909 1910 def update_counting_before_pipeline(self): 1911 ''' 1912 Updating counting before pipeline is necessary because statically filterd 1913 test instance never enter the pipeline. While some pipeline output needs 1914 the static filter stats. So need to prepare them before pipline starts. 1915 ''' 1916 for instance in self.instances.values(): 1917 if instance.status == TwisterStatus.FILTER and instance.reason != 'runtime filter': 1918 self.results.filtered_static_increment() 1919 self.results.filtered_configs_increment() 1920 self.results.filtered_cases_increment(len(instance.testsuite.testcases)) 1921 self.results.cases_increment(len(instance.testsuite.testcases)) 1922 elif instance.status == TwisterStatus.ERROR: 1923 self.results.error_increment() 1924 1925 def show_brief(self): 1926 logger.info( 1927 f"{len(self.suites)} test scenarios ({len(self.instances)} configurations) selected," 1928 f" {self.results.filtered_configs} configurations filtered" 1929 f" ({self.results.filtered_static} by static filter," 1930 f" {self.results.filtered_configs - self.results.filtered_static} at runtime)." 1931 ) 1932 1933 def add_tasks_to_queue( 1934 self, 1935 pipeline, 1936 build_only=False, 1937 test_only=False, 1938 retry_build_errors=False 1939 ): 1940 for instance in self.instances.values(): 1941 if build_only: 1942 instance.run = False 1943 1944 no_retry_statuses = [ 1945 TwisterStatus.PASS, 1946 TwisterStatus.SKIP, 1947 TwisterStatus.FILTER, 1948 TwisterStatus.NOTRUN 1949 ] 1950 if not retry_build_errors: 1951 no_retry_statuses.append(TwisterStatus.ERROR) 1952 1953 if instance.status not in no_retry_statuses: 1954 logger.debug(f"adding {instance.name}") 1955 if instance.status != TwisterStatus.NONE: 1956 instance.retries += 1 1957 instance.status = TwisterStatus.NONE 1958 # Previous states should be removed from the stats 1959 if self.results.iteration > 1: 1960 ProjectBuilder._add_instance_testcases_to_status_counts( 1961 instance, 1962 self.results, 1963 decrement=True 1964 ) 1965 1966 # Check if cmake package_helper script can be run in advance. 1967 instance.filter_stages = [] 1968 if instance.testsuite.filter: 1969 instance.filter_stages = self.get_cmake_filter_stages( 1970 instance.testsuite.filter, 1971 expr_parser.reserved.keys() 1972 ) 1973 1974 if test_only and instance.run: 1975 pipeline.put({"op": "run", "test": instance}) 1976 elif instance.filter_stages and "full" not in instance.filter_stages: 1977 pipeline.put({"op": "filter", "test": instance}) 1978 else: 1979 cache_file = os.path.join(instance.build_dir, "CMakeCache.txt") 1980 if os.path.exists(cache_file) and self.env.options.aggressive_no_clean: 1981 pipeline.put({"op": "build", "test": instance}) 1982 else: 1983 pipeline.put({"op": "cmake", "test": instance}) 1984 1985 1986 def pipeline_mgr(self, pipeline, done_queue, lock, results): 1987 try: 1988 if sys.platform == 'linux': 1989 with self.jobserver.get_job(): 1990 while True: 1991 try: 1992 task = pipeline.get_nowait() 1993 except queue.Empty: 1994 break 1995 else: 1996 instance = task['test'] 1997 pb = ProjectBuilder(instance, self.env, self.jobserver) 1998 pb.duts = self.duts 1999 pb.process(pipeline, done_queue, task, lock, results) 2000 if self.env.options.quit_on_failure and \ 2001 pb.instance.status in [TwisterStatus.FAIL, TwisterStatus.ERROR]: 2002 with pipeline.mutex: 2003 pipeline.queue.clear() 2004 break 2005 2006 return True 2007 else: 2008 while True: 2009 try: 2010 task = pipeline.get_nowait() 2011 except queue.Empty: 2012 break 2013 else: 2014 instance = task['test'] 2015 pb = ProjectBuilder(instance, self.env, self.jobserver) 2016 pb.duts = self.duts 2017 pb.process(pipeline, done_queue, task, lock, results) 2018 if self.env.options.quit_on_failure and \ 2019 pb.instance.status in [TwisterStatus.FAIL, TwisterStatus.ERROR]: 2020 with pipeline.mutex: 2021 pipeline.queue.clear() 2022 break 2023 return True 2024 except Exception as e: 2025 logger.error(f"General exception: {e}\n{traceback.format_exc()}") 2026 sys.exit(1) 2027 2028 def execute(self, pipeline, done): 2029 lock = Lock() 2030 logger.info("Adding tasks to the queue...") 2031 self.add_tasks_to_queue(pipeline, self.options.build_only, self.options.test_only, 2032 retry_build_errors=self.options.retry_build_errors) 2033 logger.info("Added initial list of jobs to queue") 2034 2035 processes = [] 2036 2037 for _ in range(self.jobs): 2038 p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, self.results, )) 2039 processes.append(p) 2040 p.start() 2041 logger.debug(f"Launched {self.jobs} jobs") 2042 2043 try: 2044 for p in processes: 2045 p.join() 2046 if p.exitcode != 0: 2047 logger.error(f"Process {p.pid} failed, aborting execution") 2048 for proc in processes: 2049 proc.terminate() 2050 sys.exit(1) 2051 except KeyboardInterrupt: 2052 logger.info("Execution interrupted") 2053 for p in processes: 2054 p.terminate() 2055 2056 @staticmethod 2057 def get_cmake_filter_stages(filt, logic_keys): 2058 """Analyze filter expressions from test yaml 2059 and decide if dts and/or kconfig based filtering will be needed. 2060 """ 2061 dts_required = False 2062 kconfig_required = False 2063 full_required = False 2064 filter_stages = [] 2065 2066 # Compress args in expressions like "function('x', 'y')" 2067 # so they are not split when splitting by whitespaces 2068 filt = filt.replace(", ", ",") 2069 # Remove logic words 2070 for k in logic_keys: 2071 filt = filt.replace(f"{k} ", "") 2072 # Remove brackets 2073 filt = filt.replace("(", "") 2074 filt = filt.replace(")", "") 2075 # Splite by whitespaces 2076 filt = filt.split() 2077 for expression in filt: 2078 if expression.startswith("dt_"): 2079 dts_required = True 2080 elif expression.startswith("CONFIG"): 2081 kconfig_required = True 2082 else: 2083 full_required = True 2084 2085 if full_required: 2086 return ["full"] 2087 if dts_required: 2088 filter_stages.append("dts") 2089 if kconfig_required: 2090 filter_stages.append("kconfig") 2091 2092 return filter_stages 2093