1# vim: set syntax=python ts=4 :
2#
# Copyright (c) 2018-2022 Intel Corporation
4# Copyright 2022 NXP
5# SPDX-License-Identifier: Apache-2.0
6
7import logging
8import multiprocessing
9import os
10import pickle
11import queue
12import re
13import shutil
14import subprocess
15import sys
16import time
17import traceback
18import yaml
19from multiprocessing import Lock, Process, Value
20from multiprocessing.managers import BaseManager
21from typing import List
22from packaging import version
23
24from colorama import Fore
25from domains import Domains
26from twisterlib.cmakecache import CMakeCache
27from twisterlib.environment import canonical_zephyr_base
28from twisterlib.error import BuildError, ConfigurationError
29
30import elftools
31from elftools.elf.elffile import ELFFile
32from elftools.elf.sections import SymbolTableSection
33
34if version.parse(elftools.__version__) < version.parse('0.24'):
35    sys.exit("pyelftools is out of date, need version 0.24 or later")
36
37# Job server only works on Linux for now.
38if sys.platform == 'linux':
39    from twisterlib.jobserver import GNUMakeJobClient, GNUMakeJobServer, JobClient
40
41from twisterlib.log_helper import log_command
42from twisterlib.testinstance import TestInstance
43from twisterlib.environment import TwisterEnv
44from twisterlib.testsuite import TestSuite
45from twisterlib.platform import Platform
46from twisterlib.testplan import change_skip_to_error_if_integration
47from twisterlib.harness import HarnessImporter, Pytest
48
49logger = logging.getLogger('twister')
50logger.setLevel(logging.DEBUG)
51import expr_parser
52
53
def _shared_counter_property(attr, read_only=False):
    '''
    Build a property backed by the multiprocessing ``Value`` stored in the
    instance attribute named ``attr``.

    Each read and each write holds that Value's own lock. The lock is released
    between the read and the write of an augmented assignment such as
    ``counter.done += 1``.
    '''
    def _read(self):
        shared = getattr(self, attr)
        with shared.get_lock():
            return shared.value

    if read_only:
        return property(_read)

    def _write(self, value):
        shared = getattr(self, attr)
        with shared.get_lock():
            shared.value = value

    return property(_read, _write)


class ExecutionCounter(object):
    def __init__(self, total=0):
        '''
        Most of the stats are at test instance level
        Except that "_cases" and "_skipped_cases" are for cases of ALL test instances

        total complete = done + skipped_filter
        total = yaml test scenarios * applicable platforms
        complete percentage = (done + skipped_filter) / total
        pass rate = passed / (total - skipped_configs)
        '''
        # Serializes callers that need more than one counter at a time.
        self.lock = Lock()

        # initialized to number of test instances
        self._total = Value('i', total)

        # iteration
        self._iteration = Value('i', 0)

        # instances that go through the pipeline
        # updated by report_out()
        self._done = Value('i', 0)

        # instances that actually executed and passed
        # updated by report_out()
        self._passed = Value('i', 0)

        # updated by report_out() in pipeline
        self._failed = Value('i', 0)
        self._error = Value('i', 0)

        # static filter + runtime filter + build skipped
        # updated by update_counting_before_pipeline() and report_out()
        self._skipped_configs = Value('i', 0)

        # cmake filter + build skipped
        # updated by report_out()
        self._skipped_runtime = Value('i', 0)

        # static filtered at yaml parsing time
        # updated by update_counting_before_pipeline()
        self._skipped_filter = Value('i', 0)

        # updated in report_out
        self._cases = Value('i', 0)

        # updated by update_counting_before_pipeline() and report_out()
        self._skipped_cases = Value('i', 0)

    def summary(self):
        '''Print every counter to stdout, framed by two separator lines.'''
        separator = "--------------------------------"
        report = (
            f"Total test suites: {self.total}",  # actually test instances
            f"Total test cases: {self.cases}",
            f"Executed test cases: {self.cases - self.skipped_cases}",
            f"Skipped test cases: {self.skipped_cases}",
            f"Completed test suites: {self.done}",
            f"Passing test suites: {self.passed}",
            f"Failing test suites: {self.failed}",
            f"Skipped test suites: {self.skipped_configs}",
            f"Skipped test suites (runtime): {self.skipped_runtime}",
            f"Skipped test suites (filter): {self.skipped_filter}",
            f"Errors: {self.error}",
        )
        print(separator)
        for row in report:
            print(row)
        print(separator)

    # Public, lock-protected views of the shared counters created in __init__.
    cases = _shared_counter_property('_cases')
    skipped_cases = _shared_counter_property('_skipped_cases')
    error = _shared_counter_property('_error')
    iteration = _shared_counter_property('_iteration')
    done = _shared_counter_property('_done')
    passed = _shared_counter_property('_passed')
    skipped_configs = _shared_counter_property('_skipped_configs')
    skipped_filter = _shared_counter_property('_skipped_filter')
    skipped_runtime = _shared_counter_property('_skipped_runtime')
    failed = _shared_counter_property('_failed')

    # The total is fixed at construction time, so it has no setter.
    total = _shared_counter_property('_total', read_only=True)
