# vim: set syntax=python ts=4 :
#
# Copyright (c) 2018-2022 Intel Corporation
# Copyright 2022 NXP
# SPDX-License-Identifier: Apache-2.0

import logging
import multiprocessing
import os
import pickle
import queue
import re
import shutil
import subprocess
import sys
import time
import traceback
import yaml
from multiprocessing import Lock, Process, Value
from multiprocessing.managers import BaseManager
from typing import List
from packaging import version

from colorama import Fore
from domains import Domains
from twisterlib.cmakecache import CMakeCache
from twisterlib.environment import canonical_zephyr_base
from twisterlib.error import BuildError, ConfigurationError

import elftools
from elftools.elf.elffile import ELFFile
from elftools.elf.sections import SymbolTableSection

if version.parse(elftools.__version__) < version.parse('0.24'):
    sys.exit("pyelftools is out of date, need version 0.24 or later")

# Job server only works on Linux for now.
if sys.platform == 'linux':
    from twisterlib.jobserver import GNUMakeJobClient, GNUMakeJobServer, JobClient

from twisterlib.log_helper import log_command
from twisterlib.testinstance import TestInstance
from twisterlib.environment import TwisterEnv
from twisterlib.testsuite import TestSuite
from twisterlib.platform import Platform
from twisterlib.testplan import change_skip_to_error_if_integration
from twisterlib.harness import HarnessImporter, Pytest

logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
import expr_parser


class ExecutionCounter(object):
    def __init__(self, total=0):
        '''
        Most of the stats are kept at the test instance level.
        Only "_cases" and "_skipped_cases" count test cases across ALL test instances.

        total complete = done + skipped_filter
        total = yaml test scenarios * applicable platforms
        complete percentage = (done + skipped_filter) / total
        pass rate = passed / (total - skipped_configs)
        '''
        # instances that go through the pipeline
        # updated by report_out()
        self._done = Value('i', 0)

        # iteration
        self._iteration = Value('i', 0)

        # instances that actually executed and passed
        # updated by report_out()
        self._passed = Value('i', 0)

        # static filter + runtime filter + build skipped
        # updated by update_counting_before_pipeline() and report_out()
        self._skipped_configs = Value('i', 0)

        # cmake filter + build skipped
        # updated by report_out()
        self._skipped_runtime = Value('i', 0)

        # statically filtered at yaml parsing time
        # updated by update_counting_before_pipeline()
        self._skipped_filter = Value('i', 0)

        # updated by update_counting_before_pipeline() and report_out()
        self._skipped_cases = Value('i', 0)

        # updated by report_out() in pipeline
        self._error = Value('i', 0)
        self._failed = Value('i', 0)

        # initialized to the number of test instances
        self._total = Value('i', total)

        # updated in report_out
        self._cases = Value('i', 0)
        self.lock = Lock()
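
    # Worked example of the relationships documented above (hypothetical
    # numbers): with total=10, skipped_filter=2 and done=8, the run is
    # complete, since done + skipped_filter == total, i.e. a completion
    # percentage of (8 + 2) / 10 = 100%.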

    def summary(self):
        print("--------------------------------")
        print(f"Total test suites: {self.total}")  # actually test instances
        print(f"Total test cases: {self.cases}")
        print(f"Executed test cases: {self.cases - self.skipped_cases}")
        print(f"Skipped test cases: {self.skipped_cases}")
        print(f"Completed test suites: {self.done}")
        print(f"Passing test suites: {self.passed}")
        print(f"Failing test suites: {self.failed}")
        print(f"Skipped test suites: {self.skipped_configs}")
        print(f"Skipped test suites (runtime): {self.skipped_runtime}")
        print(f"Skipped test suites (filter): {self.skipped_filter}")
        print(f"Errors: {self.error}")
        print("--------------------------------")

    @property
    def cases(self):
        with self._cases.get_lock():
            return self._cases.value

    @cases.setter
    def cases(self, value):
        with self._cases.get_lock():
            self._cases.value = value

    @property
    def skipped_cases(self):
        with self._skipped_cases.get_lock():
            return self._skipped_cases.value

    @skipped_cases.setter
    def skipped_cases(self, value):
        with self._skipped_cases.get_lock():
            self._skipped_cases.value = value

    @property
    def error(self):
        with self._error.get_lock():
            return self._error.value

    @error.setter
    def error(self, value):
        with self._error.get_lock():
            self._error.value = value

    @property
    def iteration(self):
        with self._iteration.get_lock():
            return self._iteration.value

    @iteration.setter
    def iteration(self, value):
        with self._iteration.get_lock():
            self._iteration.value = value

    @property
    def done(self):
        with self._done.get_lock():
            return self._done.value

    @done.setter
    def done(self, value):
        with self._done.get_lock():
            self._done.value = value

    @property
    def passed(self):
        with self._passed.get_lock():
            return self._passed.value

    @passed.setter
    def passed(self, value):
        with self._passed.get_lock():
            self._passed.value = value

    @property
    def skipped_configs(self):
        with self._skipped_configs.get_lock():
            return self._skipped_configs.value

    @skipped_configs.setter
    def skipped_configs(self, value):
        with self._skipped_configs.get_lock():
            self._skipped_configs.value = value

    @property
    def skipped_filter(self):
        with self._skipped_filter.get_lock():
            return self._skipped_filter.value

    @skipped_filter.setter
    def skipped_filter(self, value):
        with self._skipped_filter.get_lock():
            self._skipped_filter.value = value

    @property
    def skipped_runtime(self):
        with self._skipped_runtime.get_lock():
            return self._skipped_runtime.value

    @skipped_runtime.setter
    def skipped_runtime(self, value):
        with self._skipped_runtime.get_lock():
            self._skipped_runtime.value = value

    @property
    def failed(self):
        with self._failed.get_lock():
            return self._failed.value

    @failed.setter
    def failed(self, value):
        with self._failed.get_lock():
            self._failed.value = value

    @property
    def total(self):
        with self._total.get_lock():
            return self._total.value


class CMake:
    config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
    dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')

    def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver):

        self.cwd = None
        self.capture_output = True

        self.defconfig = {}
        self.cmake_cache = {}

        self.instance = None
        self.testsuite = testsuite
        self.platform = platform
        self.source_dir = source_dir
        self.build_dir = build_dir
        self.log = "build.log"

        self.default_encoding = sys.getdefaultencoding()
        self.jobserver = jobserver

    def parse_generated(self, filter_stages=[]):
        self.defconfig = {}
        return {}
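
    # Note on run_build() below: a linker overflow reported in the build log,
    # e.g. a line like
    #   region `RAM' overflowed by 1024 bytes
    # (hypothetical size), marks the instance as "skipped" rather than
    # "error", unless --overflow-as-errors was given.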

    def run_build(self, args=[]):

        logger.debug("Building %s for %s" % (self.source_dir, self.platform.name))

        cmake_args = []
        cmake_args.extend(args)
        cmake = shutil.which('cmake')
        cmd = [cmake] + cmake_args
        kwargs = dict()

        if self.capture_output:
            kwargs['stdout'] = subprocess.PIPE
            # CMake sends the output of message() to stderr unless it's STATUS
            kwargs['stderr'] = subprocess.STDOUT

        if self.cwd:
            kwargs['cwd'] = self.cwd

        start_time = time.time()
        if sys.platform == 'linux':
            p = self.jobserver.popen(cmd, **kwargs)
        else:
            p = subprocess.Popen(cmd, **kwargs)
        logger.debug(f'Running {" ".join(cmd)}')

        out, _ = p.communicate()

        ret = {}
        duration = time.time() - start_time
        self.instance.build_time += duration
        if p.returncode == 0:
            msg = f"Finished building {self.source_dir} for {self.platform.name} in {duration:.2f} seconds"
            logger.debug(msg)

            self.instance.status = "passed"
            if not self.instance.run:
                self.instance.add_missing_case_status("skipped", "Test was built only")
            ret = {"returncode": p.returncode}

            if out:
                log_msg = out.decode(self.default_encoding)
                with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
                    log.write(log_msg)
            else:
                return None
        else:
            # A real error occurred, raise an exception
            log_msg = ""
            if out:
                log_msg = out.decode(self.default_encoding)
                with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
                    log.write(log_msg)

            if log_msg:
                overflow_found = re.findall("region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM|dram0_1_seg)' overflowed by", log_msg)
                imgtool_overflow_found = re.findall(r"Error: Image size \(.*\) \+ trailer \(.*\) exceeds requested size", log_msg)
                if overflow_found and not self.options.overflow_as_errors:
                    logger.debug("Test skipped due to {} Overflow".format(overflow_found[0]))
                    self.instance.status = "skipped"
                    self.instance.reason = "{} overflow".format(overflow_found[0])
                    change_skip_to_error_if_integration(self.options, self.instance)
                elif imgtool_overflow_found and not self.options.overflow_as_errors:
                    self.instance.status = "skipped"
                    self.instance.reason = "imgtool overflow"
                    change_skip_to_error_if_integration(self.options, self.instance)
                else:
                    self.instance.status = "error"
                    self.instance.reason = "Build failure"

            ret = {
                "returncode": p.returncode
            }

        return ret

    def run_cmake(self, args="", filter_stages=[]):

        if not self.options.disable_warnings_as_errors:
            warnings_as_errors = 'y'
            gen_defines_args = "--edtlib-Werror"
        else:
            warnings_as_errors = 'n'
            gen_defines_args = ""

        logger.debug("Running cmake on %s for %s" % (self.source_dir, self.platform.name))
        cmake_args = [
            f'-B{self.build_dir}',
            f'-DTC_RUNID={self.instance.run_id}',
            f'-DCONFIG_COMPILER_WARNINGS_AS_ERRORS={warnings_as_errors}',
            f'-DEXTRA_GEN_DEFINES_ARGS={gen_defines_args}',
            f'-G{self.env.generator}'
        ]

        # If needed, run CMake using the package_helper script first, to only run
        # a subset of all cmake modules. This output will be used to filter
        # testcases, and the full CMake configuration will be run for
        # testcases that should be built
        if filter_stages:
            cmake_filter_args = [
                f'-DMODULES={",".join(filter_stages)}',
                f'-P{canonical_zephyr_base}/cmake/package_helper.cmake',
            ]

        if self.testsuite.sysbuild and not filter_stages:
            logger.debug("Building %s using sysbuild" % (self.source_dir))
            source_args = [
                f'-S{canonical_zephyr_base}/share/sysbuild',
                f'-DAPP_DIR={self.source_dir}'
            ]
        else:
            source_args = [
                f'-S{self.source_dir}'
            ]
        cmake_args.extend(source_args)

        cmake_args.extend(args)

        cmake_opts = ['-DBOARD={}'.format(self.platform.name)]
        cmake_args.extend(cmake_opts)

        if self.instance.testsuite.required_snippets:
            cmake_opts = ['-DSNIPPET={}'.format(';'.join(self.instance.testsuite.required_snippets))]
            cmake_args.extend(cmake_opts)

        cmake = shutil.which('cmake')
        cmd = [cmake] + cmake_args

        if filter_stages:
            cmd += cmake_filter_args

        kwargs = dict()

        log_command(logger, "Calling cmake", cmd)

        if self.capture_output:
            kwargs['stdout'] = subprocess.PIPE
            # CMake sends the output of message() to stderr unless it's STATUS
            kwargs['stderr'] = subprocess.STDOUT

        if self.cwd:
            kwargs['cwd'] = self.cwd

        start_time = time.time()
        if sys.platform == 'linux':
            p = self.jobserver.popen(cmd, **kwargs)
        else:
            p = subprocess.Popen(cmd, **kwargs)
        out, _ = p.communicate()

        duration = time.time() - start_time
        self.instance.build_time += duration

        if p.returncode == 0:
            filter_results = self.parse_generated(filter_stages)
            msg = f"Finished running cmake {self.source_dir} for {self.platform.name} in {duration:.2f} seconds"
            logger.debug(msg)
            ret = {
                'returncode': p.returncode,
                'filter': filter_results
            }
        else:
            self.instance.status = "error"
            self.instance.reason = "Cmake build failure"

            for tc in self.instance.testcases:
                tc.status = self.instance.status

            logger.error("Cmake build failure: %s for %s" % (self.source_dir, self.platform.name))
            ret = {"returncode": p.returncode}

        if out:
            os.makedirs(self.build_dir, exist_ok=True)
            with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
                log_msg = out.decode(self.default_encoding)
                log.write(log_msg)

        return ret
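

# FilterBuilder below evaluates a testsuite's "filter:" expression against the
# data generated by CMake. As a sketch (hypothetical expression), a YAML filter
# such as
#     filter: CONFIG_BT and dt_compat_enabled("vnd,sensor")
# is handed to expr_parser.parse() together with a dictionary that merges ARCH,
# PLATFORM, the environment, the parsed .config and the CMake cache, plus the
# devicetree (edt.pickle) when it exists.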

class FilterBuilder(CMake):

    def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver):
        super().__init__(testsuite, platform, source_dir, build_dir, jobserver)

        self.log = "config-twister.log"

    def parse_generated(self, filter_stages=[]):

        if self.platform.name == "unit_testing":
            return {}

        if self.testsuite.sysbuild and not filter_stages:
            # Load domain yaml to get default domain build directory
            domain_path = os.path.join(self.build_dir, "domains.yaml")
            domains = Domains.from_file(domain_path)
            logger.debug("Loaded sysbuild domain data from %s" % (domain_path))
            self.instance.domains = domains
            domain_build = domains.get_default_domain().build_dir
            cmake_cache_path = os.path.join(domain_build, "CMakeCache.txt")
            defconfig_path = os.path.join(domain_build, "zephyr", ".config")
            edt_pickle = os.path.join(domain_build, "zephyr", "edt.pickle")
        else:
            cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt")
            # .config is only available after the kconfig stage in cmake. If only
            # dt-based filtration is required, the package_helper call won't
            # produce .config.
            if not filter_stages or "kconfig" in filter_stages:
                defconfig_path = os.path.join(self.build_dir, "zephyr", ".config")
            # dt is compiled before kconfig, so edt_pickle is available regardless
            # of the choice of filter stages.
            edt_pickle = os.path.join(self.build_dir, "zephyr", "edt.pickle")

        if not filter_stages or "kconfig" in filter_stages:
            with open(defconfig_path, "r") as fp:
                defconfig = {}
                for line in fp.readlines():
                    m = self.config_re.match(line)
                    if not m:
                        if line.strip() and not line.startswith("#"):
                            sys.stderr.write("Unrecognized line %s\n" % line)
                        continue
                    defconfig[m.group(1)] = m.group(2).strip()

            self.defconfig = defconfig

        cmake_conf = {}
        try:
            cache = CMakeCache.from_file(cmake_cache_path)
        except FileNotFoundError:
            cache = {}

        for k in iter(cache):
            cmake_conf[k.name] = k.value

        self.cmake_cache = cmake_conf

        filter_data = {
            "ARCH": self.platform.arch,
            "PLATFORM": self.platform.name
        }
        filter_data.update(os.environ)
        if not filter_stages or "kconfig" in filter_stages:
            filter_data.update(self.defconfig)
        filter_data.update(self.cmake_cache)

        if self.testsuite.sysbuild and self.env.options.device_testing:
            # Verify that twister's arguments support sysbuild.
            # Twister sysbuild flashing currently only works with west, so
            # --west-flash must be passed. Additionally, erasing the DUT
            # before each test with --west-flash=--erase will inherently not
            # work with sysbuild.
            if self.env.options.west_flash is None:
                logger.warning("Sysbuild test will be skipped. " +
                               "West must be used for flashing.")
                return {os.path.join(self.platform.name, self.testsuite.name): True}
            elif "--erase" in self.env.options.west_flash:
                logger.warning("Sysbuild test will be skipped, " +
                               "--erase is not supported with --west-flash")
                return {os.path.join(self.platform.name, self.testsuite.name): True}

        if self.testsuite and self.testsuite.filter:
            try:
                if os.path.exists(edt_pickle):
                    with open(edt_pickle, 'rb') as f:
                        edt = pickle.load(f)
                else:
                    edt = None
                ret = expr_parser.parse(self.testsuite.filter, filter_data, edt)

            except (ValueError, SyntaxError) as se:
                sys.stderr.write(
                    "Failed processing %s\n" % self.testsuite.yamlfile)
                raise se

            if not ret:
                return {os.path.join(self.platform.name, self.testsuite.name): True}
            else:
                return {os.path.join(self.platform.name, self.testsuite.name): False}
        else:
            self.platform.filter_data = filter_data
            return filter_data


class ProjectBuilder(FilterBuilder):

    def __init__(self, instance: TestInstance, env: TwisterEnv, jobserver, **kwargs):
        super().__init__(instance.testsuite, instance.platform, instance.testsuite.source_dir, instance.build_dir, jobserver)

        self.log = "build.log"
        self.instance = instance
        self.filtered_tests = 0
        self.options = env.options
        self.env = env
        self.duts = None

    def log_info(self, filename, inline_logs, log_testcases=False):
        filename = os.path.abspath(os.path.realpath(filename))
        if inline_logs:
            logger.info("{:-^100}".format(filename))

            try:
                with open(filename) as fp:
                    data = fp.read()
            except Exception as e:
                data = "Unable to read log data (%s)\n" % (str(e))

            logger.error(data)

            logger.info("{:-^100}".format(filename))

            if log_testcases:
                for tc in self.instance.testcases:
                    if not tc.reason:
                        continue
                    logger.info(
                        f"\n{str(tc.name).center(100, '_')}\n"
                        f"{tc.reason}\n"
                        f"{100*'_'}\n"
                        f"{tc.output}"
                    )
        else:
            logger.error("see: " + Fore.YELLOW + filename + Fore.RESET)

    def log_info_file(self, inline_logs):
        build_dir = self.instance.build_dir
        h_log = "{}/handler.log".format(build_dir)
        b_log = "{}/build.log".format(build_dir)
        v_log = "{}/valgrind.log".format(build_dir)
        d_log = "{}/device.log".format(build_dir)
        pytest_log = "{}/twister_harness.log".format(build_dir)

        if os.path.exists(v_log) and "Valgrind" in self.instance.reason:
            self.log_info("{}".format(v_log), inline_logs)
        elif os.path.exists(pytest_log) and os.path.getsize(pytest_log) > 0:
            self.log_info("{}".format(pytest_log), inline_logs, log_testcases=True)
        elif os.path.exists(h_log) and os.path.getsize(h_log) > 0:
            self.log_info("{}".format(h_log), inline_logs)
        elif os.path.exists(d_log) and os.path.getsize(d_log) > 0:
            self.log_info("{}".format(d_log), inline_logs)
        else:
            self.log_info("{}".format(b_log), inline_logs)

    # Process one pipeline message for this instance. The usual flow of "op"
    # values is: filter -> cmake -> build -> gather_metrics -> run -> report
    # -> cleanup.
    def process(self, pipeline, done, message, lock, results):
        op = message.get('op')

        self.instance.setup_handler(self.env)

        if op == "filter":
            ret = self.cmake(filter_stages=self.instance.filter_stages)
            if self.instance.status in ["failed", "error"]:
                pipeline.put({"op": "report", "test": self.instance})
            else:
                # Here we check the dt/kconfig filter results coming from running cmake
                if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]:
                    logger.debug("filtering %s" % self.instance.name)
                    self.instance.status = "filtered"
                    self.instance.reason = "runtime filter"
                    results.skipped_runtime += 1
                    self.instance.add_missing_case_status("skipped")
                    pipeline.put({"op": "report", "test": self.instance})
                else:
                    pipeline.put({"op": "cmake", "test": self.instance})

        # The build process, call cmake and build with configured generator
        elif op == "cmake":
            ret = self.cmake()
            if self.instance.status in ["failed", "error"]:
                pipeline.put({"op": "report", "test": self.instance})
            elif self.options.cmake_only:
                if self.instance.status is None:
                    self.instance.status = "passed"
                pipeline.put({"op": "report", "test": self.instance})
            else:
                # Here we check the runtime filter results coming from running cmake
                if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]:
                    logger.debug("filtering %s" % self.instance.name)
                    self.instance.status = "filtered"
                    self.instance.reason = "runtime filter"
                    results.skipped_runtime += 1
                    self.instance.add_missing_case_status("skipped")
                    pipeline.put({"op": "report", "test": self.instance})
                else:
                    pipeline.put({"op": "build", "test": self.instance})

        elif op == "build":
            logger.debug("build test: %s" % self.instance.name)
            ret = self.build()
            if not ret:
                self.instance.status = "error"
                self.instance.reason = "Build Failure"
                pipeline.put({"op": "report", "test": self.instance})
            else:
                # Count skipped cases during build, for example
                # due to ram/rom overflow.
                if self.instance.status == "skipped":
                    results.skipped_runtime += 1
                    self.instance.add_missing_case_status("skipped", self.instance.reason)

                if ret.get('returncode', 1) > 0:
                    self.instance.add_missing_case_status("blocked", self.instance.reason)
                    pipeline.put({"op": "report", "test": self.instance})
                else:
                    logger.debug(f"Determine test cases for test instance: {self.instance.name}")
                    try:
                        self.determine_testcases(results)
                        pipeline.put({"op": "gather_metrics", "test": self.instance})
                    except BuildError as e:
                        logger.error(str(e))
                        self.instance.status = "error"
                        self.instance.reason = str(e)
                        pipeline.put({"op": "report", "test": self.instance})

        elif op == "gather_metrics":
            self.gather_metrics(self.instance)
            if self.instance.run and self.instance.handler.ready:
                pipeline.put({"op": "run", "test": self.instance})
            else:
                pipeline.put({"op": "report", "test": self.instance})

        # Run the generated binary using one of the supported handlers
        elif op == "run":
            logger.debug("run test: %s" % self.instance.name)
            self.run()
            logger.debug(f"run status: {self.instance.name} {self.instance.status}")
            try:
                # to make it work with pickle
                self.instance.handler.thread = None
                self.instance.handler.duts = None
                pipeline.put({
                    "op": "report",
                    "test": self.instance,
                    "status": self.instance.status,
                    "reason": self.instance.reason
                    }
                )
            except RuntimeError as e:
                logger.error(f"RuntimeError: {e}")
                traceback.print_exc()

        # Report results and output progress to screen
        elif op == "report":
            with lock:
                done.put(self.instance)
                self.report_out(results)

            if not self.options.coverage:
                if self.options.prep_artifacts_for_testing:
                    pipeline.put({"op": "cleanup", "mode": "device", "test": self.instance})
                elif self.options.runtime_artifact_cleanup == "pass" and self.instance.status == "passed":
                    pipeline.put({"op": "cleanup", "mode": "passed", "test": self.instance})
                elif self.options.runtime_artifact_cleanup == "all":
                    pipeline.put({"op": "cleanup", "mode": "all", "test": self.instance})

        elif op == "cleanup":
            mode = message.get("mode")
            if mode == "device":
                self.cleanup_device_testing_artifacts()
            elif mode == "passed" or (mode == "all" and self.instance.reason != "Cmake build failure"):
                self.cleanup_artifacts()

    def determine_testcases(self, results):
        yaml_testsuite_name = self.instance.testsuite.id
        logger.debug(f"Determine test cases for test suite: {yaml_testsuite_name}")

        elf_file = self.instance.get_elf_file()
        elf = ELFFile(open(elf_file, "rb"))

        logger.debug(f"Test instance {self.instance.name} already has {len(self.instance.testcases)} cases.")
        new_ztest_unit_test_regex = re.compile(r"z_ztest_unit_test__([^\s]+?)__([^\s]*)")
        detected_cases = []
        for section in elf.iter_sections():
            if isinstance(section, SymbolTableSection):
                for sym in section.iter_symbols():
                    # This is only meant for the new ztest framework, because
                    # only the new ztest framework exposes test functions
                    # precisely.

                    # The 1st capture group is the new ztest suite name.
                    # The 2nd capture group is the new ztest unit test name.
                    matches = new_ztest_unit_test_regex.findall(sym.name)
                    if matches:
                        for m in matches:
                            # new_ztest_suite = m[0]  # not used for now
                            test_func_name = m[1].replace("test_", "")
                            testcase_id = f"{yaml_testsuite_name}.{test_func_name}"
                            detected_cases.append(testcase_id)

        if detected_cases:
            logger.debug(f"{', '.join(detected_cases)} in {elf_file}")
            self.instance.testcases.clear()
            self.instance.testsuite.testcases.clear()

            # When the old regex-based test case collection is fully deprecated,
            # this will be the sole place where test cases get added to the test instance.
            # Then we can further include the new_ztest_suite info in the testcase_id.

            for testcase_id in detected_cases:
                self.instance.add_testcase(name=testcase_id)
                self.instance.testsuite.add_testcase(name=testcase_id)
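
    # Illustrative mapping performed above (hypothetical symbol name): with a
    # YAML test suite id of "kernel.common", an ELF symbol named
    #   z_ztest_unit_test__kernel_common__test_bitops
    # matches the regex and is recorded as the test case "kernel.common.bitops".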

    def cleanup_artifacts(self, additional_keep: List[str] = []):
        logger.debug("Cleaning up {}".format(self.instance.build_dir))
        allow = [
            os.path.join('zephyr', '.config'),
            'handler.log',
            'build.log',
            'device.log',
            'recording.csv',
            # below ones are needed to make --test-only work as well
            'Makefile',
            'CMakeCache.txt',
            'build.ninja',
            os.path.join('CMakeFiles', 'rules.ninja')
        ]

        allow += additional_keep

        if self.options.runtime_artifact_cleanup == 'all':
            allow += [os.path.join('twister', 'testsuite_extra.conf')]

        allow = [os.path.join(self.instance.build_dir, file) for file in allow]

        for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False):
            for name in filenames:
                path = os.path.join(dirpath, name)
                if path not in allow:
                    os.remove(path)
            # Remove empty directories and symbolic links to directories
            for dir in dirnames:
                path = os.path.join(dirpath, dir)
                if os.path.islink(path):
                    os.remove(path)
                elif not os.listdir(path):
                    os.rmdir(path)

    def cleanup_device_testing_artifacts(self):
        logger.debug("Cleaning up for Device Testing {}".format(self.instance.build_dir))

        files_to_keep = self._get_binaries()
        files_to_keep.append(os.path.join('zephyr', 'runners.yaml'))

        if self.testsuite.sysbuild:
            files_to_keep.append('domains.yaml')
            for domain in self.instance.domains.get_domains():
                files_to_keep += self._get_artifact_allow_list_for_domain(domain.name)

        self.cleanup_artifacts(files_to_keep)

        self._sanitize_files()

    def _get_artifact_allow_list_for_domain(self, domain: str) -> List[str]:
        """
        Return a list of files needed to test a given domain.
        """
        allow = [
            os.path.join(domain, 'build.ninja'),
            os.path.join(domain, 'CMakeCache.txt'),
            os.path.join(domain, 'CMakeFiles', 'rules.ninja'),
            os.path.join(domain, 'Makefile'),
            os.path.join(domain, 'zephyr', '.config'),
            os.path.join(domain, 'zephyr', 'runners.yaml')
        ]
        return allow

    def _get_binaries(self) -> List[str]:
        """
        Get a list of binary paths (absolute or relative to
        self.instance.build_dir), based on information from platform.binaries
        or runners.yaml. If none are found, fall back to default binaries like
        "zephyr/zephyr.hex" etc.
        """
        binaries: List[str] = []

        platform = self.instance.platform
        if platform.binaries:
            for binary in platform.binaries:
                binaries.append(os.path.join('zephyr', binary))

        # Get binaries for a single-domain build
        binaries += self._get_binaries_from_runners()
        # Get binaries in the case of a multiple-domain build
        if self.testsuite.sysbuild:
            for domain in self.instance.domains.get_domains():
                binaries += self._get_binaries_from_runners(domain.name)

        # If no binaries were found in platform.binaries or runners.yaml, take the default ones
        if len(binaries) == 0:
            binaries = [
                os.path.join('zephyr', 'zephyr.hex'),
                os.path.join('zephyr', 'zephyr.bin'),
                os.path.join('zephyr', 'zephyr.elf'),
                os.path.join('zephyr', 'zephyr.exe'),
            ]
        return binaries

    def _get_binaries_from_runners(self, domain='') -> List[str]:
        """
        Get a list of binary paths (absolute or relative to
        self.instance.build_dir) from the runners.yaml file. May be used for
        multiple-domain builds by passing in one domain at a time.
        """

        runners_file_path: str = os.path.join(self.instance.build_dir,
                                              domain, 'zephyr', 'runners.yaml')
        if not os.path.exists(runners_file_path):
            return []

        with open(runners_file_path, 'r') as file:
            runners_content: dict = yaml.safe_load(file)

        if 'config' not in runners_content:
            return []

        runners_config: dict = runners_content['config']
        binary_keys: List[str] = ['elf_file', 'hex_file', 'bin_file']

        binaries: List[str] = []
        for binary_key in binary_keys:
            binary_path = runners_config.get(binary_key)
            if binary_path is None:
                continue
            if os.path.isabs(binary_path):
                binaries.append(binary_path)
            else:
                binaries.append(os.path.join(domain, 'zephyr', binary_path))

        return binaries

    def _sanitize_files(self):
        """
        Sanitize files to make it possible to flash those files on a
        different computer/system.
        """
        self._sanitize_runners_file()
        self._sanitize_zephyr_base_from_files()

    def _sanitize_runners_file(self):
        """
        Replace absolute paths of binary files with relative ones. The base
        directory for those files is f"{self.instance.build_dir}/zephyr".
        """
        runners_dir_path: str = os.path.join(self.instance.build_dir, 'zephyr')
        runners_file_path: str = os.path.join(runners_dir_path, 'runners.yaml')
        if not os.path.exists(runners_file_path):
            return

        with open(runners_file_path, 'rt') as file:
            runners_content_text = file.read()
            runners_content_yaml: dict = yaml.safe_load(runners_content_text)

        if 'config' not in runners_content_yaml:
            return

        runners_config: dict = runners_content_yaml['config']
        binary_keys: List[str] = ['elf_file', 'hex_file', 'bin_file']

        for binary_key in binary_keys:
            binary_path = runners_config.get(binary_key)
            # sanitize only paths which exist and are absolute
            if binary_path is None or not os.path.isabs(binary_path):
                continue
            binary_path_relative = os.path.relpath(binary_path, start=runners_dir_path)
            runners_content_text = runners_content_text.replace(binary_path, binary_path_relative)

        with open(runners_file_path, 'wt') as file:
            file.write(runners_content_text)
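
    # Illustrative effect of _sanitize_runners_file() above (hypothetical
    # paths): an entry such as
    #   elf_file: /home/user/zephyrproject/build/zephyr/zephyr.elf
    # becomes the relative path "zephyr.elf", since paths are rewritten
    # relative to f"{self.instance.build_dir}/zephyr".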

    def _sanitize_zephyr_base_from_files(self):
        """
        Remove Zephyr base paths from selected files.
        """
        files_to_sanitize = [
            'CMakeCache.txt',
            os.path.join('zephyr', 'runners.yaml'),
        ]
        for file_path in files_to_sanitize:
            file_path = os.path.join(self.instance.build_dir, file_path)
            if not os.path.exists(file_path):
                continue

            with open(file_path, "rt") as file:
                data = file.read()

            # add a trailing slash at the end of canonical_zephyr_base if it does not exist:
            path_to_remove = os.path.join(canonical_zephyr_base, "")
            data = data.replace(path_to_remove, "")

            with open(file_path, "wt") as file:
                file.write(data)

    def report_out(self, results):
        total_to_do = results.total
        total_tests_width = len(str(total_to_do))
        results.done += 1
        instance = self.instance
        if results.iteration == 1:
            results.cases += len(instance.testcases)

        if instance.status in ["error", "failed"]:
            if instance.status == "error":
                results.error += 1
                txt = " ERROR "
            else:
                results.failed += 1
                txt = " FAILED "
            if self.options.verbose:
                status = Fore.RED + txt + Fore.RESET + instance.reason
            else:
                logger.error(
                    "{:<25} {:<50} {}{}{}: {}".format(
                        instance.platform.name,
                        instance.testsuite.name,
                        Fore.RED,
                        txt,
                        Fore.RESET,
                        instance.reason))
            if not self.options.verbose:
                self.log_info_file(self.options.inline_logs)
        elif instance.status in ["skipped", "filtered"]:
            status = Fore.YELLOW + "SKIPPED" + Fore.RESET
            results.skipped_configs += 1
            # test cases skipped at the test instance level
            results.skipped_cases += len(instance.testsuite.testcases)
        elif instance.status == "passed":
            status = Fore.GREEN + "PASSED" + Fore.RESET
            results.passed += 1
            for case in instance.testcases:
                # test cases skipped at the test case level
                if case.status == 'skipped':
                    results.skipped_cases += 1
        else:
            logger.debug(f"Unknown status = {instance.status}")
            status = Fore.YELLOW + "UNKNOWN" + Fore.RESET

        if self.options.verbose:
            if self.options.cmake_only:
                more_info = "cmake"
            elif instance.status in ["skipped", "filtered"]:
                more_info = instance.reason
            else:
                if instance.handler.ready and instance.run:
                    more_info = instance.handler.type_str
                    htime = instance.execution_time
                    if instance.dut:
                        more_info += f": {instance.dut},"
                    if htime:
                        more_info += " {:.3f}s".format(htime)
                else:
                    more_info = "build"

            if (instance.status in ["error", "failed", "timeout", "flash_error"]
                    and hasattr(self.instance.handler, 'seed')
                    and self.instance.handler.seed is not None):
                more_info += "/seed: " + str(self.options.seed)
            logger.info("{:>{}}/{} {:<25} {:<50} {} ({})".format(
                results.done, total_tests_width, total_to_do, instance.platform.name,
                instance.testsuite.name, status, more_info))

            if instance.status in ["error", "failed", "timeout"]:
                self.log_info_file(self.options.inline_logs)
        else:
            completed_perc = 0
            if total_to_do > 0:
                completed_perc = int((float(results.done) / total_to_do) * 100)

            sys.stdout.write("INFO - Total complete: %s%4d/%4d%s %2d%% skipped: %s%4d%s, failed: %s%4d%s, error: %s%4d%s\r" % (
                Fore.GREEN,
                results.done,
                total_to_do,
                Fore.RESET,
                completed_perc,
                Fore.YELLOW if results.skipped_configs > 0 else Fore.RESET,
                results.skipped_configs,
                Fore.RESET,
                Fore.RED if results.failed > 0 else Fore.RESET,
                results.failed,
                Fore.RESET,
                Fore.RED if results.error > 0 else Fore.RESET,
                results.error,
                Fore.RESET
                )
            )
            sys.stdout.flush()
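
    # Minimal sketch of cmake_assemble_args() below (hypothetical values):
    # extra_args=["CONFIG_FOO=y", "EXTRA_ARG=val"], with no handler arguments,
    # extra conf/overlay files or cmake_extra_args, expands to
    #   ["-DCONFIG_FOO=y", "-DEXTRA_ARG=val"]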

    @staticmethod
    def cmake_assemble_args(extra_args, handler, extra_conf_files, extra_overlay_confs,
                            extra_dtc_overlay_files, cmake_extra_args,
                            build_dir):
        # Retain quotes around config options
        config_options = [arg for arg in extra_args if arg.startswith("CONFIG_")]
        args = [arg for arg in extra_args if not arg.startswith("CONFIG_")]

        args_expanded = ["-D{}".format(a.replace('"', '\"')) for a in config_options]

        if handler.ready:
            args.extend(handler.args)

        if extra_conf_files:
            args.append(f"CONF_FILE=\"{';'.join(extra_conf_files)}\"")

        if extra_dtc_overlay_files:
            args.append(f"DTC_OVERLAY_FILE=\"{';'.join(extra_dtc_overlay_files)}\"")

        # merge overlay files into one variable
        overlays = extra_overlay_confs.copy()

        additional_overlay_path = os.path.join(
            build_dir, "twister", "testsuite_extra.conf"
        )
        if os.path.exists(additional_overlay_path):
            overlays.append(additional_overlay_path)

        if overlays:
            args.append("OVERLAY_CONFIG=\"%s\"" % (" ".join(overlays)))

        # Build the final argument list
        args_expanded.extend(["-D{}".format(a.replace('"', '\"')) for a in cmake_extra_args])
        args_expanded.extend(["-D{}".format(a.replace('"', '')) for a in args])

        return args_expanded

    def cmake(self, filter_stages=[]):
        args = self.cmake_assemble_args(
            self.testsuite.extra_args.copy(),  # extra_args from YAML
            self.instance.handler,
            self.testsuite.extra_conf_files,
            self.testsuite.extra_overlay_confs,
            self.testsuite.extra_dtc_overlay_files,
            self.options.extra_args,  # CMake extra args
            self.instance.build_dir,
        )
        return self.run_cmake(args, filter_stages)

    def build(self):
        harness = HarnessImporter.get_harness(self.instance.testsuite.harness.capitalize())
        build_result = self.run_build(['--build', self.build_dir])
        try:
            if harness:
                harness.instance = self.instance
                harness.build()
        except ConfigurationError as error:
            self.instance.status = "error"
            self.instance.reason = str(error)
            logger.error(self.instance.reason)
            return
        return build_result

    def run(self):

        instance = self.instance

        if instance.handler.ready:
            logger.debug(f"Reset instance status from '{instance.status}' to None before run.")
            instance.status = None

            if instance.handler.type_str == "device":
                instance.handler.duts = self.duts

            if (self.options.seed is not None and instance.platform.name.startswith("native_")):
                self.parse_generated()
                if ('CONFIG_FAKE_ENTROPY_NATIVE_POSIX' in self.defconfig and
                        self.defconfig['CONFIG_FAKE_ENTROPY_NATIVE_POSIX'] == 'y'):
                    instance.handler.seed = self.options.seed

            if self.options.extra_test_args and instance.platform.arch == "posix":
                instance.handler.extra_test_args = self.options.extra_test_args

            harness = HarnessImporter.get_harness(instance.testsuite.harness.capitalize())
            try:
                harness.configure(instance)
            except ConfigurationError as error:
                instance.status = "error"
                instance.reason = str(error)
                logger.error(instance.reason)
                return

            if isinstance(harness, Pytest):
                harness.pytest_run(instance.handler.get_test_timeout())
            else:
                instance.handler.handle(harness)

        sys.stdout.flush()

    def gather_metrics(self, instance: TestInstance):
        if self.options.create_rom_ram_report:
            self.run_build(['--build', self.build_dir, "--target", "footprint"])
        if self.options.enable_size_report and not self.options.cmake_only:
            self.calc_size(instance=instance, from_buildlog=self.options.footprint_from_buildlog)
        else:
            instance.metrics["used_ram"] = 0
            instance.metrics["used_rom"] = 0
            instance.metrics["available_rom"] = 0
            instance.metrics["available_ram"] = 0
            instance.metrics["unrecognized"] = []

    @staticmethod
    def calc_size(instance: TestInstance, from_buildlog: bool):
        if instance.status not in ["error", "failed", "skipped"]:
            if instance.platform.type not in ["native", "qemu", "unit"]:
                generate_warning = bool(instance.platform.type == "mcu")
                size_calc = instance.calculate_sizes(from_buildlog=from_buildlog, generate_warning=generate_warning)
                instance.metrics["used_ram"] = size_calc.get_used_ram()
                instance.metrics["used_rom"] = size_calc.get_used_rom()
                instance.metrics["available_rom"] = size_calc.get_available_rom()
                instance.metrics["available_ram"] = size_calc.get_available_ram()
                instance.metrics["unrecognized"] = size_calc.unrecognized_sections()
            else:
                instance.metrics["used_ram"] = 0
                instance.metrics["used_rom"] = 0
                instance.metrics["available_rom"] = 0
                instance.metrics["available_ram"] = 0
                instance.metrics["unrecognized"] = []
            instance.metrics["handler_time"] = instance.execution_time


class TwisterRunner:

    def __init__(self, instances, suites, env=None) -> None:
        self.pipeline = None
        self.options = env.options
        self.env = env
        self.instances = instances
        self.suites = suites
        self.duts = None
        self.jobs = 1
        self.results = None
        self.jobserver = None

    def run(self):

        retries = self.options.retry_failed + 1

        BaseManager.register('LifoQueue', queue.LifoQueue)
        manager = BaseManager()
        manager.start()

        self.results = ExecutionCounter(total=len(self.instances))
        self.iteration = 0
        pipeline = manager.LifoQueue()
        done_queue = manager.LifoQueue()

        # Set number of jobs
        if self.options.jobs:
            self.jobs = self.options.jobs
        elif self.options.build_only:
            self.jobs = multiprocessing.cpu_count() * 2
        else:
            self.jobs = multiprocessing.cpu_count()

        if sys.platform == "linux":
            if os.name == 'posix':
                self.jobserver = GNUMakeJobClient.from_environ(jobs=self.options.jobs)
                if not self.jobserver:
                    self.jobserver = GNUMakeJobServer(self.jobs)
                elif self.jobserver.jobs:
                    self.jobs = self.jobserver.jobs
            # TODO: Implement this on windows/mac also
        else:
            self.jobserver = JobClient()

        logger.info("JOBS: %d", self.jobs)

        self.update_counting_before_pipeline()

        while True:
            self.results.iteration += 1

            if self.results.iteration > 1:
                logger.info("%d Iteration:" % (self.results.iteration))
                time.sleep(self.options.retry_interval)  # waiting for the system to settle down
                self.results.done = self.results.total - self.results.failed - self.results.error
                self.results.failed = 0
                if self.options.retry_build_errors:
                    self.results.error = 0
            else:
                self.results.done = self.results.skipped_filter

            self.execute(pipeline, done_queue)

            while True:
                try:
                    inst = done_queue.get_nowait()
                except queue.Empty:
                    break
                else:
                    inst.metrics.update(self.instances[inst.name].metrics)
                    inst.metrics["handler_time"] = inst.execution_time
                    inst.metrics["unrecognized"] = []
                    self.instances[inst.name] = inst

            print("")

            retry_errors = False
            if self.results.error and self.options.retry_build_errors:
                retry_errors = True

            retries = retries - 1
            if retries == 0 or (self.results.failed == 0 and not retry_errors):
                break

        self.show_brief()

    def update_counting_before_pipeline(self):
        '''
        Updating the counters before the pipeline starts is necessary because
        statically filtered test instances never enter the pipeline, while some
        of the pipeline output needs those static filter stats. So we prepare
        them before the pipeline starts.
        '''
        for instance in self.instances.values():
            if instance.status == 'filtered' and not instance.reason == 'runtime filter':
                self.results.skipped_filter += 1
                self.results.skipped_configs += 1
                self.results.skipped_cases += len(instance.testsuite.testcases)
                self.results.cases += len(instance.testsuite.testcases)
            elif instance.status == 'error':
                self.results.error += 1

    def show_brief(self):
        logger.info("%d test scenarios (%d test instances) selected, "
                    "%d configurations skipped (%d by static filter, %d at runtime)." %
                    (len(self.suites), len(self.instances),
                     self.results.skipped_configs,
                     self.results.skipped_filter,
                     self.results.skipped_configs - self.results.skipped_filter))

    def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False, retry_build_errors=False):
        for instance in self.instances.values():
            if build_only:
                instance.run = False

            no_retry_statuses = ['passed', 'skipped', 'filtered']
            if not retry_build_errors:
                no_retry_statuses.append("error")

            if instance.status not in no_retry_statuses:
                logger.debug(f"adding {instance.name}")
                if instance.status:
                    instance.retries += 1
                instance.status = None
                # Check if the cmake package_helper script can be run in advance.
                instance.filter_stages = []
                if instance.testsuite.filter:
                    instance.filter_stages = self.get_cmake_filter_stages(instance.testsuite.filter, expr_parser.reserved.keys())

                if test_only and instance.run:
                    pipeline.put({"op": "run", "test": instance})
                elif instance.filter_stages and "full" not in instance.filter_stages:
                    pipeline.put({"op": "filter", "test": instance})
                else:
                    cache_file = os.path.join(instance.build_dir, "CMakeCache.txt")
                    if os.path.exists(cache_file) and self.env.options.aggressive_no_clean:
                        pipeline.put({"op": "build", "test": instance})
                    else:
                        pipeline.put({"op": "cmake", "test": instance})

    def pipeline_mgr(self, pipeline, done_queue, lock, results):
        if sys.platform == 'linux':
            with self.jobserver.get_job():
                while True:
                    try:
                        task = pipeline.get_nowait()
                    except queue.Empty:
                        break
                    else:
                        instance = task['test']
                        pb = ProjectBuilder(instance, self.env, self.jobserver)
                        pb.duts = self.duts
                        pb.process(pipeline, done_queue, task, lock, results)

                return True
        else:
            while True:
                try:
                    task = pipeline.get_nowait()
                except queue.Empty:
                    break
                else:
                    instance = task['test']
                    pb = ProjectBuilder(instance, self.env, self.jobserver)
                    pb.duts = self.duts
                    pb.process(pipeline, done_queue, task, lock, results)
            return True

    def execute(self, pipeline, done):
        lock = Lock()
        logger.info("Adding tasks to the queue...")
        self.add_tasks_to_queue(pipeline, self.options.build_only, self.options.test_only,
                                retry_build_errors=self.options.retry_build_errors)
        logger.info("Added initial list of jobs to queue")

        processes = []

        for _ in range(self.jobs):
            p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, self.results, ))
            processes.append(p)
            p.start()
        logger.debug(f"Launched {self.jobs} jobs")

        try:
            for p in processes:
                p.join()
        except KeyboardInterrupt:
            logger.info("Execution interrupted")
            for p in processes:
                p.terminate()

    @staticmethod
    def get_cmake_filter_stages(filt, logic_keys):
        """ Analyze filter expressions from test yaml
        and decide if dts and/or kconfig based filtering will be needed.
        """
        dts_required = False
        kconfig_required = False
        full_required = False
        filter_stages = []

        # Compress args in expressions like "function('x', 'y')" so they are not split when splitting by whitespace
        filt = filt.replace(", ", ",")
        # Remove logic words
        for k in logic_keys:
            filt = filt.replace(f"{k} ", "")
        # Remove brackets
        filt = filt.replace("(", "")
        filt = filt.replace(")", "")
        # Split by whitespace
        filt = filt.split()
        for expression in filt:
            if expression.startswith("dt_"):
                dts_required = True
            elif expression.startswith("CONFIG"):
                kconfig_required = True
            else:
                full_required = True

        if full_required:
            return ["full"]
        if dts_required:
            filter_stages.append("dts")
        if kconfig_required:
            filter_stages.append("kconfig")

        return filter_stages
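
# Illustrative example for TwisterRunner.get_cmake_filter_stages() (hypothetical
# filter expression): for
#   filt = 'dt_compat_enabled("vnd,sensor") and CONFIG_SENSOR'
# the dt_* term requires the "dts" stage and the CONFIG_* term the "kconfig"
# stage, so the method returns ["dts", "kconfig"]; any other kind of term
# forces ["full"].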