221
class CMake:
    """
    Runs the ``cmake`` executable for one test suite on one platform.

    ``run_cmake()`` performs the configure step and ``run_build()`` invokes
    cmake with caller supplied arguments. Both append the captured output to
    ``<build_dir>/<self.log>`` and add the elapsed wall-clock time to
    ``self.instance.build_time``.

    NOTE: the methods below read ``self.instance``, ``self.options`` and
    ``self.env``. ``__init__`` only sets ``instance`` to None and never assigns
    the other two; in this file ``ProjectBuilder.__init__`` provides them.
    """
    config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
    dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')

    def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver):

        # Working directory for the cmake child process (None: inherit).
        self.cwd = None
        # When True, stdout/stderr of cmake are captured and logged to file.
        self.capture_output = True

        self.defconfig = {}
        self.cmake_cache = {}

        self.instance = None
        self.testsuite = testsuite
        self.platform = platform
        self.source_dir = source_dir
        self.build_dir = build_dir
        # Name of the log file, relative to build_dir.
        self.log = "build.log"

        self.default_encoding = sys.getdefaultencoding()
        self.jobserver = jobserver

    def parse_generated(self, filter_stages=None):
        """
        Reset ``self.defconfig`` and return an empty filter result.

        ``filter_stages`` is accepted for interface compatibility with
        subclasses and is not used here.
        """
        self.defconfig = {}
        return {}

    def _start_cmake_process(self, cmd):
        """Spawn ``cmd`` honouring ``capture_output``/``cwd`` and return the process object."""
        kwargs = {}

        if self.capture_output:
            kwargs['stdout'] = subprocess.PIPE
            # CMake sends the output of message() to stderr unless it's STATUS
            kwargs['stderr'] = subprocess.STDOUT

        if self.cwd:
            kwargs['cwd'] = self.cwd

        # The job server is only available on Linux.
        if sys.platform == 'linux':
            return self.jobserver.popen(cmd, **kwargs)
        return subprocess.Popen(cmd, **kwargs)

    def _append_build_log(self, text):
        """Append ``text`` to ``<build_dir>/<self.log>``, creating the directory if needed."""
        os.makedirs(self.build_dir, exist_ok=True)
        with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
            log.write(text)

    def run_build(self, args=None):
        """
        Run ``cmake`` with ``args`` and record the outcome on ``self.instance``.

        :param args: iterable of command line arguments passed to cmake
                     (default: no arguments).
        :return: ``{"returncode": <int>}``, or None when cmake exited with 0
                 but no output was captured. ``ProjectBuilder.process`` treats
                 a falsy result as a build failure.
        """
        logger.debug("Building %s for %s" % (self.source_dir, self.platform.name))

        cmd = [shutil.which('cmake')] + list(args or [])

        start_time = time.time()
        p = self._start_cmake_process(cmd)
        # Separate the arguments with spaces so the logged command is readable.
        logger.debug(f'Running {" ".join(cmd)}')

        out, _ = p.communicate()

        duration = time.time() - start_time
        self.instance.build_time += duration

        log_msg = ""
        if out:
            log_msg = out.decode(self.default_encoding)
            self._append_build_log(log_msg)

        if p.returncode == 0:
            msg = f"Finished building {self.source_dir} for {self.platform.name} in {duration:.2f} seconds"
            logger.debug(msg)

            self.instance.status = "passed"
            if not self.instance.run:
                self.instance.add_missing_case_status("skipped", "Test was built only")

            if not out:
                return None
            return {"returncode": p.returncode}

        # A real error occurred. Memory region overflows are reported as a
        # skip unless overflow_as_errors is set. An empty log simply yields no
        # match, so a failure without captured output is still marked as an
        # error instead of leaving the instance status untouched.
        overflow_found = re.findall("region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM|dram0_1_seg)' overflowed by", log_msg)
        imgtool_overflow_found = re.findall(r"Error: Image size \(.*\) \+ trailer \(.*\) exceeds requested size", log_msg)
        if overflow_found and not self.options.overflow_as_errors:
            logger.debug("Test skipped due to {} Overflow".format(overflow_found[0]))
            self.instance.status = "skipped"
            self.instance.reason = "{} overflow".format(overflow_found[0])
            change_skip_to_error_if_integration(self.options, self.instance)
        elif imgtool_overflow_found and not self.options.overflow_as_errors:
            self.instance.status = "skipped"
            self.instance.reason = "imgtool overflow"
            change_skip_to_error_if_integration(self.options, self.instance)
        else:
            self.instance.status = "error"
            self.instance.reason = "Build failure"

        return {"returncode": p.returncode}

    def run_cmake(self, args=None, filter_stages=None):
        """
        Run the CMake configure step for this test instance.

        :param args: iterable of extra command line arguments for cmake.
        :param filter_stages: cmake modules to run through the package_helper
                              script instead of a full configuration; empty or
                              None requests the full configuration.
        :return: ``{"returncode": 0, "filter": <parse_generated() result>}``
                 on success, ``{"returncode": <int>}`` on failure.
        """
        filter_stages = filter_stages or []

        if not self.options.disable_warnings_as_errors:
            warnings_as_errors = 'y'
            gen_defines_args = "--edtlib-Werror"
        else:
            warnings_as_errors = 'n'
            gen_defines_args = ""

        logger.debug("Running cmake on %s for %s" % (self.source_dir, self.platform.name))
        cmake_args = [
            f'-B{self.build_dir}',
            f'-DTC_RUNID={self.instance.run_id}',
            f'-DCONFIG_COMPILER_WARNINGS_AS_ERRORS={warnings_as_errors}',
            f'-DEXTRA_GEN_DEFINES_ARGS={gen_defines_args}',
            f'-G{self.env.generator}'
        ]

        if self.testsuite.sysbuild and not filter_stages:
            logger.debug("Building %s using sysbuild" % (self.source_dir))
            cmake_args.extend([
                f'-S{canonical_zephyr_base}/share/sysbuild',
                f'-DAPP_DIR={self.source_dir}'
            ])
        else:
            cmake_args.append(f'-S{self.source_dir}')

        cmake_args.extend(args or [])
        cmake_args.append('-DBOARD={}'.format(self.platform.name))

        if self.instance.testsuite.required_snippets:
            cmake_args.append('-DSNIPPET={}'.format(';'.join(self.instance.testsuite.required_snippets)))

        # If needed, run CMake using the package_helper script first, to only run
        # a subset of all cmake modules. This output will be used to filter
        # testcases, and the full CMake configuration will be run for
        # testcases that should be built
        if filter_stages:
            cmake_args.extend([
                f'-DMODULES={",".join(filter_stages)}',
                f'-P{canonical_zephyr_base}/cmake/package_helper.cmake',
            ])

        cmd = [shutil.which('cmake')] + cmake_args

        log_command(logger, "Calling cmake", cmd)

        start_time = time.time()
        p = self._start_cmake_process(cmd)
        out, _ = p.communicate()

        duration = time.time() - start_time
        self.instance.build_time += duration

        if p.returncode == 0:
            filter_results = self.parse_generated(filter_stages)
            msg = f"Finished running cmake {self.source_dir} for {self.platform.name} in {duration:.2f} seconds"
            logger.debug(msg)
            ret = {
                    'returncode': p.returncode,
                    'filter': filter_results
                    }
        else:
            self.instance.status = "error"
            self.instance.reason = "Cmake build failure"

            # Propagate the failure to every test case of the instance.
            for tc in self.instance.testcases:
                tc.status = self.instance.status

            logger.error("Cmake build failure: %s for %s" % (self.source_dir, self.platform.name))
            ret = {"returncode": p.returncode}

        if out:
            self._append_build_log(out.decode(self.default_encoding))

        return ret
425
426
class FilterBuilder(CMake):
    """
    CMake runner that evaluates a test suite's ``filter`` expression against
    the data produced by the CMake configure step (Kconfig ``.config``,
    ``CMakeCache.txt`` and ``edt.pickle``).
    """

    def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver):
        super().__init__(testsuite, platform, source_dir, build_dir, jobserver)

        self.log = "config-twister.log"

    def parse_generated(self, filter_stages=None):
        """
        Collect filter data from the build directory and evaluate the filter.

        :param filter_stages: cmake modules that were run through the package
                              helper; empty or None means a full configuration
                              was performed.
        :return: one of
                 - ``{}`` for the ``unit_testing`` platform,
                 - ``{"<platform>/<testsuite>": bool}`` where True means the
                   test must be filtered out (filter expression evaluated to
                   false, or unsupported sysbuild device testing options),
                 - the complete filter data dict when the suite has no filter
                   expression (also stored in ``self.platform.filter_data``).
        :raises ValueError, SyntaxError: re-raised from the expression parser.
        """
        filter_stages = filter_stages or []

        if self.platform.name == "unit_testing":
            return {}

        # .config is only available after kconfig stage in cmake. If only dt based
        # filtration is required package helper call won't produce .config
        kconfig_available = not filter_stages or "kconfig" in filter_stages

        defconfig_path = None
        if self.testsuite.sysbuild and not filter_stages:
            # Load domain yaml to get default domain build directory
            domain_path = os.path.join(self.build_dir, "domains.yaml")
            domains = Domains.from_file(domain_path)
            logger.debug("Loaded sysbuild domain data from %s" % (domain_path))
            self.instance.domains = domains
            domain_build = domains.get_default_domain().build_dir
            cmake_cache_path = os.path.join(domain_build, "CMakeCache.txt")
            defconfig_path = os.path.join(domain_build, "zephyr", ".config")
            edt_pickle = os.path.join(domain_build, "zephyr", "edt.pickle")
        else:
            cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt")
            if kconfig_available:
                defconfig_path = os.path.join(self.build_dir, "zephyr", ".config")
            # dt is compiled before kconfig, so edt_pickle is available regardless of choice of filter stages
            edt_pickle = os.path.join(self.build_dir, "zephyr", "edt.pickle")

        if kconfig_available:
            defconfig = {}
            with open(defconfig_path, "r") as fp:
                for line in fp:
                    m = self.config_re.match(line)
                    if not m:
                        # Blank lines and comments are expected, anything else is reported.
                        if line.strip() and not line.startswith("#"):
                            sys.stderr.write("Unrecognized line %s\n" % line)
                        continue
                    defconfig[m.group(1)] = m.group(2).strip()

            self.defconfig = defconfig

        try:
            cache = CMakeCache.from_file(cmake_cache_path)
        except FileNotFoundError:
            cache = {}

        self.cmake_cache = {entry.name: entry.value for entry in cache}

        # Later updates win: environment, then Kconfig, then the CMake cache.
        filter_data = {
            "ARCH": self.platform.arch,
            "PLATFORM": self.platform.name
        }
        filter_data.update(os.environ)
        if kconfig_available:
            filter_data.update(self.defconfig)
        filter_data.update(self.cmake_cache)

        instance_key = os.path.join(self.platform.name, self.testsuite.name)

        if self.testsuite.sysbuild and self.env.options.device_testing:
            # Verify that twister's arguments support sysbuild.
            # Twister sysbuild flashing currently only works with west, so
            # --west-flash must be passed. Additionally, erasing the DUT
            # before each test with --west-flash=--erase will inherently not
            # work with sysbuild.
            if self.env.options.west_flash is None:
                logger.warning("Sysbuild test will be skipped. " +
                    "West must be used for flashing.")
                return {instance_key: True}
            elif "--erase" in self.env.options.west_flash:
                logger.warning("Sysbuild test will be skipped, " +
                    "--erase is not supported with --west-flash")
                return {instance_key: True}

        if not (self.testsuite and self.testsuite.filter):
            self.platform.filter_data = filter_data
            return filter_data

        try:
            if os.path.exists(edt_pickle):
                # NOTE: unpickling executes code embedded in the file; edt.pickle
                # is read from the build directory and must not come from an
                # untrusted source.
                with open(edt_pickle, 'rb') as f:
                    edt = pickle.load(f)
            else:
                edt = None
            ret = expr_parser.parse(self.testsuite.filter, filter_data, edt)

        except (ValueError, SyntaxError):
            sys.stderr.write(
                "Failed processing %s\n" % self.testsuite.yamlfile)
            raise

        # True means "filter this instance out".
        return {instance_key: not ret}
528
529
530class ProjectBuilder(FilterBuilder):
531
532    def __init__(self, instance: TestInstance, env: TwisterEnv, jobserver, **kwargs):
533        super().__init__(instance.testsuite, instance.platform, instance.testsuite.source_dir, instance.build_dir, jobserver)
534
535        self.log = "build.log"
536        self.instance = instance
537        self.filtered_tests = 0
538        self.options = env.options
539        self.env = env
540        self.duts = None
541
542    def log_info(self, filename, inline_logs, log_testcases=False):
543        filename = os.path.abspath(os.path.realpath(filename))
544        if inline_logs:
545            logger.info("{:-^100}".format(filename))
546
547            try:
548                with open(filename) as fp:
549                    data = fp.read()
550            except Exception as e:
551                data = "Unable to read log data (%s)\n" % (str(e))
552
553            logger.error(data)
554
555            logger.info("{:-^100}".format(filename))
556
557            if log_testcases:
558                for tc in self.instance.testcases:
559                    if not tc.reason:
560                        continue
561                    logger.info(
562                        f"\n{str(tc.name).center(100, '_')}\n"
563                        f"{tc.reason}\n"
564                        f"{100*'_'}\n"
565                        f"{tc.output}"
566                    )
567        else:
568            logger.error("see: " + Fore.YELLOW + filename + Fore.RESET)
569
570    def log_info_file(self, inline_logs):
571        build_dir = self.instance.build_dir
572        h_log = "{}/handler.log".format(build_dir)
573        b_log = "{}/build.log".format(build_dir)
574        v_log = "{}/valgrind.log".format(build_dir)
575        d_log = "{}/device.log".format(build_dir)
576        pytest_log = "{}/twister_harness.log".format(build_dir)
577
578        if os.path.exists(v_log) and "Valgrind" in self.instance.reason:
579            self.log_info("{}".format(v_log), inline_logs)
580        elif os.path.exists(pytest_log) and os.path.getsize(pytest_log) > 0:
581            self.log_info("{}".format(pytest_log), inline_logs, log_testcases=True)
582        elif os.path.exists(h_log) and os.path.getsize(h_log) > 0:
583            self.log_info("{}".format(h_log), inline_logs)
584        elif os.path.exists(d_log) and os.path.getsize(d_log) > 0:
585            self.log_info("{}".format(d_log), inline_logs)
586        else:
587            self.log_info("{}".format(b_log), inline_logs)
588
589
590    def process(self, pipeline, done, message, lock, results):
591        op = message.get('op')
592
593        self.instance.setup_handler(self.env)
594
595        if op == "filter":
596            ret = self.cmake(filter_stages=self.instance.filter_stages)
597            if self.instance.status in ["failed", "error"]:
598                pipeline.put({"op": "report", "test": self.instance})
599            else:
600                # Here we check the dt/kconfig filter results coming from running cmake
601                if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]:
602                    logger.debug("filtering %s" % self.instance.name)
603                    self.instance.status = "filtered"
604                    self.instance.reason = "runtime filter"
605                    results.skipped_runtime += 1
606                    self.instance.add_missing_case_status("skipped")
607                    pipeline.put({"op": "report", "test": self.instance})
608                else:
609                    pipeline.put({"op": "cmake", "test": self.instance})
610
611        # The build process, call cmake and build with configured generator
612        elif op == "cmake":
613            ret = self.cmake()
614            if self.instance.status in ["failed", "error"]:
615                pipeline.put({"op": "report", "test": self.instance})
616            elif self.options.cmake_only:
617                if self.instance.status is None:
618                    self.instance.status = "passed"
619                pipeline.put({"op": "report", "test": self.instance})
620            else:
621                # Here we check the runtime filter results coming from running cmake
622                if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]:
623                    logger.debug("filtering %s" % self.instance.name)
624                    self.instance.status = "filtered"
625                    self.instance.reason = "runtime filter"
626                    results.skipped_runtime += 1
627                    self.instance.add_missing_case_status("skipped")
628                    pipeline.put({"op": "report", "test": self.instance})
629                else:
630                    pipeline.put({"op": "build", "test": self.instance})
631
632        elif op == "build":
633            logger.debug("build test: %s" % self.instance.name)
634            ret = self.build()
635            if not ret:
636                self.instance.status = "error"
637                self.instance.reason = "Build Failure"
638                pipeline.put({"op": "report", "test": self.instance})
639            else:
640                # Count skipped cases during build, for example
641                # due to ram/rom overflow.
642                if  self.instance.status == "skipped":
643                    results.skipped_runtime += 1
644                    self.instance.add_missing_case_status("skipped", self.instance.reason)
645
646                if ret.get('returncode', 1) > 0:
647                    self.instance.add_missing_case_status("blocked", self.instance.reason)
648                    pipeline.put({"op": "report", "test": self.instance})
649                else:
650                    logger.debug(f"Determine test cases for test instance: {self.instance.name}")
651                    try:
652                        self.determine_testcases(results)
653                        pipeline.put({"op": "gather_metrics", "test": self.instance})
654                    except BuildError as e:
655                        logger.error(str(e))
656                        self.instance.status = "error"
657                        self.instance.reason = str(e)
658                        pipeline.put({"op": "report", "test": self.instance})
659
660        elif op == "gather_metrics":
661            self.gather_metrics(self.instance)
662            if self.instance.run and self.instance.handler.ready:
663                pipeline.put({"op": "run", "test": self.instance})
664            else:
665                pipeline.put({"op": "report", "test": self.instance})
666
667        # Run the generated binary using one of the supported handlers
668        elif op == "run":
669            logger.debug("run test: %s" % self.instance.name)
670            self.run()
671            logger.debug(f"run status: {self.instance.name} {self.instance.status}")
672            try:
673                # to make it work with pickle
674                self.instance.handler.thread = None
675                self.instance.handler.duts = None
676                pipeline.put({
677                    "op": "report",
678                    "test": self.instance,
679                    "status": self.instance.status,
680                    "reason": self.instance.reason
681                    }
682                )
683            except RuntimeError as e:
684                logger.error(f"RuntimeError: {e}")
685                traceback.print_exc()
686
687        # Report results and output progress to screen
688        elif op == "report":
689            with lock:
690                done.put(self.instance)
691                self.report_out(results)
692
693            if not self.options.coverage:
694                if self.options.prep_artifacts_for_testing:
695                    pipeline.put({"op": "cleanup", "mode": "device", "test": self.instance})
696                elif self.options.runtime_artifact_cleanup == "pass" and self.instance.status == "passed":
697                    pipeline.put({"op": "cleanup", "mode": "passed", "test": self.instance})
698                elif self.options.runtime_artifact_cleanup == "all":
699                    pipeline.put({"op": "cleanup", "mode": "all", "test": self.instance})
700
701        elif op == "cleanup":
702            mode = message.get("mode")
703            if mode == "device":
704                self.cleanup_device_testing_artifacts()
705            elif mode == "passed" or (mode == "all" and self.instance.reason != "Cmake build failure"):
706                self.cleanup_artifacts()
707
708    def determine_testcases(self, results):
709        yaml_testsuite_name = self.instance.testsuite.id
710        logger.debug(f"Determine test cases for test suite: {yaml_testsuite_name}")
711
712        elf_file = self.instance.get_elf_file()
713        elf = ELFFile(open(elf_file, "rb"))
714
715        logger.debug(f"Test instance {self.instance.name} already has {len(self.instance.testcases)} cases.")
716        new_ztest_unit_test_regex = re.compile(r"z_ztest_unit_test__([^\s]+?)__([^\s]*)")
717        detected_cases = []
718        for section in elf.iter_sections():
719            if isinstance(section, SymbolTableSection):
720                for sym in section.iter_symbols():
721                    # It is only meant for new ztest fx because only new ztest fx exposes test functions
722                    # precisely.
723
724                    # The 1st capture group is new ztest suite name.
725                    # The 2nd capture group is new ztest unit test name.
726                    matches = new_ztest_unit_test_regex.findall(sym.name)
727                    if matches:
728                        for m in matches:
729                            # new_ztest_suite = m[0] # not used for now
730                            test_func_name = m[1].replace("test_", "")
731                            testcase_id = f"{yaml_testsuite_name}.{test_func_name}"
732                            detected_cases.append(testcase_id)
733
734        if detected_cases:
735            logger.debug(f"{', '.join(detected_cases)} in {elf_file}")
736            self.instance.testcases.clear()
737            self.instance.testsuite.testcases.clear()
738
739            # When the old regex-based test case collection is fully deprecated,
740            # this will be the sole place where test cases get added to the test instance.
741            # Then we can further include the new_ztest_suite info in the testcase_id.
742
743            for testcase_id in detected_cases:
744                self.instance.add_testcase(name=testcase_id)
745                self.instance.testsuite.add_testcase(name=testcase_id)
746
747
748    def cleanup_artifacts(self, additional_keep: List[str] = []):
749        logger.debug("Cleaning up {}".format(self.instance.build_dir))
750        allow = [
751            os.path.join('zephyr', '.config'),
752            'handler.log',
753            'build.log',
754            'device.log',
755            'recording.csv',
756            # below ones are needed to make --test-only work as well
757            'Makefile',
758            'CMakeCache.txt',
759            'build.ninja',
760            os.path.join('CMakeFiles', 'rules.ninja')
761            ]
762
763        allow += additional_keep
764
765        if self.options.runtime_artifact_cleanup == 'all':
766            allow += [os.path.join('twister', 'testsuite_extra.conf')]
767
768        allow = [os.path.join(self.instance.build_dir, file) for file in allow]
769
770        for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False):
771            for name in filenames:
772                path = os.path.join(dirpath, name)
773                if path not in allow:
774                    os.remove(path)
775            # Remove empty directories and symbolic links to directories
776            for dir in dirnames:
777                path = os.path.join(dirpath, dir)
778                if os.path.islink(path):
779                    os.remove(path)
780                elif not os.listdir(path):
781                    os.rmdir(path)
782
783    def cleanup_device_testing_artifacts(self):
784        logger.debug("Cleaning up for Device Testing {}".format(self.instance.build_dir))
785
786        files_to_keep = self._get_binaries()
787        files_to_keep.append(os.path.join('zephyr', 'runners.yaml'))
788
789        if self.testsuite.sysbuild:
790            files_to_keep.append('domains.yaml')
791            for domain in self.instance.domains.get_domains():
792                files_to_keep += self._get_artifact_allow_list_for_domain(domain.name)
793
794        self.cleanup_artifacts(files_to_keep)
795
796        self._sanitize_files()
797
798    def _get_artifact_allow_list_for_domain(self, domain: str) -> List[str]:
799        """
800        Return a list of files needed to test a given domain.
801        """
802        allow = [
803            os.path.join(domain, 'build.ninja'),
804            os.path.join(domain, 'CMakeCache.txt'),
805            os.path.join(domain, 'CMakeFiles', 'rules.ninja'),
806            os.path.join(domain, 'Makefile'),
807            os.path.join(domain, 'zephyr', '.config'),
808            os.path.join(domain, 'zephyr', 'runners.yaml')
809            ]
810        return allow
811
812    def _get_binaries(self) -> List[str]:
813        """
814        Get list of binaries paths (absolute or relative to the
815        self.instance.build_dir), basing on information from platform.binaries
816        or runners.yaml. If they are not found take default binaries like
817        "zephyr/zephyr.hex" etc.
818        """
819        binaries: List[str] = []
820
821        platform = self.instance.platform
822        if platform.binaries:
823            for binary in platform.binaries:
824                binaries.append(os.path.join('zephyr', binary))
825
826        # Get binaries for a single-domain build
827        binaries += self._get_binaries_from_runners()
828        # Get binaries in the case of a multiple-domain build
829        if self.testsuite.sysbuild:
830            for domain in self.instance.domains.get_domains():
831                binaries += self._get_binaries_from_runners(domain.name)
832
833        # if binaries was not found in platform.binaries and runners.yaml take default ones
834        if len(binaries) == 0:
835            binaries = [
836                os.path.join('zephyr', 'zephyr.hex'),
837                os.path.join('zephyr', 'zephyr.bin'),
838                os.path.join('zephyr', 'zephyr.elf'),
839                os.path.join('zephyr', 'zephyr.exe'),
840            ]
841        return binaries
842
843    def _get_binaries_from_runners(self, domain='') -> List[str]:
844        """
845        Get list of binaries paths (absolute or relative to the
846        self.instance.build_dir) from runners.yaml file. May be used for
847        multiple-domain builds by passing in one domain at a time.
848        """
849
850        runners_file_path: str = os.path.join(self.instance.build_dir,
851                                              domain, 'zephyr', 'runners.yaml')
852        if not os.path.exists(runners_file_path):
853            return []
854
855        with open(runners_file_path, 'r') as file:
856            runners_content: dict = yaml.safe_load(file)
857
858        if 'config' not in runners_content:
859            return []
860
861        runners_config: dict = runners_content['config']
862        binary_keys: List[str] = ['elf_file', 'hex_file', 'bin_file']
863
864        binaries: List[str] = []
865        for binary_key in binary_keys:
866            binary_path = runners_config.get(binary_key)
867            if binary_path is None:
868                continue
869            if os.path.isabs(binary_path):
870                binaries.append(binary_path)
871            else:
872                binaries.append(os.path.join(domain, 'zephyr', binary_path))
873
874        return binaries
875
876    def _sanitize_files(self):
877        """
878        Sanitize files to make it possible to flash those file on different
879        computer/system.
880        """
881        self._sanitize_runners_file()
882        self._sanitize_zephyr_base_from_files()
883
884    def _sanitize_runners_file(self):
885        """
886        Replace absolute paths of binary files for relative ones. The base
887        directory for those files is f"{self.instance.build_dir}/zephyr"
888        """
889        runners_dir_path: str = os.path.join(self.instance.build_dir, 'zephyr')
890        runners_file_path: str = os.path.join(runners_dir_path, 'runners.yaml')
891        if not os.path.exists(runners_file_path):
892            return
893
894        with open(runners_file_path, 'rt') as file:
895            runners_content_text = file.read()
896            runners_content_yaml: dict = yaml.safe_load(runners_content_text)
897
898        if 'config' not in runners_content_yaml:
899            return
900
901        runners_config: dict = runners_content_yaml['config']
902        binary_keys: List[str] = ['elf_file', 'hex_file', 'bin_file']
903
904        for binary_key in binary_keys:
905            binary_path = runners_config.get(binary_key)
906            # sanitize only paths which exist and are absolute
907            if binary_path is None or not os.path.isabs(binary_path):
908                continue
909            binary_path_relative = os.path.relpath(binary_path, start=runners_dir_path)
910            runners_content_text = runners_content_text.replace(binary_path, binary_path_relative)
911
912        with open(runners_file_path, 'wt') as file:
913            file.write(runners_content_text)
914
915    def _sanitize_zephyr_base_from_files(self):
916        """
917        Remove Zephyr base paths from selected files.
918        """
919        files_to_sanitize = [
920            'CMakeCache.txt',
921            os.path.join('zephyr', 'runners.yaml'),
922        ]
923        for file_path in files_to_sanitize:
924            file_path = os.path.join(self.instance.build_dir, file_path)
925            if not os.path.exists(file_path):
926                continue
927
928            with open(file_path, "rt") as file:
929                data = file.read()
930
931            # add trailing slash at the end of canonical_zephyr_base if it does not exist:
932            path_to_remove = os.path.join(canonical_zephyr_base, "")
933            data = data.replace(path_to_remove, "")
934
935            with open(file_path, "wt") as file:
936                file.write(data)
937
    def report_out(self, results):
        """
        Account for this test instance in ``results`` and report its outcome.

        ``results.done`` is always incremented; the error/failed/passed and
        skipped counters are updated according to ``instance.status``.
        In verbose mode one log line per instance is emitted, otherwise a
        single progress line terminated by a carriage return is written to
        stdout.

        :param results: counter object providing total, done, iteration,
            cases, error, failed, passed, skipped_configs and skipped_cases.
        """
        total_to_do = results.total
        # width used to right-align the "done" counter in the verbose output
        total_tests_width = len(str(total_to_do))
        results.done += 1
        instance = self.instance
        # test cases are added to the total on the first iteration only
        if results.iteration == 1:
            results.cases += len(instance.testcases)

        if instance.status in ["error", "failed"]:
            if instance.status == "error":
                results.error += 1
                txt = " ERROR "
            else:
                results.failed += 1
                txt = " FAILED "
            if self.options.verbose:
                status = Fore.RED + txt + Fore.RESET + instance.reason
            else:
                # NOTE: `status` stays unbound on this path; it is only read
                # in the verbose branch further down.
                logger.error(
                    "{:<25} {:<50} {}{}{}: {}".format(
                        instance.platform.name,
                        instance.testsuite.name,
                        Fore.RED,
                        txt,
                        Fore.RESET,
                        instance.reason))
            if not self.options.verbose:
                self.log_info_file(self.options.inline_logs)
        elif instance.status in ["skipped", "filtered"]:
            status = Fore.YELLOW + "SKIPPED" + Fore.RESET
            results.skipped_configs += 1
            # test cases skipped at the test instance level
            results.skipped_cases += len(instance.testsuite.testcases)
        elif instance.status == "passed":
            status = Fore.GREEN + "PASSED" + Fore.RESET
            results.passed += 1
            for case in instance.testcases:
                # test cases skipped at the test case level
                if case.status == 'skipped':
                    results.skipped_cases += 1
        else:
            # any other status is reported as UNKNOWN and not counted
            logger.debug(f"Unknown status = {instance.status}")
            status = Fore.YELLOW + "UNKNOWN" + Fore.RESET

        if self.options.verbose:
            # `more_info` is the text shown in parentheses at the end of the line
            if self.options.cmake_only:
                more_info = "cmake"
            elif instance.status in ["skipped", "filtered"]:
                more_info = instance.reason
            else:
                if instance.handler.ready and instance.run:
                    more_info = instance.handler.type_str
                    htime = instance.execution_time
                    if instance.dut:
                        more_info += f": {instance.dut},"
                    if htime:
                        more_info += " {:.3f}s".format(htime)
                else:
                    more_info = "build"

                # NOTE(review): the condition checks handler.seed but the
                # value printed is options.seed - confirm they always match.
                if ( instance.status in ["error", "failed", "timeout", "flash_error"]
                     and hasattr(self.instance.handler, 'seed')
                     and self.instance.handler.seed is not None ):
                    more_info += "/seed: " + str(self.options.seed)
            logger.info("{:>{}}/{} {:<25} {:<50} {} ({})".format(
                results.done, total_tests_width, total_to_do , instance.platform.name,
                instance.testsuite.name, status, more_info))

            if instance.status in ["error", "failed", "timeout"]:
                self.log_info_file(self.options.inline_logs)
        else:
            completed_perc = 0
            if total_to_do > 0:
                completed_perc = int((float(results.done) / total_to_do) * 100)

            # The trailing "\r" returns the cursor to the start of the line so
            # the next write overwrites this progress line.
            sys.stdout.write("INFO    - Total complete: %s%4d/%4d%s  %2d%%  skipped: %s%4d%s, failed: %s%4d%s, error: %s%4d%s\r" % (
                Fore.GREEN,
                results.done,
                total_to_do,
                Fore.RESET,
                completed_perc,
                Fore.YELLOW if results.skipped_configs > 0 else Fore.RESET,
                results.skipped_configs,
                Fore.RESET,
                Fore.RED if results.failed > 0 else Fore.RESET,
                results.failed,
                Fore.RESET,
                Fore.RED if results.error > 0 else Fore.RESET,
                results.error,
                Fore.RESET
                )
                )
        sys.stdout.flush()
1031
1032    @staticmethod
1033    def cmake_assemble_args(extra_args, handler, extra_conf_files, extra_overlay_confs,
1034                            extra_dtc_overlay_files, cmake_extra_args,
1035                            build_dir):
1036        # Retain quotes around config options
1037        config_options = [arg for arg in extra_args if arg.startswith("CONFIG_")]
1038        args = [arg for arg in extra_args if not arg.startswith("CONFIG_")]
1039
1040        args_expanded = ["-D{}".format(a.replace('"', '\"')) for a in config_options]
1041
1042        if handler.ready:
1043            args.extend(handler.args)
1044
1045        if extra_conf_files:
1046            args.append(f"CONF_FILE=\"{';'.join(extra_conf_files)}\"")
1047
1048        if extra_dtc_overlay_files:
1049            args.append(f"DTC_OVERLAY_FILE=\"{';'.join(extra_dtc_overlay_files)}\"")
1050
1051        # merge overlay files into one variable
1052        overlays = extra_overlay_confs.copy()
1053
1054        additional_overlay_path = os.path.join(
1055            build_dir, "twister", "testsuite_extra.conf"
1056        )
1057        if os.path.exists(additional_overlay_path):
1058            overlays.append(additional_overlay_path)
1059
1060        if overlays:
1061            args.append("OVERLAY_CONFIG=\"%s\"" % (" ".join(overlays)))
1062
1063        # Build the final argument list
1064        args_expanded.extend(["-D{}".format(a.replace('"', '\"')) for a in cmake_extra_args])
1065        args_expanded.extend(["-D{}".format(a.replace('"', '')) for a in args])
1066
1067        return args_expanded
1068
1069    def cmake(self, filter_stages=[]):
1070        args = self.cmake_assemble_args(
1071            self.testsuite.extra_args.copy(), # extra_args from YAML
1072            self.instance.handler,
1073            self.testsuite.extra_conf_files,
1074            self.testsuite.extra_overlay_confs,
1075            self.testsuite.extra_dtc_overlay_files,
1076            self.options.extra_args, # CMake extra args
1077            self.instance.build_dir,
1078        )
1079        return self.run_cmake(args,filter_stages)
1080
1081    def build(self):
1082        harness = HarnessImporter.get_harness(self.instance.testsuite.harness.capitalize())
1083        build_result = self.run_build(['--build', self.build_dir])
1084        try:
1085            if harness:
1086                harness.instance = self.instance
1087                harness.build()
1088        except ConfigurationError as error:
1089            self.instance.status = "error"
1090            self.instance.reason = str(error)
1091            logger.error(self.instance.reason)
1092            return
1093        return build_result
1094
1095    def run(self):
1096
1097        instance = self.instance
1098
1099        if instance.handler.ready:
1100            logger.debug(f"Reset instance status from '{instance.status}' to None before run.")
1101            instance.status = None
1102
1103            if instance.handler.type_str == "device":
1104                instance.handler.duts = self.duts
1105
1106            if(self.options.seed is not None and instance.platform.name.startswith("native_")):
1107                self.parse_generated()
1108                if('CONFIG_FAKE_ENTROPY_NATIVE_POSIX' in self.defconfig and
1109                    self.defconfig['CONFIG_FAKE_ENTROPY_NATIVE_POSIX'] == 'y'):
1110                    instance.handler.seed = self.options.seed
1111
1112            if self.options.extra_test_args and instance.platform.arch == "posix":
1113                instance.handler.extra_test_args = self.options.extra_test_args
1114
1115            harness = HarnessImporter.get_harness(instance.testsuite.harness.capitalize())
1116            try:
1117                harness.configure(instance)
1118            except ConfigurationError as error:
1119                instance.status = "error"
1120                instance.reason = str(error)
1121                logger.error(instance.reason)
1122                return
1123            #
1124            if isinstance(harness, Pytest):
1125                harness.pytest_run(instance.handler.get_test_timeout())
1126            else:
1127                instance.handler.handle(harness)
1128
1129        sys.stdout.flush()
1130
1131    def gather_metrics(self, instance: TestInstance):
1132        if self.options.create_rom_ram_report:
1133            self.run_build(['--build', self.build_dir, "--target", "footprint"])
1134        if self.options.enable_size_report and not self.options.cmake_only:
1135            self.calc_size(instance=instance, from_buildlog=self.options.footprint_from_buildlog)
1136        else:
1137            instance.metrics["used_ram"] = 0
1138            instance.metrics["used_rom"] = 0
1139            instance.metrics["available_rom"] = 0
1140            instance.metrics["available_ram"] = 0
1141            instance.metrics["unrecognized"] = []
1142
1143    @staticmethod
1144    def calc_size(instance: TestInstance, from_buildlog: bool):
1145        if instance.status not in ["error", "failed", "skipped"]:
1146            if not instance.platform.type in ["native", "qemu", "unit"]:
1147                generate_warning = bool(instance.platform.type == "mcu")
1148                size_calc = instance.calculate_sizes(from_buildlog=from_buildlog, generate_warning=generate_warning)
1149                instance.metrics["used_ram"] = size_calc.get_used_ram()
1150                instance.metrics["used_rom"] = size_calc.get_used_rom()
1151                instance.metrics["available_rom"] = size_calc.get_available_rom()
1152                instance.metrics["available_ram"] = size_calc.get_available_ram()
1153                instance.metrics["unrecognized"] = size_calc.unrecognized_sections()
1154            else:
1155                instance.metrics["used_ram"] = 0
1156                instance.metrics["used_rom"] = 0
1157                instance.metrics["available_rom"] = 0
1158                instance.metrics["available_ram"] = 0
1159                instance.metrics["unrecognized"] = []
1160            instance.metrics["handler_time"] = instance.execution_time
1161
1162class TwisterRunner:
1163
1164    def __init__(self, instances, suites, env=None) -> None:
1165        self.pipeline = None
1166        self.options = env.options
1167        self.env = env
1168        self.instances = instances
1169        self.suites = suites
1170        self.duts = None
1171        self.jobs = 1
1172        self.results = None
1173        self.jobserver = None
1174
    def run(self):
        """
        Execute all test instances, repeating the run for failed instances
        (and for errored ones when retry_build_errors is set) up to
        options.retry_failed additional times, then log a brief summary.
        """

        # total number of iterations allowed: the first run plus the retries
        retries = self.options.retry_failed + 1

        # The queues are created through a manager and later passed to the
        # worker processes started by execute().
        BaseManager.register('LifoQueue', queue.LifoQueue)
        manager = BaseManager()
        manager.start()

        self.results = ExecutionCounter(total=len(self.instances))
        self.iteration = 0
        pipeline = manager.LifoQueue()
        done_queue = manager.LifoQueue()

        # Set number of jobs
        if self.options.jobs:
            self.jobs = self.options.jobs
        elif self.options.build_only:
            self.jobs = multiprocessing.cpu_count() * 2
        else:
            self.jobs = multiprocessing.cpu_count()

        if sys.platform == "linux":
            # NOTE(review): os.name is expected to be 'posix' whenever
            # sys.platform is "linux", so the else branch below looks
            # unreachable - confirm.
            if os.name == 'posix':
                # Use the job server found in the environment, if any;
                # otherwise start one with self.jobs slots.
                self.jobserver = GNUMakeJobClient.from_environ(jobs=self.options.jobs)
                if not self.jobserver:
                    self.jobserver = GNUMakeJobServer(self.jobs)
                elif self.jobserver.jobs:
                    self.jobs = self.jobserver.jobs
            # TODO: Implement this on windows/mac also
            else:
                self.jobserver = JobClient()

            logger.info("JOBS: %d", self.jobs)

        self.update_counting_before_pipeline()

        while True:
            self.results.iteration += 1

            if self.results.iteration > 1:
                logger.info("%d Iteration:" % (self.results.iteration))
                time.sleep(self.options.retry_interval)  # waiting for the system to settle down
                # Restart the "done" counter without the instances that
                # failed or errored, then reset the counters to be recounted.
                self.results.done = self.results.total - self.results.failed - self.results.error
                self.results.failed = 0
                if self.options.retry_build_errors:
                    self.results.error = 0
            else:
                # first iteration: statically filtered instances count as done
                self.results.done = self.results.skipped_filter

            self.execute(pipeline, done_queue)

            # Drain the done queue and replace the stored instances with the
            # ones that came back, keeping the previously stored metrics.
            while True:
                try:
                    inst = done_queue.get_nowait()
                except queue.Empty:
                    break
                else:
                    inst.metrics.update(self.instances[inst.name].metrics)
                    inst.metrics["handler_time"] = inst.execution_time
                    inst.metrics["unrecognized"] = []
                    self.instances[inst.name] = inst

            # presumably terminates the "\r"-terminated progress line - TODO confirm
            print("")

            retry_errors = False
            if self.results.error and self.options.retry_build_errors:
                retry_errors = True

            # Stop when the retries are used up, or when nothing failed and
            # there are no errors to retry.
            retries = retries - 1
            if retries == 0 or ( self.results.failed == 0 and not retry_errors):
                break

        self.show_brief()
1248
1249    def update_counting_before_pipeline(self):
1250        '''
1251        Updating counting before pipeline is necessary because statically filterd
1252        test instance never enter the pipeline. While some pipeline output needs
1253        the static filter stats. So need to prepare them before pipline starts.
1254        '''
1255        for instance in self.instances.values():
1256            if instance.status == 'filtered' and not instance.reason == 'runtime filter':
1257                self.results.skipped_filter += 1
1258                self.results.skipped_configs += 1
1259                self.results.skipped_cases += len(instance.testsuite.testcases)
1260                self.results.cases += len(instance.testsuite.testcases)
1261            elif instance.status == 'error':
1262                self.results.error += 1
1263
1264    def show_brief(self):
1265        logger.info("%d test scenarios (%d test instances) selected, "
1266                    "%d configurations skipped (%d by static filter, %d at runtime)." %
1267                    (len(self.suites), len(self.instances),
1268                    self.results.skipped_configs,
1269                    self.results.skipped_filter,
1270                    self.results.skipped_configs - self.results.skipped_filter))
1271
1272    def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False, retry_build_errors=False):
1273        for instance in self.instances.values():
1274            if build_only:
1275                instance.run = False
1276
1277            no_retry_statuses = ['passed', 'skipped', 'filtered']
1278            if not retry_build_errors:
1279                no_retry_statuses.append("error")
1280
1281            if instance.status not in no_retry_statuses:
1282                logger.debug(f"adding {instance.name}")
1283                if instance.status:
1284                    instance.retries += 1
1285                instance.status = None
1286
1287                # Check if cmake package_helper script can be run in advance.
1288                instance.filter_stages = []
1289                if instance.testsuite.filter:
1290                    instance.filter_stages = self.get_cmake_filter_stages(instance.testsuite.filter, expr_parser.reserved.keys())
1291
1292                if test_only and instance.run:
1293                    pipeline.put({"op": "run", "test": instance})
1294                elif instance.filter_stages and "full" not in instance.filter_stages:
1295                    pipeline.put({"op": "filter", "test": instance})
1296                else:
1297                    cache_file = os.path.join(instance.build_dir, "CMakeCache.txt")
1298                    if os.path.exists(cache_file) and self.env.options.aggressive_no_clean:
1299                        pipeline.put({"op": "build", "test": instance})
1300                    else:
1301                        pipeline.put({"op": "cmake", "test": instance})
1302
1303
1304    def pipeline_mgr(self, pipeline, done_queue, lock, results):
1305        if sys.platform == 'linux':
1306            with self.jobserver.get_job():
1307                while True:
1308                    try:
1309                        task = pipeline.get_nowait()
1310                    except queue.Empty:
1311                        break
1312                    else:
1313                        instance = task['test']
1314                        pb = ProjectBuilder(instance, self.env, self.jobserver)
1315                        pb.duts = self.duts
1316                        pb.process(pipeline, done_queue, task, lock, results)
1317
1318                return True
1319        else:
1320            while True:
1321                try:
1322                    task = pipeline.get_nowait()
1323                except queue.Empty:
1324                    break
1325                else:
1326                    instance = task['test']
1327                    pb = ProjectBuilder(instance, self.env, self.jobserver)
1328                    pb.duts = self.duts
1329                    pb.process(pipeline, done_queue, task, lock, results)
1330            return True
1331
1332    def execute(self, pipeline, done):
1333        lock = Lock()
1334        logger.info("Adding tasks to the queue...")
1335        self.add_tasks_to_queue(pipeline, self.options.build_only, self.options.test_only,
1336                                retry_build_errors=self.options.retry_build_errors)
1337        logger.info("Added initial list of jobs to queue")
1338
1339        processes = []
1340
1341        for _ in range(self.jobs):
1342            p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, self.results, ))
1343            processes.append(p)
1344            p.start()
1345        logger.debug(f"Launched {self.jobs} jobs")
1346
1347        try:
1348            for p in processes:
1349                p.join()
1350        except KeyboardInterrupt:
1351            logger.info("Execution interrupted")
1352            for p in processes:
1353                p.terminate()
1354
1355    @staticmethod
1356    def get_cmake_filter_stages(filt, logic_keys):
1357        """ Analyze filter expressions from test yaml and decide if dts and/or kconfig based filtering will be needed."""
1358        dts_required = False
1359        kconfig_required = False
1360        full_required = False
1361        filter_stages = []
1362
1363        # Compress args in expressions like "function('x', 'y')" so they are not split when splitting by whitespaces
1364        filt = filt.replace(", ", ",")
1365        # Remove logic words
1366        for k in logic_keys:
1367            filt = filt.replace(f"{k} ", "")
1368        # Remove brackets
1369        filt = filt.replace("(", "")
1370        filt = filt.replace(")", "")
1371        # Splite by whitespaces
1372        filt = filt.split()
1373        for expression in filt:
1374            if expression.startswith("dt_"):
1375                dts_required = True
1376            elif expression.startswith("CONFIG"):
1377                kconfig_required = True
1378            else:
1379                full_required = True
1380
1381        if full_required:
1382            return ["full"]
1383        if dts_required:
1384            filter_stages.append("dts")
1385        if kconfig_required:
1386            filter_stages.append("kconfig")
1387
1388        return filter_stages
1389