1# vim: set syntax=python ts=4 :
2#
3# Copyright (c) 2018-2025 Intel Corporation
4# Copyright 2022 NXP
5# SPDX-License-Identifier: Apache-2.0
6
7import logging
8import multiprocessing
9import os
10import pathlib
11import pickle
12import queue
13import re
14import shutil
15import subprocess
16import sys
17import time
18import traceback
19from math import log10
20from multiprocessing import Lock, Process, Value
21from multiprocessing.managers import BaseManager
22
23import elftools
24import yaml
25from colorama import Fore
26from elftools.elf.elffile import ELFFile
27from elftools.elf.sections import SymbolTableSection
28from packaging import version
29from twisterlib.cmakecache import CMakeCache
30from twisterlib.environment import canonical_zephyr_base
31from twisterlib.error import BuildError, ConfigurationError, StatusAttributeError
32from twisterlib.log_helper import setup_logging
33from twisterlib.statuses import TwisterStatus
34
35if version.parse(elftools.__version__) < version.parse('0.24'):
36    sys.exit("pyelftools is out of date, need version 0.24 or later")
37
38# Job server only works on Linux for now.
39if sys.platform == 'linux':
40    from twisterlib.jobserver import GNUMakeJobClient, GNUMakeJobServer, JobClient
41
42from twisterlib.environment import ZEPHYR_BASE
43
44sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/build_helpers"))
45from domains import Domains
46from twisterlib.coverage import run_coverage_instance
47from twisterlib.environment import TwisterEnv
48from twisterlib.harness import Ctest, HarnessImporter, Pytest
49from twisterlib.log_helper import log_command
50from twisterlib.platform import Platform
51from twisterlib.testinstance import TestInstance
52from twisterlib.testplan import change_skip_to_error_if_integration
53from twisterlib.testsuite import TestSuite
54
55try:
56    from yaml import CSafeLoader as SafeLoader
57except ImportError:
58    from yaml import SafeLoader
59
60import expr_parser
61from anytree import Node, RenderTree
62
63logger = logging.getLogger('twister')
64logger.setLevel(logging.DEBUG)
65
66
67class ExecutionCounter:
68    def __init__(self, total=0):
69        '''
70        Most of the stats are at test instance level
71        Except that case statistics are for cases of ALL test instances
72
73        total = yaml test scenarios * applicable platforms
74        done := instances that reached report_out stage of the pipeline
75        done = filtered_configs + passed + failed + error
76        completed = done - filtered_static
77        filtered_configs = filtered_runtime + filtered_static
78
79        pass rate = passed / (total - filtered_configs)
80        case pass rate = passed_cases / (cases - filtered_cases - skipped_cases)
81        '''
82        # instances that go through the pipeline
83        # updated by report_out()
84        self._done = Value('i', 0)
85
86        # iteration
87        self._iteration = Value('i', 0)
88
89        # instances that actually executed and passed
90        # updated by report_out()
91        self._passed = Value('i', 0)
92
93        # instances that are built but not runnable
94        # updated by report_out()
95        self._notrun = Value('i', 0)
96
97        # static filter + runtime filter + build skipped
98        # updated by update_counting_before_pipeline() and report_out()
99        self._filtered_configs = Value('i', 0)
100
101        # cmake filter + build skipped
102        # updated by report_out()
103        self._filtered_runtime = Value('i', 0)
104
105        # static filtered at yaml parsing time
106        # updated by update_counting_before_pipeline()
107        self._filtered_static = Value('i', 0)
108
109        # updated by report_out() in pipeline
110        self._error = Value('i', 0)
111        self._failed = Value('i', 0)
112        self._skipped = Value('i', 0)
113
114        # initialized to number of test instances
115        self._total = Value('i', total)
116
117        #######################################
118        # TestCase counters for all instances #
119        #######################################
120        # updated in report_out
121        self._cases = Value('i', 0)
122
123        # updated by update_counting_before_pipeline() and report_out()
124        self._skipped_cases = Value('i', 0)
125        self._filtered_cases = Value('i', 0)
126
127        # updated by report_out() in pipeline
128        self._passed_cases = Value('i', 0)
129        self._notrun_cases = Value('i', 0)
130        self._failed_cases = Value('i', 0)
131        self._error_cases = Value('i', 0)
132        self._blocked_cases = Value('i', 0)
133
134        # Incorrect statuses
135        self._none_cases = Value('i', 0)
136        self._started_cases = Value('i', 0)
137
138        self._warnings = Value('i', 0)
139
140        self.lock = Lock()
141
142    @staticmethod
143    def _find_number_length(n):
144        if n > 0:
145            length = int(log10(n))+1
146        elif n == 0:
147            length = 1
148        else:
149            length = int(log10(-n))+2
150        return length
151
152    def summary(self):
153        selected_cases = self.cases - self.filtered_cases
154        selected_configs = self.done - self.filtered_static - self.filtered_runtime
155
156
157        root = Node("Summary")
158
159        Node(f"Total test suites: {self.total}", parent=root)
160        processed_suites = Node(f"Processed test suites: {self.done}", parent=root)
161        filtered_suites = Node(
162            f"Filtered test suites: {self.filtered_configs}",
163            parent=processed_suites
164        )
165        Node(f"Filtered test suites (static): {self.filtered_static}", parent=filtered_suites)
166        Node(f"Filtered test suites (at runtime): {self.filtered_runtime}", parent=filtered_suites)
167        selected_suites = Node(f"Selected test suites: {selected_configs}", parent=processed_suites)
168        Node(f"Skipped test suites: {self.skipped}", parent=selected_suites)
169        Node(f"Passed test suites: {self.passed}", parent=selected_suites)
170        Node(f"Built only test suites: {self.notrun}", parent=selected_suites)
171        Node(f"Failed test suites: {self.failed}", parent=selected_suites)
172        Node(f"Errors in test suites: {self.error}", parent=selected_suites)
173
174        total_cases = Node(f"Total test cases: {self.cases}", parent=root)
175        Node(f"Filtered test cases: {self.filtered_cases}", parent=total_cases)
176        selected_cases_node = Node(f"Selected test cases: {selected_cases}", parent=total_cases)
177        Node(f"Passed test cases: {self.passed_cases}", parent=selected_cases_node)
178        Node(f"Skipped test cases: {self.skipped_cases}", parent=selected_cases_node)
179        Node(f"Built only test cases: {self.notrun_cases}", parent=selected_cases_node)
180        Node(f"Blocked test cases: {self.blocked_cases}", parent=selected_cases_node)
181        Node(f"Failed test cases: {self.failed_cases}", parent=selected_cases_node)
182        error_cases_node = Node(
183            f"Errors in test cases: {self.error_cases}",
184            parent=selected_cases_node
185        )
186
187        if self.none_cases or self.started_cases:
188            Node(
189                "The following test case statuses should not appear in a proper execution",
190                parent=error_cases_node
191            )
192        if self.none_cases:
193            Node(f"Statusless test cases: {self.none_cases}", parent=error_cases_node)
194        if self.started_cases:
195            Node(f"Test cases only started: {self.started_cases}", parent=error_cases_node)
196
197        for pre, _, node in RenderTree(root):
198            print(f"{pre}{node.name}")
199
200    @property
201    def warnings(self):
202        with self._warnings.get_lock():
203            return self._warnings.value
204
205    @warnings.setter
206    def warnings(self, value):
207        with self._warnings.get_lock():
208            self._warnings.value = value
209
210    def warnings_increment(self, value=1):
211        with self._warnings.get_lock():
212            self._warnings.value += value
213
214    @property
215    def cases(self):
216        with self._cases.get_lock():
217            return self._cases.value
218
219    @cases.setter
220    def cases(self, value):
221        with self._cases.get_lock():
222            self._cases.value = value
223
224    def cases_increment(self, value=1):
225        with self._cases.get_lock():
226            self._cases.value += value
227
228    @property
229    def skipped_cases(self):
230        with self._skipped_cases.get_lock():
231            return self._skipped_cases.value
232
233    @skipped_cases.setter
234    def skipped_cases(self, value):
235        with self._skipped_cases.get_lock():
236            self._skipped_cases.value = value
237
238    def skipped_cases_increment(self, value=1):
239        with self._skipped_cases.get_lock():
240            self._skipped_cases.value += value
241
242    @property
243    def filtered_cases(self):
244        with self._filtered_cases.get_lock():
245            return self._filtered_cases.value
246
247    @filtered_cases.setter
248    def filtered_cases(self, value):
249        with self._filtered_cases.get_lock():
250            self._filtered_cases.value = value
251
252    def filtered_cases_increment(self, value=1):
253        with self._filtered_cases.get_lock():
254            self._filtered_cases.value += value
255
256    @property
257    def passed_cases(self):
258        with self._passed_cases.get_lock():
259            return self._passed_cases.value
260
261    @passed_cases.setter
262    def passed_cases(self, value):
263        with self._passed_cases.get_lock():
264            self._passed_cases.value = value
265
266    def passed_cases_increment(self, value=1):
267        with self._passed_cases.get_lock():
268            self._passed_cases.value += value
269
270    @property
271    def notrun_cases(self):
272        with self._notrun_cases.get_lock():
273            return self._notrun_cases.value
274
275    @notrun_cases.setter
276    def notrun_cases(self, value):
277        with self._notrun.get_lock():
278            self._notrun.value = value
279
280    def notrun_cases_increment(self, value=1):
281        with self._notrun_cases.get_lock():
282            self._notrun_cases.value += value
283
284    @property
285    def failed_cases(self):
286        with self._failed_cases.get_lock():
287            return self._failed_cases.value
288
289    @failed_cases.setter
290    def failed_cases(self, value):
291        with self._failed_cases.get_lock():
292            self._failed_cases.value = value
293
294    def failed_cases_increment(self, value=1):
295        with self._failed_cases.get_lock():
296            self._failed_cases.value += value
297
298    @property
299    def error_cases(self):
300        with self._error_cases.get_lock():
301            return self._error_cases.value
302
303    @error_cases.setter
304    def error_cases(self, value):
305        with self._error_cases.get_lock():
306            self._error_cases.value = value
307
308    def error_cases_increment(self, value=1):
309        with self._error_cases.get_lock():
310            self._error_cases.value += value
311
312    @property
313    def blocked_cases(self):
314        with self._blocked_cases.get_lock():
315            return self._blocked_cases.value
316
317    @blocked_cases.setter
318    def blocked_cases(self, value):
319        with self._blocked_cases.get_lock():
320            self._blocked_cases.value = value
321
322    def blocked_cases_increment(self, value=1):
323        with self._blocked_cases.get_lock():
324            self._blocked_cases.value += value
325
326    @property
327    def none_cases(self):
328        with self._none_cases.get_lock():
329            return self._none_cases.value
330
331    @none_cases.setter
332    def none_cases(self, value):
333        with self._none_cases.get_lock():
334            self._none_cases.value = value
335
336    def none_cases_increment(self, value=1):
337        with self._none_cases.get_lock():
338            self._none_cases.value += value
339
340    @property
341    def started_cases(self):
342        with self._started_cases.get_lock():
343            return self._started_cases.value
344
345    @started_cases.setter
346    def started_cases(self, value):
347        with self._started_cases.get_lock():
348            self._started_cases.value = value
349
350    def started_cases_increment(self, value=1):
351        with self._started_cases.get_lock():
352            self._started_cases.value += value
353
354    @property
355    def skipped(self):
356        with self._skipped.get_lock():
357            return self._skipped.value
358
359    @skipped.setter
360    def skipped(self, value):
361        with self._skipped.get_lock():
362            self._skipped.value = value
363
364    def skipped_increment(self, value=1):
365        with self._skipped.get_lock():
366            self._skipped.value += value
367
368    @property
369    def error(self):
370        with self._error.get_lock():
371            return self._error.value
372
373    @error.setter
374    def error(self, value):
375        with self._error.get_lock():
376            self._error.value = value
377
378    def error_increment(self, value=1):
379        with self._error.get_lock():
380            self._error.value += value
381
382    @property
383    def iteration(self):
384        with self._iteration.get_lock():
385            return self._iteration.value
386
387    @iteration.setter
388    def iteration(self, value):
389        with self._iteration.get_lock():
390            self._iteration.value = value
391
392    def iteration_increment(self, value=1):
393        with self._iteration.get_lock():
394            self._iteration.value += value
395
396    @property
397    def done(self):
398        with self._done.get_lock():
399            return self._done.value
400
401    @done.setter
402    def done(self, value):
403        with self._done.get_lock():
404            self._done.value = value
405
406    def done_increment(self, value=1):
407        with self._done.get_lock():
408            self._done.value += value
409
410    @property
411    def passed(self):
412        with self._passed.get_lock():
413            return self._passed.value
414
415    @passed.setter
416    def passed(self, value):
417        with self._passed.get_lock():
418            self._passed.value = value
419
420    def passed_increment(self, value=1):
421        with self._passed.get_lock():
422            self._passed.value += value
423
424    @property
425    def notrun(self):
426        with self._notrun.get_lock():
427            return self._notrun.value
428
429    @notrun.setter
430    def notrun(self, value):
431        with self._notrun.get_lock():
432            self._notrun.value = value
433
434    def notrun_increment(self, value=1):
435        with self._notrun.get_lock():
436            self._notrun.value += value
437
438    @property
439    def filtered_configs(self):
440        with self._filtered_configs.get_lock():
441            return self._filtered_configs.value
442
443    @filtered_configs.setter
444    def filtered_configs(self, value):
445        with self._filtered_configs.get_lock():
446            self._filtered_configs.value = value
447
448    def filtered_configs_increment(self, value=1):
449        with self._filtered_configs.get_lock():
450            self._filtered_configs.value += value
451
452    @property
453    def filtered_static(self):
454        with self._filtered_static.get_lock():
455            return self._filtered_static.value
456
457    @filtered_static.setter
458    def filtered_static(self, value):
459        with self._filtered_static.get_lock():
460            self._filtered_static.value = value
461
462    def filtered_static_increment(self, value=1):
463        with self._filtered_static.get_lock():
464            self._filtered_static.value += value
465
466    @property
467    def filtered_runtime(self):
468        with self._filtered_runtime.get_lock():
469            return self._filtered_runtime.value
470
471    @filtered_runtime.setter
472    def filtered_runtime(self, value):
473        with self._filtered_runtime.get_lock():
474            self._filtered_runtime.value = value
475
476    def filtered_runtime_increment(self, value=1):
477        with self._filtered_runtime.get_lock():
478            self._filtered_runtime.value += value
479
480    @property
481    def failed(self):
482        with self._failed.get_lock():
483            return self._failed.value
484
485    @failed.setter
486    def failed(self, value):
487        with self._failed.get_lock():
488            self._failed.value = value
489
490    def failed_increment(self, value=1):
491        with self._failed.get_lock():
492            self._failed.value += value
493
494    @property
495    def total(self):
496        with self._total.get_lock():
497            return self._total.value
498
499    @total.setter
500    def total(self, value):
501        with self._total.get_lock():
502            self._total.value = value
503
504    def total_increment(self, value=1):
505        with self._total.get_lock():
506            self._total.value += value
507
508class CMake:
509    config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
510    dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
511
512    def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver):
513
514        self.cwd = None
515        self.capture_output = True
516
517        self.defconfig = {}
518        self.cmake_cache = {}
519
520        self.instance = None
521        self.testsuite = testsuite
522        self.platform = platform
523        self.source_dir = source_dir
524        self.build_dir = build_dir
525        self.log = "build.log"
526
527        self.default_encoding = sys.getdefaultencoding()
528        self.jobserver = jobserver
529
530    def parse_generated(self, filter_stages=None):
531        self.defconfig = {}
532        return {}
533
534    def run_build(self, args=None):
535        if args is None:
536            args = []
537
538        logger.debug(f"Building {self.source_dir} for {self.platform.name}")
539
540        cmake_args = []
541        cmake_args.extend(args)
542        cmake = shutil.which('cmake')
543        cmd = [cmake] + cmake_args
544        kwargs = dict()
545
546        if self.capture_output:
547            kwargs['stdout'] = subprocess.PIPE
548            # CMake sends the output of message() to stderr unless it's STATUS
549            kwargs['stderr'] = subprocess.STDOUT
550
551        if self.cwd:
552            kwargs['cwd'] = self.cwd
553
554        start_time = time.time()
555        if sys.platform == 'linux':
556            p = self.jobserver.popen(cmd, **kwargs)
557        else:
558            p = subprocess.Popen(cmd, **kwargs)
559        logger.debug(f'Running {" ".join(cmd)}')
560
561        out, _ = p.communicate()
562
563        ret = {}
564        duration = time.time() - start_time
565        self.instance.build_time += duration
566        if p.returncode == 0:
567            msg = (
568                f"Finished building {self.source_dir} for {self.platform.name}"
569                f" in {duration:.2f} seconds"
570            )
571            logger.debug(msg)
572
573            if not self.instance.run:
574                self.instance.status = TwisterStatus.NOTRUN
575                self.instance.add_missing_case_status(TwisterStatus.NOTRUN, "Test was built only")
576            else:
577                self.instance.status = TwisterStatus.PASS
578            ret = {"returncode": p.returncode}
579
580            if out:
581                log_msg = out.decode(self.default_encoding)
582                with open(
583                    os.path.join(self.build_dir, self.log),
584                    "a",
585                    encoding=self.default_encoding
586                ) as log:
587                    log.write(log_msg)
588            else:
589                return None
590        else:
591            # A real error occurred, raise an exception
592            log_msg = ""
593            if out:
594                log_msg = out.decode(self.default_encoding)
595                with open(
596                    os.path.join(self.build_dir, self.log),
597                    "a",
598                    encoding=self.default_encoding
599                ) as log:
600                    log.write(log_msg)
601
602            if log_msg:
603                pattern = (
604                    r"region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM|"
605                    r"dram\d_\d_seg|iram\d_\d_seg)' "
606                    "overflowed by"
607                )
608                overflow_found = re.findall(pattern, log_msg)
609
610                imgtool_overflow_found = re.findall(
611                    r"Error: Image size \(.*\) \+ trailer \(.*\) exceeds requested size",
612                    log_msg
613                )
614                if overflow_found and not self.options.overflow_as_errors:
615                    logger.debug(f"Test skipped due to {overflow_found[0]} Overflow")
616                    self.instance.status = TwisterStatus.SKIP
617                    self.instance.reason = f"{overflow_found[0]} overflow"
618                    change_skip_to_error_if_integration(self.options, self.instance)
619                elif imgtool_overflow_found and not self.options.overflow_as_errors:
620                    self.instance.status = TwisterStatus.SKIP
621                    self.instance.reason = "imgtool overflow"
622                    change_skip_to_error_if_integration(self.options, self.instance)
623                else:
624                    self.instance.status = TwisterStatus.ERROR
625                    self.instance.reason = "Build failure"
626
627            ret = {
628                "returncode": p.returncode
629            }
630
631        return ret
632
633    def run_cmake(self, args="", filter_stages=None):
634        if filter_stages is None:
635            filter_stages = []
636
637        if not self.options.disable_warnings_as_errors:
638            warnings_as_errors = 'y'
639            gen_edt_args = "--edtlib-Werror"
640        else:
641            warnings_as_errors = 'n'
642            gen_edt_args = ""
643
644        warning_command = 'CONFIG_COMPILER_WARNINGS_AS_ERRORS'
645        if self.instance.sysbuild:
646            warning_command = 'SB_' + warning_command
647
648        logger.debug(f"Running cmake on {self.source_dir} for {self.platform.name}")
649        cmake_args = [
650            f'-B{self.build_dir}',
651            f'-DTC_RUNID={self.instance.run_id}',
652            f'-DTC_NAME={self.instance.testsuite.name}',
653            f'-D{warning_command}={warnings_as_errors}',
654            f'-DEXTRA_GEN_EDT_ARGS={gen_edt_args}',
655            f'-G{self.env.generator}',
656            f'-DPython3_EXECUTABLE={pathlib.Path(sys.executable).as_posix()}'
657        ]
658
659        if self.instance.testsuite.harness == 'bsim':
660            cmake_args.extend([
661                '-DCMAKE_EXPORT_COMPILE_COMMANDS=ON',
662                '-DCONFIG_ASSERT=y',
663                '-DCONFIG_COVERAGE=y'
664            ])
665
666        if self.instance.toolchain:
667            cmake_args.append(f'-DZEPHYR_TOOLCHAIN_VARIANT={self.instance.toolchain}')
668
669        # If needed, run CMake using the package_helper script first, to only run
670        # a subset of all cmake modules. This output will be used to filter
671        # testcases, and the full CMake configuration will be run for
672        # testcases that should be built
673        if filter_stages:
674            cmake_filter_args = [
675                f'-DMODULES={",".join(filter_stages)}',
676                f'-P{canonical_zephyr_base}/cmake/package_helper.cmake',
677            ]
678
679        if self.instance.sysbuild and not filter_stages:
680            logger.debug(f"Building {self.source_dir} using sysbuild")
681            source_args = [
682                f'-S{canonical_zephyr_base}/share/sysbuild',
683                f'-DAPP_DIR={self.source_dir}'
684            ]
685        else:
686            source_args = [
687                f'-S{self.source_dir}'
688            ]
689        cmake_args.extend(source_args)
690
691        cmake_args.extend(args)
692
693        cmake_opts = [f'-DBOARD={self.platform.name}']
694        cmake_args.extend(cmake_opts)
695
696        if self.instance.testsuite.required_snippets:
697            cmake_opts = [
698                '-DSNIPPET={}'.format(';'.join(self.instance.testsuite.required_snippets))
699            ]
700            cmake_args.extend(cmake_opts)
701
702        cmake = shutil.which('cmake')
703        cmd = [cmake] + cmake_args
704
705        if filter_stages:
706            cmd += cmake_filter_args
707
708        kwargs = dict()
709
710        log_command(logger, "Calling cmake", cmd)
711
712        if self.capture_output:
713            kwargs['stdout'] = subprocess.PIPE
714            # CMake sends the output of message() to stderr unless it's STATUS
715            kwargs['stderr'] = subprocess.STDOUT
716
717        if self.cwd:
718            kwargs['cwd'] = self.cwd
719
720        start_time = time.time()
721        if sys.platform == 'linux':
722            p = self.jobserver.popen(cmd, **kwargs)
723        else:
724            p = subprocess.Popen(cmd, **kwargs)
725        out, _ = p.communicate()
726
727        duration = time.time() - start_time
728        self.instance.build_time += duration
729
730        if p.returncode == 0:
731            filter_results = self.parse_generated(filter_stages)
732            msg = (
733                f"Finished running cmake {self.source_dir} for {self.platform.name}"
734                f" in {duration:.2f} seconds"
735            )
736            logger.debug(msg)
737            ret = {
738                    'returncode': p.returncode,
739                    'filter': filter_results
740                    }
741        else:
742            self.instance.status = TwisterStatus.ERROR
743            self.instance.reason = "CMake build failure"
744
745            for tc in self.instance.testcases:
746                tc.status = self.instance.status
747
748            logger.error(f"CMake build failure: {self.source_dir} for {self.platform.name}")
749            ret = {"returncode": p.returncode}
750
751        if out:
752            os.makedirs(self.build_dir, exist_ok=True)
753            with open(
754                os.path.join(self.build_dir, self.log),
755                "a",
756                encoding=self.default_encoding
757            ) as log:
758                log_msg = out.decode(self.default_encoding)
759                log.write(log_msg)
760
761        return ret
762
763
764class FilterBuilder(CMake):
765
766    def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver):
767        super().__init__(testsuite, platform, source_dir, build_dir, jobserver)
768
769        self.log = "config-twister.log"
770
771    def parse_generated(self, filter_stages=None):
772        if filter_stages is None:
773            filter_stages = []
774
775        if self.platform.name == "unit_testing":
776            return {}
777
778        if self.instance.sysbuild and not filter_stages:
779            # Load domain yaml to get default domain build directory
780            domain_path = os.path.join(self.build_dir, "domains.yaml")
781            domains = Domains.from_file(domain_path)
782            logger.debug(f"Loaded sysbuild domain data from {domain_path}")
783            self.instance.domains = domains
784            domain_build = domains.get_default_domain().build_dir
785            cmake_cache_path = os.path.join(domain_build, "CMakeCache.txt")
786            defconfig_path = os.path.join(domain_build, "zephyr", ".config")
787            edt_pickle = os.path.join(domain_build, "zephyr", "edt.pickle")
788        else:
789            cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt")
790            # .config is only available after kconfig stage in cmake.
791            # If only dt based filtration is required package helper call won't produce .config
792            if not filter_stages or "kconfig" in filter_stages:
793                defconfig_path = os.path.join(self.build_dir, "zephyr", ".config")
794            # dt is compiled before kconfig,
795            # so edt_pickle is available regardless of choice of filter stages
796            edt_pickle = os.path.join(self.build_dir, "zephyr", "edt.pickle")
797
798
799        if not filter_stages or "kconfig" in filter_stages:
800            with open(defconfig_path) as fp:
801                defconfig = {}
802                for line in fp.readlines():
803                    m = self.config_re.match(line)
804                    if not m:
805                        if line.strip() and not line.startswith("#"):
806                            sys.stderr.write(f"Unrecognized line {line}\n")
807                        continue
808                    defconfig[m.group(1)] = m.group(2).strip()
809
810            self.defconfig = defconfig
811
812        cmake_conf = {}
813        try:
814            cache = CMakeCache.from_file(cmake_cache_path)
815        except FileNotFoundError:
816            cache = {}
817
818        for k in iter(cache):
819            cmake_conf[k.name] = k.value
820
821        self.cmake_cache = cmake_conf
822
823        filter_data = {
824            "ARCH": self.platform.arch,
825            "PLATFORM": self.platform.name
826        }
827        filter_data.update(os.environ)
828        if not filter_stages or "kconfig" in filter_stages:
829            filter_data.update(self.defconfig)
830        filter_data.update(self.cmake_cache)
831
832        # Verify that twister's arguments support sysbuild.
833        # Twister sysbuild flashing currently only works with west,
834        # so --west-flash must be passed.
835        if (
836            self.instance.sysbuild
837            and self.env.options.device_testing
838            and self.env.options.west_flash is None
839        ):
840            logger.warning("Sysbuild test will be skipped. West must be used for flashing.")
841            return {
842                os.path.join(
843                    self.platform.name,
844                    self.instance.toolchain,
845                    self.testsuite.name
846                ): True
847            }
848
849        if self.testsuite and self.testsuite.filter:
850            try:
851                if os.path.exists(edt_pickle):
852                    with open(edt_pickle, 'rb') as f:
853                        edt = pickle.load(f)
854                else:
855                    edt = None
856                ret = expr_parser.parse(self.testsuite.filter, filter_data, edt)
857
858            except (ValueError, SyntaxError) as se:
859                sys.stderr.write(f"Failed processing {self.testsuite.yamlfile}\n")
860                raise se
861
862            if not ret:
863                return {
864                    os.path.join(
865                        self.platform.name,
866                        self.instance.toolchain,
867                        self.testsuite.name
868                    ): True
869                }
870            else:
871                return {
872                    os.path.join(
873                        self.platform.name,
874                        self.instance.toolchain,
875                        self.testsuite.name
876                    ): False
877                }
878        else:
879            self.platform.filter_data = filter_data
880            return filter_data
881
882
883class ProjectBuilder(FilterBuilder):
884
885    def __init__(self, instance: TestInstance, env: TwisterEnv, jobserver, **kwargs):
886        super().__init__(
887            instance.testsuite,
888            instance.platform,
889            instance.testsuite.source_dir,
890            instance.build_dir,
891            jobserver
892        )
893
894        self.log = "build.log"
895        self.instance = instance
896        self.filtered_tests = 0
897        self.options = env.options
898        self.env = env
899        self.duts = None
900
901    @property
902    def trace(self) -> bool:
903        return self.options.verbose > 2
904
905    def log_info(self, filename, inline_logs, log_testcases=False):
906        filename = os.path.abspath(os.path.realpath(filename))
907        if inline_logs:
908            logger.info(f"{filename:-^100}")
909
910            try:
911                with open(filename) as fp:
912                    data = fp.read()
913            except Exception as e:
914                data = f"Unable to read log data ({e!s})\n"
915
916            # Remove any coverage data from the dumped logs
917            data = re.sub(
918                r"GCOV_COVERAGE_DUMP_START.*GCOV_COVERAGE_DUMP_END",
919                "GCOV_COVERAGE_DUMP_START\n...\nGCOV_COVERAGE_DUMP_END",
920                data,
921                flags=re.DOTALL,
922            )
923            logger.error(data)
924
925            logger.info(f"{filename:-^100}")
926
927            if log_testcases:
928                for tc in self.instance.testcases:
929                    if not tc.reason:
930                        continue
931                    logger.info(
932                        f"\n{str(tc.name).center(100, '_')}\n"
933                        f"{tc.reason}\n"
934                        f"{100*'_'}\n"
935                        f"{tc.output}"
936                    )
937        else:
938            logger.error("see: " + Fore.YELLOW + filename + Fore.RESET)
939
940    def log_info_file(self, inline_logs):
941        build_dir = self.instance.build_dir
942        h_log = f"{build_dir}/handler.log"
943        he_log = f"{build_dir}/handler_stderr.log"
944        b_log = f"{build_dir}/build.log"
945        v_log = f"{build_dir}/valgrind.log"
946        d_log = f"{build_dir}/device.log"
947        pytest_log = f"{build_dir}/twister_harness.log"
948
949        if os.path.exists(v_log) and "Valgrind" in self.instance.reason:
950            self.log_info(f"{v_log}", inline_logs)
951        elif os.path.exists(pytest_log) and os.path.getsize(pytest_log) > 0:
952            self.log_info(f"{pytest_log}", inline_logs, log_testcases=True)
953        elif os.path.exists(h_log) and os.path.getsize(h_log) > 0:
954            self.log_info(f"{h_log}", inline_logs)
955        elif os.path.exists(he_log) and os.path.getsize(he_log) > 0:
956            self.log_info(f"{he_log}", inline_logs)
957        elif os.path.exists(d_log) and os.path.getsize(d_log) > 0:
958            self.log_info(f"{d_log}", inline_logs)
959        else:
960            self.log_info(f"{b_log}", inline_logs)
961
962
963    def _add_to_pipeline(self, pipeline, op: str, additionals: dict=None):
964        if additionals is None:
965            additionals = {}
966        try:
967            if op:
968                task = dict({'op': op, 'test': self.instance}, **additionals)
969                pipeline.put(task)
970        # Only possible RuntimeError source here is a mutation of the pipeline during iteration.
971        # If that happens, we ought to consider the whole pipeline corrupted.
972        except RuntimeError as e:
973            logger.error(f"RuntimeError: {e}")
974            traceback.print_exc()
975
976
977    def process(self, pipeline, done, message, lock, results):
978        next_op = None
979        additionals = {}
980
981        op = message.get('op')
982        options = self.options
983        if not logger.handlers:
984            setup_logging(options.outdir, options.log_file, options.log_level, options.timestamps)
985        self.instance.setup_handler(self.env)
986
987        if op == "filter":
988            try:
989                ret = self.cmake(filter_stages=self.instance.filter_stages)
990                if self.instance.status in [TwisterStatus.FAIL, TwisterStatus.ERROR]:
991                    next_op = 'report'
992                else:
993                    # Here we check the dt/kconfig filter results coming from running cmake
994                    if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]:
995                        logger.debug(f"filtering {self.instance.name}")
996                        self.instance.status = TwisterStatus.FILTER
997                        self.instance.reason = "runtime filter"
998                        results.filtered_runtime_increment()
999                        self.instance.add_missing_case_status(TwisterStatus.FILTER)
1000                        next_op = 'report'
1001                    else:
1002                        next_op = 'cmake'
1003            except StatusAttributeError as sae:
1004                logger.error(str(sae))
1005                self.instance.status = TwisterStatus.ERROR
1006                reason = 'Incorrect status assignment'
1007                self.instance.reason = reason
1008                self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason)
1009                next_op = 'report'
1010            finally:
1011                self._add_to_pipeline(pipeline, next_op)
1012
1013        # The build process, call cmake and build with configured generator
1014        elif op == "cmake":
1015            try:
1016                ret = self.cmake()
1017                if self.instance.status in [TwisterStatus.FAIL, TwisterStatus.ERROR]:
1018                    next_op = 'report'
1019                elif self.options.cmake_only:
1020                    if self.instance.status == TwisterStatus.NONE:
1021                        logger.debug(f"CMake only: PASS {self.instance.name}")
1022                        self.instance.status = TwisterStatus.NOTRUN
1023                        self.instance.add_missing_case_status(TwisterStatus.NOTRUN, 'CMake only')
1024                    next_op = 'report'
1025                else:
1026                    # Here we check the runtime filter results coming from running cmake
1027                    if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]:
1028                        logger.debug(f"filtering {self.instance.name}")
1029                        self.instance.status = TwisterStatus.FILTER
1030                        self.instance.reason = "runtime filter"
1031                        results.filtered_runtime_increment()
1032                        self.instance.add_missing_case_status(TwisterStatus.FILTER)
1033                        next_op = 'report'
1034                    else:
1035                        next_op = 'build'
1036            except StatusAttributeError as sae:
1037                logger.error(str(sae))
1038                self.instance.status = TwisterStatus.ERROR
1039                reason = 'Incorrect status assignment'
1040                self.instance.reason = reason
1041                self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason)
1042                next_op = 'report'
1043            finally:
1044                self._add_to_pipeline(pipeline, next_op)
1045
1046        elif op == "build":
1047            try:
1048                logger.debug(f"build test: {self.instance.name}")
1049                ret = self.build()
1050                if not ret:
1051                    self.instance.status = TwisterStatus.ERROR
1052                    self.instance.reason = "Build Failure"
1053                    next_op = 'report'
1054                else:
1055                    # Count skipped cases during build, for example
1056                    # due to ram/rom overflow.
1057                    if  self.instance.status == TwisterStatus.SKIP:
1058                        results.skipped_increment()
1059                        self.instance.add_missing_case_status(
1060                            TwisterStatus.SKIP,
1061                            self.instance.reason
1062                        )
1063
1064                    if ret.get('returncode', 1) > 0:
1065                        self.instance.add_missing_case_status(
1066                            TwisterStatus.BLOCK,
1067                            self.instance.reason
1068                        )
1069                        next_op = 'report'
1070                    else:
1071                        if self.instance.testsuite.harness in ['ztest', 'test']:
1072                            logger.debug(
1073                                f"Determine test cases for test instance: {self.instance.name}"
1074                            )
1075                            try:
1076                                self.determine_testcases(results)
1077                                next_op = 'gather_metrics'
1078                            except BuildError as e:
1079                                logger.error(str(e))
1080                                self.instance.status = TwisterStatus.ERROR
1081                                self.instance.reason = str(e)
1082                                next_op = 'report'
1083                        else:
1084                            next_op = 'gather_metrics'
1085            except StatusAttributeError as sae:
1086                logger.error(str(sae))
1087                self.instance.status = TwisterStatus.ERROR
1088                reason = 'Incorrect status assignment'
1089                self.instance.reason = reason
1090                self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason)
1091                next_op = 'report'
1092            finally:
1093                self._add_to_pipeline(pipeline, next_op)
1094
1095        elif op == "gather_metrics":
1096            try:
1097                ret = self.gather_metrics(self.instance)
1098                if not ret or ret.get('returncode', 1) > 0:
1099                    self.instance.status = TwisterStatus.ERROR
1100                    self.instance.reason = "Build Failure at gather_metrics."
1101                    next_op = 'report'
1102                elif self.instance.run and self.instance.handler.ready:
1103                    next_op = 'run'
1104                else:
1105                    if self.instance.status == TwisterStatus.NOTRUN:
1106                        run_conditions =  (
1107                            f"(run:{self.instance.run},"
1108                            f" handler.ready:{self.instance.handler.ready})"
1109                        )
1110                        logger.debug(f"Instance {self.instance.name} can't run {run_conditions}")
1111                        self.instance.add_missing_case_status(
1112                            TwisterStatus.NOTRUN,
1113                            "Nowhere to run"
1114                        )
1115                    next_op = 'report'
1116            except StatusAttributeError as sae:
1117                logger.error(str(sae))
1118                self.instance.status = TwisterStatus.ERROR
1119                reason = 'Incorrect status assignment'
1120                self.instance.reason = reason
1121                self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason)
1122                next_op = 'report'
1123            finally:
1124                self._add_to_pipeline(pipeline, next_op)
1125
1126        # Run the generated binary using one of the supported handlers
1127        elif op == "run":
1128            try:
1129                logger.debug(f"run test: {self.instance.name}")
1130                self.run()
1131                logger.debug(f"run status: {self.instance.name} {self.instance.status}")
1132
1133                # to make it work with pickle
1134                self.instance.handler.thread = None
1135                self.instance.handler.duts = None
1136
1137                next_op = "coverage" if self.options.coverage else "report"
1138                additionals = {
1139                    "status": self.instance.status,
1140                    "reason": self.instance.reason
1141                }
1142            except StatusAttributeError as sae:
1143                logger.error(str(sae))
1144                self.instance.status = TwisterStatus.ERROR
1145                reason = 'Incorrect status assignment'
1146                self.instance.reason = reason
1147                self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason)
1148                next_op = 'report'
1149                additionals = {}
1150            finally:
1151                self._add_to_pipeline(pipeline, next_op, additionals)
1152
1153        # Run per-instance code coverage
1154        elif op == "coverage":
1155            try:
1156                logger.debug(f"Run coverage for '{self.instance.name}'")
1157                self.instance.coverage_status, self.instance.coverage = \
1158                        run_coverage_instance(self.options, self.instance)
1159                next_op = 'report'
1160                additionals = {
1161                    "status": self.instance.status,
1162                    "reason": self.instance.reason
1163                }
1164            except StatusAttributeError as sae:
1165                logger.error(str(sae))
1166                self.instance.status = TwisterStatus.ERROR
1167                reason = f"Incorrect status assignment on {op}"
1168                self.instance.reason = reason
1169                self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason)
1170                next_op = 'report'
1171                additionals = {}
1172            finally:
1173                self._add_to_pipeline(pipeline, next_op, additionals)
1174
1175        # Report results and output progress to screen
1176        elif op == "report":
1177            try:
1178                with lock:
1179                    done.put(self.instance)
1180                    self.report_out(results)
1181
1182                if not self.options.coverage:
1183                    if self.options.prep_artifacts_for_testing:
1184                        next_op = 'cleanup'
1185                        additionals = {"mode": "device"}
1186                    elif self.options.runtime_artifact_cleanup == "pass" and \
1187                        self.instance.status in [TwisterStatus.PASS, TwisterStatus.NOTRUN]:
1188                        next_op = 'cleanup'
1189                        additionals = {"mode": "passed"}
1190                    elif self.options.runtime_artifact_cleanup == "all":
1191                        next_op = 'cleanup'
1192                        additionals = {"mode": "all"}
1193            except StatusAttributeError as sae:
1194                logger.error(str(sae))
1195                self.instance.status = TwisterStatus.ERROR
1196                reason = 'Incorrect status assignment'
1197                self.instance.reason = reason
1198                self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason)
1199                next_op = None
1200                additionals = {}
1201            finally:
1202                self._add_to_pipeline(pipeline, next_op, additionals)
1203
1204        elif op == "cleanup":
1205            try:
1206                mode = message.get("mode")
1207                if mode == "device":
1208                    self.cleanup_device_testing_artifacts()
1209                elif (
1210                    mode == "passed"
1211                    or (mode == "all" and self.instance.reason != "CMake build failure")
1212                ):
1213                    self.cleanup_artifacts(self.options.keep_artifacts)
1214            except StatusAttributeError as sae:
1215                logger.error(str(sae))
1216                self.instance.status = TwisterStatus.ERROR
1217                reason = 'Incorrect status assignment'
1218                self.instance.reason = reason
1219                self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason)
1220
1221    def demangle(self, symbol_name):
1222        if symbol_name[:2] == '_Z':
1223            try:
1224                cpp_filt = subprocess.run(
1225                    'c++filt',
1226                    input=symbol_name,
1227                    text=True,
1228                    check=True,
1229                    capture_output=True
1230                )
1231                if self.trace:
1232                    logger.debug(f"Demangle: '{symbol_name}'==>'{cpp_filt.stdout}'")
1233                return cpp_filt.stdout.strip()
1234            except Exception as e:
1235                logger.error(f"Failed to demangle '{symbol_name}': {e}")
1236        return symbol_name
1237
1238    def determine_testcases(self, results):
1239        logger.debug(f"Determine test cases for test suite: {self.instance.testsuite.id}")
1240
1241        new_ztest_unit_test_regex = re.compile(r"z_ztest_unit_test__([^\s]+?)__([^\s]*)")
1242        detected_cases = []
1243
1244        elf_file = self.instance.get_elf_file()
1245        with open(elf_file, "rb") as elf_fp:
1246            elf = ELFFile(elf_fp)
1247
1248            for section in elf.iter_sections():
1249                if isinstance(section, SymbolTableSection):
1250                    for sym in section.iter_symbols():
1251                        # It is only meant for new ztest fx
1252                        # because only new ztest fx exposes test functions precisely.
1253                        m_ = new_ztest_unit_test_regex.search(sym.name)
1254                        if not m_:
1255                            continue
1256                        # Demangle C++ symbols
1257                        m_ = new_ztest_unit_test_regex.search(self.demangle(sym.name))
1258                        if not m_:
1259                            continue
1260                        # The 1st capture group is new ztest suite name.
1261                        # The 2nd capture group is new ztest unit test name.
1262                        new_ztest_suite = m_[1]
1263                        if self.trace and \
1264                           new_ztest_suite not in self.instance.testsuite.ztest_suite_names:
1265                            # This can happen if a ZTEST_SUITE name is macro-generated
1266                            # in the test source files, e.g. based on DT information.
1267                            logger.debug(
1268                                f"Unexpected Ztest suite '{new_ztest_suite}' is "
1269                                f"not present in: {self.instance.testsuite.ztest_suite_names}"
1270                            )
1271                        test_func_name = m_[2].replace("test_", "", 1)
1272                        testcase_id = self.instance.compose_case_name(
1273                            f"{new_ztest_suite}.{test_func_name}"
1274                        )
1275                        detected_cases.append(testcase_id)
1276
1277        logger.debug(
1278            f"Test instance {self.instance.name} already has {len(self.instance.testcases)} "
1279            f"testcase(s) known: {self.instance.testcases}"
1280        )
1281        if detected_cases:
1282            logger.debug(f"Detected {len(detected_cases)} Ztest case(s): "
1283                         f"[{', '.join(detected_cases)}] in {elf_file}")
1284            tc_keeper = {
1285                tc.name: {'status': tc.status, 'reason': tc.reason}
1286                for tc in self.instance.testcases
1287            }
1288            self.instance.testcases.clear()
1289            self.instance.testsuite.testcases.clear()
1290
1291            for testcase_id in detected_cases:
1292                testcase = self.instance.add_testcase(name=testcase_id)
1293                self.instance.testsuite.add_testcase(name=testcase_id)
1294
1295                # Keep previous statuses and reasons
1296                tc_info = tc_keeper.get(testcase_id, {})
1297                if not tc_info and self.trace:
1298                    # Also happens when Ztest uses macroses, eg. DEFINE_TEST_VARIANT
1299                    logger.debug(f"Ztest case '{testcase_id}' discovered for "
1300                                 f"'{self.instance.testsuite.source_dir_rel}' "
1301                                 f"with {list(tc_keeper)}")
1302                testcase.status = tc_info.get('status', TwisterStatus.NONE)
1303                testcase.reason = tc_info.get('reason')
1304
1305
1306    def cleanup_artifacts(self, additional_keep: list[str] = None):
1307        if additional_keep is None:
1308            additional_keep = []
1309        logger.debug(f"Cleaning up {self.instance.build_dir}")
1310        allow = [
1311            os.path.join('zephyr', '.config'),
1312            'handler.log',
1313            'handler_stderr.log',
1314            'build.log',
1315            'device.log',
1316            'recording.csv',
1317            'rom.json',
1318            'ram.json',
1319            'build_info.yml',
1320            'zephyr/zephyr.dts',
1321            # below ones are needed to make --test-only work as well
1322            'Makefile',
1323            'CMakeCache.txt',
1324            'build.ninja',
1325            os.path.join('CMakeFiles', 'rules.ninja')
1326            ]
1327
1328        allow += additional_keep
1329
1330        if self.options.runtime_artifact_cleanup == 'all':
1331            allow += [os.path.join('twister', 'testsuite_extra.conf')]
1332
1333        allow = [os.path.join(self.instance.build_dir, file) for file in allow]
1334
1335        for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False):
1336            for name in filenames:
1337                path = os.path.join(dirpath, name)
1338                if path not in allow:
1339                    os.remove(path)
1340            # Remove empty directories and symbolic links to directories
1341            for dir in dirnames:
1342                path = os.path.join(dirpath, dir)
1343                if os.path.islink(path):
1344                    os.remove(path)
1345                elif not os.listdir(path):
1346                    os.rmdir(path)
1347
1348    def cleanup_device_testing_artifacts(self):
1349        logger.debug(f"Cleaning up for Device Testing {self.instance.build_dir}")
1350
1351        files_to_keep = self._get_binaries()
1352        files_to_keep.append(os.path.join('zephyr', 'runners.yaml'))
1353
1354        if self.instance.sysbuild:
1355            files_to_keep.append('domains.yaml')
1356            for domain in self.instance.domains.get_domains():
1357                files_to_keep += self._get_artifact_allow_list_for_domain(domain.name)
1358
1359        self.cleanup_artifacts(files_to_keep)
1360
1361        self._sanitize_files()
1362
1363    def _get_artifact_allow_list_for_domain(self, domain: str) -> list[str]:
1364        """
1365        Return a list of files needed to test a given domain.
1366        """
1367        allow = [
1368            os.path.join(domain, 'build.ninja'),
1369            os.path.join(domain, 'CMakeCache.txt'),
1370            os.path.join(domain, 'CMakeFiles', 'rules.ninja'),
1371            os.path.join(domain, 'Makefile'),
1372            os.path.join(domain, 'zephyr', '.config'),
1373            os.path.join(domain, 'zephyr', 'runners.yaml')
1374            ]
1375        return allow
1376
1377    def _get_binaries(self) -> list[str]:
1378        """
1379        Get list of binaries paths (absolute or relative to the
1380        self.instance.build_dir), basing on information from platform.binaries
1381        or runners.yaml. If they are not found take default binaries like
1382        "zephyr/zephyr.hex" etc.
1383        """
1384        binaries: list[str] = []
1385
1386        platform = self.instance.platform
1387        if platform.binaries:
1388            for binary in platform.binaries:
1389                binaries.append(os.path.join('zephyr', binary))
1390
1391        # Get binaries for a single-domain build
1392        binaries += self._get_binaries_from_runners()
1393        # Get binaries in the case of a multiple-domain build
1394        if self.instance.sysbuild:
1395            for domain in self.instance.domains.get_domains():
1396                binaries += self._get_binaries_from_runners(domain.name)
1397
1398        # if binaries was not found in platform.binaries and runners.yaml take default ones
1399        if len(binaries) == 0:
1400            binaries = [
1401                os.path.join('zephyr', 'zephyr.hex'),
1402                os.path.join('zephyr', 'zephyr.bin'),
1403                os.path.join('zephyr', 'zephyr.elf'),
1404                os.path.join('zephyr', 'zephyr.exe'),
1405            ]
1406        return binaries
1407
1408    def _get_binaries_from_runners(self, domain='') -> list[str]:
1409        """
1410        Get list of binaries paths (absolute or relative to the
1411        self.instance.build_dir) from runners.yaml file. May be used for
1412        multiple-domain builds by passing in one domain at a time.
1413        """
1414
1415        runners_file_path: str = os.path.join(self.instance.build_dir,
1416                                              domain, 'zephyr', 'runners.yaml')
1417        if not os.path.exists(runners_file_path):
1418            return []
1419
1420        with open(runners_file_path) as file:
1421            runners_content: dict = yaml.load(file, Loader=SafeLoader)
1422
1423        if 'config' not in runners_content:
1424            return []
1425
1426        runners_config: dict = runners_content['config']
1427        binary_keys: list[str] = ['elf_file', 'hex_file', 'bin_file']
1428
1429        binaries: list[str] = []
1430        for binary_key in binary_keys:
1431            binary_path = runners_config.get(binary_key)
1432            if binary_path is None:
1433                continue
1434            if os.path.isabs(binary_path):
1435                binaries.append(binary_path)
1436            else:
1437                binaries.append(os.path.join(domain, 'zephyr', binary_path))
1438
1439        return binaries
1440
1441    def _sanitize_files(self):
1442        """
1443        Sanitize files to make it possible to flash those file on different
1444        computer/system.
1445        """
1446        self._sanitize_runners_file()
1447        self._sanitize_zephyr_base_from_files()
1448
1449    def _sanitize_runners_file(self):
1450        """
1451        Replace absolute paths of binary files for relative ones. The base
1452        directory for those files is f"{self.instance.build_dir}/zephyr"
1453        """
1454        runners_dir_path: str = os.path.join(self.instance.build_dir, 'zephyr')
1455        runners_file_path: str = os.path.join(runners_dir_path, 'runners.yaml')
1456        if not os.path.exists(runners_file_path):
1457            return
1458
1459        with open(runners_file_path) as file:
1460            runners_content_text = file.read()
1461            runners_content_yaml: dict = yaml.load(runners_content_text, Loader=SafeLoader)
1462
1463        if 'config' not in runners_content_yaml:
1464            return
1465
1466        runners_config: dict = runners_content_yaml['config']
1467        binary_keys: list[str] = ['elf_file', 'hex_file', 'bin_file']
1468
1469        for binary_key in binary_keys:
1470            binary_path = runners_config.get(binary_key)
1471            # sanitize only paths which exist and are absolute
1472            if binary_path is None or not os.path.isabs(binary_path):
1473                continue
1474            binary_path_relative = os.path.relpath(binary_path, start=runners_dir_path)
1475            runners_content_text = runners_content_text.replace(binary_path, binary_path_relative)
1476
1477        with open(runners_file_path, 'w') as file:
1478            file.write(runners_content_text)
1479
1480    def _sanitize_zephyr_base_from_files(self):
1481        """
1482        Remove Zephyr base paths from selected files.
1483        """
1484        files_to_sanitize = [
1485            'CMakeCache.txt',
1486            os.path.join('zephyr', 'runners.yaml'),
1487        ]
1488        for file_path in files_to_sanitize:
1489            file_path = os.path.join(self.instance.build_dir, file_path)
1490            if not os.path.exists(file_path):
1491                continue
1492
1493            with open(file_path) as file:
1494                data = file.read()
1495
1496            # add trailing slash at the end of canonical_zephyr_base if it does not exist:
1497            path_to_remove = os.path.join(canonical_zephyr_base, "")
1498            data = data.replace(path_to_remove, "")
1499
1500            with open(file_path, 'w') as file:
1501                file.write(data)
1502
1503    @staticmethod
1504    def _add_instance_testcases_to_status_counts(instance, results, decrement=False):
1505        increment_value = -1 if decrement else 1
1506        for tc in instance.testcases:
1507            match tc.status:
1508                case TwisterStatus.PASS:
1509                    results.passed_cases_increment(increment_value)
1510                case TwisterStatus.NOTRUN:
1511                    results.notrun_cases_increment(increment_value)
1512                case TwisterStatus.BLOCK:
1513                    results.blocked_cases_increment(increment_value)
1514                case TwisterStatus.SKIP:
1515                    results.skipped_cases_increment(increment_value)
1516                case TwisterStatus.FILTER:
1517                    results.filtered_cases_increment(increment_value)
1518                case TwisterStatus.ERROR:
1519                    results.error_cases_increment(increment_value)
1520                case TwisterStatus.FAIL:
1521                    results.failed_cases_increment(increment_value)
1522                # Statuses that should not appear.
1523                # Crashing Twister at this point would be counterproductive,
1524                # but having those statuses in this part of processing is an error.
1525                case TwisterStatus.NONE:
1526                    results.none_cases_increment(increment_value)
1527                    logger.warning(f'A None status detected in instance {instance.name},'
1528                                 f' test case {tc.name}.')
1529                    results.warnings_increment(1)
1530                case TwisterStatus.STARTED:
1531                    results.started_cases_increment(increment_value)
1532                    logger.warning(f'A started status detected in instance {instance.name},'
1533                                 f' test case {tc.name}.')
1534                    results.warnings_increment(1)
1535                case _:
1536                    logger.warning(
1537                        f'An unknown status "{tc.status}" detected in instance {instance.name},'
1538                        f' test case {tc.name}.'
1539                    )
1540                    results.warnings_increment(1)
1541
1542
1543    def report_out(self, results):
1544        total_to_do = results.total - results.filtered_static
1545        total_tests_width = len(str(total_to_do))
1546        results.done_increment()
1547        instance = self.instance
1548        if results.iteration == 1:
1549            results.cases_increment(len(instance.testcases))
1550
1551        self._add_instance_testcases_to_status_counts(instance, results)
1552
1553        status = (
1554            f'{TwisterStatus.get_color(instance.status)}{str.upper(instance.status)}{Fore.RESET}'
1555        )
1556
1557        if instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
1558            if instance.status == TwisterStatus.ERROR:
1559                results.error_increment()
1560            else:
1561                results.failed_increment()
1562            if self.options.verbose:
1563                status += " " + instance.reason
1564            else:
1565                logger.error(
1566                    f"{instance.platform.name:<25} {instance.testsuite.name:<50}"
1567                    f" {status}: {instance.reason}"
1568                )
1569            if not self.options.verbose:
1570                self.log_info_file(self.options.inline_logs)
1571        elif instance.status == TwisterStatus.SKIP:
1572            results.skipped_increment()
1573        elif instance.status == TwisterStatus.FILTER:
1574            results.filtered_configs_increment()
1575        elif instance.status == TwisterStatus.PASS:
1576            results.passed_increment()
1577        elif instance.status == TwisterStatus.NOTRUN:
1578            results.notrun_increment()
1579        else:
1580            logger.debug(f"Unknown status = {instance.status}")
1581            status = Fore.YELLOW + "UNKNOWN" + Fore.RESET
1582
1583        if self.options.verbose:
1584            if self.options.cmake_only:
1585                more_info = "cmake"
1586            elif instance.status in [TwisterStatus.SKIP, TwisterStatus.FILTER]:
1587                more_info = instance.reason
1588            else:
1589                if instance.handler.ready and instance.run:
1590                    more_info = instance.handler.type_str
1591                    htime = instance.execution_time
1592                    if instance.dut:
1593                        more_info += f": {instance.dut},"
1594                    if htime:
1595                        more_info += f" {htime:.3f}s"
1596                else:
1597                    more_info = "build"
1598
1599                if ( instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]
1600                     and hasattr(self.instance.handler, 'seed')
1601                     and self.instance.handler.seed is not None ):
1602                    more_info += "/seed: " + str(self.options.seed)
1603                if instance.toolchain:
1604                    more_info += f" <{instance.toolchain}>"
1605            logger.info(
1606                f"{results.done - results.filtered_static:>{total_tests_width}}/{total_to_do}"
1607                f" {instance.platform.name:<25} {instance.testsuite.name:<50}"
1608                f" {status} ({more_info})"
1609            )
1610
1611            if self.options.verbose > 1:
1612                for tc in self.instance.testcases:
1613                    color = TwisterStatus.get_color(tc.status)
1614                    logger.info(f'    {" ":<{total_tests_width+25+4}} {tc.name:<75} '
1615                                f'{color}{str.upper(tc.status.value):<12}{Fore.RESET}'
1616                                f'{" " + tc.reason if tc.reason else ""}')
1617
1618            if instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
1619                self.log_info_file(self.options.inline_logs)
1620        else:
1621            completed_perc = 0
1622            if total_to_do > 0:
1623                completed_perc = int(
1624                    (float(results.done - results.filtered_static) / total_to_do) * 100
1625                )
1626
1627            unfiltered = results.done - results.filtered_static
1628            complete_section = (
1629                f"{TwisterStatus.get_color(TwisterStatus.PASS)}"
1630                f"{unfiltered:>4}/{total_to_do:>4}"
1631                f"{Fore.RESET}  {completed_perc:>2}%"
1632            )
1633            notrun_section = (
1634                f"{TwisterStatus.get_color(TwisterStatus.NOTRUN)}{results.notrun:>4}{Fore.RESET}"
1635            )
1636            filtered_section_color = (
1637                TwisterStatus.get_color(TwisterStatus.SKIP)
1638                if results.filtered_configs > 0
1639                else Fore.RESET
1640            )
1641            filtered_section = (
1642                f"{filtered_section_color}{results.filtered_configs:>4}{Fore.RESET}"
1643            )
1644            failed_section_color = (
1645                TwisterStatus.get_color(TwisterStatus.FAIL) if results.failed > 0 else Fore.RESET
1646            )
1647            failed_section = (
1648                f"{failed_section_color}{results.failed:>4}{Fore.RESET}"
1649            )
1650            error_section_color = (
1651                TwisterStatus.get_color(TwisterStatus.ERROR) if results.error > 0 else Fore.RESET
1652            )
1653            error_section = (
1654                f"{error_section_color}{results.error:>4}{Fore.RESET}"
1655            )
1656            sys.stdout.write(
1657                f"INFO    - Total complete: {complete_section}"
1658                f"  built (not run): {notrun_section},"
1659                f" filtered: {filtered_section},"
1660                f" failed: {failed_section},"
1661                f" error: {error_section}\r"
1662            )
1663        sys.stdout.flush()
1664
1665    @staticmethod
1666    def cmake_assemble_args(extra_args, handler, extra_conf_files, extra_overlay_confs,
1667                            extra_dtc_overlay_files, cmake_extra_args,
1668                            build_dir):
1669        # Retain quotes around config options
1670        config_options = [arg for arg in extra_args if arg.startswith("CONFIG_")]
1671        args = [arg for arg in extra_args if not arg.startswith("CONFIG_")]
1672
1673        args_expanded = ["-D{}".format(a.replace('"', '\"')) for a in config_options]
1674
1675        if handler.ready:
1676            args.extend(handler.args)
1677
1678        if extra_conf_files:
1679            args.append(f"CONF_FILE=\"{';'.join(extra_conf_files)}\"")
1680
1681        if extra_dtc_overlay_files:
1682            args.append(f"DTC_OVERLAY_FILE=\"{';'.join(extra_dtc_overlay_files)}\"")
1683
1684        # merge overlay files into one variable
1685        overlays = extra_overlay_confs.copy()
1686
1687        additional_overlay_path = os.path.join(
1688            build_dir, "twister", "testsuite_extra.conf"
1689        )
1690        if os.path.exists(additional_overlay_path):
1691            overlays.append(additional_overlay_path)
1692
1693        if overlays:
1694            args.append(f"OVERLAY_CONFIG=\"{' '.join(overlays)}\"")
1695
1696        # Build the final argument list
1697        args_expanded.extend(["-D{}".format(a.replace('"', '\"')) for a in cmake_extra_args])
1698        args_expanded.extend(["-D{}".format(a.replace('"', '')) for a in args])
1699
1700        return args_expanded
1701
1702    def cmake(self, filter_stages=None):
1703        if filter_stages is None:
1704            filter_stages = []
1705        args = []
1706        for va in self.testsuite.extra_args.copy():
1707            cond_args = va.split(":")
1708            if cond_args[0] == "arch" and len(cond_args) == 3:
1709                if self.instance.platform.arch == cond_args[1]:
1710                    args.append(cond_args[2])
1711            elif cond_args[0] == "platform" and len(cond_args) == 3:
1712                if self.instance.platform.name == cond_args[1]:
1713                    args.append(cond_args[2])
1714            elif cond_args[0] == "simulation" and len(cond_args) == 3:
1715                if self.instance.platform.simulation == cond_args[1]:
1716                    args.append(cond_args[2])
1717            else:
1718                if cond_args[0] in ["arch", "platform", "simulation"]:
1719                    logger.warning(f"Unexpected extra_args: {va}")
1720                args.append(va)
1721
1722
1723        args = self.cmake_assemble_args(
1724            args,
1725            self.instance.handler,
1726            self.testsuite.extra_conf_files,
1727            self.testsuite.extra_overlay_confs,
1728            self.testsuite.extra_dtc_overlay_files,
1729            self.options.extra_args, # CMake extra args
1730            self.instance.build_dir,
1731        )
1732        return self.run_cmake(args,filter_stages)
1733
1734    def build(self):
1735        harness = HarnessImporter.get_harness(self.instance.testsuite.harness.capitalize())
1736        build_result = self.run_build(['--build', self.build_dir])
1737        try:
1738            if harness:
1739                harness.instance = self.instance
1740                harness.build()
1741        except ConfigurationError as error:
1742            self.instance.status = TwisterStatus.ERROR
1743            self.instance.reason = str(error)
1744            logger.error(self.instance.reason)
1745            return
1746        return build_result
1747
1748    def run(self):
1749
1750        instance = self.instance
1751
1752        if instance.handler.ready:
1753            logger.debug(f"Reset instance status from '{instance.status}' to None before run.")
1754            instance.status = TwisterStatus.NONE
1755
1756            if instance.handler.type_str == "device":
1757                instance.handler.duts = self.duts
1758
1759            if(self.options.seed is not None and instance.platform.name.startswith("native_")):
1760                self.parse_generated()
1761                if('CONFIG_FAKE_ENTROPY_NATIVE_SIM' in self.defconfig and
1762                    self.defconfig['CONFIG_FAKE_ENTROPY_NATIVE_SIM'] == 'y'):
1763                    instance.handler.seed = self.options.seed
1764
1765            if self.options.extra_test_args and instance.platform.arch == "posix":
1766                instance.handler.extra_test_args = self.options.extra_test_args
1767
1768            harness = HarnessImporter.get_harness(instance.testsuite.harness.capitalize())
1769            try:
1770                harness.configure(instance)
1771            except ConfigurationError as error:
1772                instance.status = TwisterStatus.ERROR
1773                instance.reason = str(error)
1774                logger.error(instance.reason)
1775                return
1776            #
1777            if isinstance(harness, Pytest):
1778                harness.pytest_run(instance.handler.get_test_timeout())
1779            elif isinstance(harness, Ctest):
1780                harness.ctest_run(instance.handler.get_test_timeout())
1781            else:
1782                instance.handler.handle(harness)
1783
1784        sys.stdout.flush()
1785
1786    def gather_metrics(self, instance: TestInstance):
1787        build_result = {"returncode": 0}
1788        if self.options.create_rom_ram_report:
1789            build_result = self.run_build(['--build', self.build_dir, "--target", "footprint"])
1790        if self.options.enable_size_report and not self.options.cmake_only:
1791            self.calc_size(instance=instance, from_buildlog=self.options.footprint_from_buildlog)
1792        else:
1793            instance.metrics["used_ram"] = 0
1794            instance.metrics["used_rom"] = 0
1795            instance.metrics["available_rom"] = 0
1796            instance.metrics["available_ram"] = 0
1797            instance.metrics["unrecognized"] = []
1798        return build_result
1799
1800    @staticmethod
1801    def calc_size(instance: TestInstance, from_buildlog: bool):
1802        if instance.status not in [TwisterStatus.ERROR, TwisterStatus.FAIL, TwisterStatus.SKIP]:
1803            if instance.platform.type not in ["native", "qemu", "unit"]:
1804                generate_warning = bool(instance.platform.type == "mcu")
1805                size_calc = instance.calculate_sizes(
1806                    from_buildlog=from_buildlog,
1807                    generate_warning=generate_warning
1808                )
1809                instance.metrics["used_ram"] = size_calc.get_used_ram()
1810                instance.metrics["used_rom"] = size_calc.get_used_rom()
1811                instance.metrics["available_rom"] = size_calc.get_available_rom()
1812                instance.metrics["available_ram"] = size_calc.get_available_ram()
1813                instance.metrics["unrecognized"] = size_calc.unrecognized_sections()
1814            else:
1815                instance.metrics["used_ram"] = 0
1816                instance.metrics["used_rom"] = 0
1817                instance.metrics["available_rom"] = 0
1818                instance.metrics["available_ram"] = 0
1819                instance.metrics["unrecognized"] = []
1820            instance.metrics["handler_time"] = instance.execution_time
1821
1822class TwisterRunner:
1823
1824    def __init__(self, instances, suites, env=None) -> None:
1825        self.pipeline = None
1826        self.options = env.options
1827        self.env = env
1828        self.instances = instances
1829        self.suites = suites
1830        self.duts = None
1831        self.jobs = 1
1832        self.results = None
1833        self.jobserver = None
1834
1835    def run(self):
1836
1837        retries = self.options.retry_failed + 1
1838
1839        BaseManager.register('LifoQueue', queue.LifoQueue)
1840        manager = BaseManager()
1841        manager.start()
1842
1843        self.results = ExecutionCounter(total=len(self.instances))
1844        self.iteration = 0
1845        pipeline = manager.LifoQueue()
1846        done_queue = manager.LifoQueue()
1847
1848        # Set number of jobs
1849        if self.options.jobs:
1850            self.jobs = self.options.jobs
1851        elif self.options.build_only:
1852            self.jobs = multiprocessing.cpu_count() * 2
1853        else:
1854            self.jobs = multiprocessing.cpu_count()
1855
1856        if sys.platform == "linux":
1857            if os.name == 'posix':
1858                self.jobserver = GNUMakeJobClient.from_environ(jobs=self.options.jobs)
1859                if not self.jobserver:
1860                    self.jobserver = GNUMakeJobServer(self.jobs)
1861                elif self.jobserver.jobs:
1862                    self.jobs = self.jobserver.jobs
1863            # TODO: Implement this on windows/mac also
1864            else:
1865                self.jobserver = JobClient()
1866
1867            logger.info(f"JOBS: {self.jobs}")
1868
1869        self.update_counting_before_pipeline()
1870
1871        while True:
1872            self.results.iteration_increment()
1873
1874            if self.results.iteration > 1:
1875                logger.info(f"{self.results.iteration} Iteration:")
1876                time.sleep(self.options.retry_interval)  # waiting for the system to settle down
1877                self.results.done = self.results.total - self.results.failed
1878                self.results.failed = 0
1879                if self.options.retry_build_errors:
1880                    self.results.done -= self.results.error
1881                    self.results.error = 0
1882            else:
1883                self.results.done = self.results.filtered_static
1884
1885            self.execute(pipeline, done_queue)
1886
1887            while True:
1888                try:
1889                    inst = done_queue.get_nowait()
1890                except queue.Empty:
1891                    break
1892                else:
1893                    inst.metrics.update(self.instances[inst.name].metrics)
1894                    inst.metrics["handler_time"] = inst.execution_time
1895                    inst.metrics["unrecognized"] = []
1896                    self.instances[inst.name] = inst
1897
1898            print("")
1899
1900            retry_errors = False
1901            if self.results.error and self.options.retry_build_errors:
1902                retry_errors = True
1903
1904            retries = retries - 1
1905            if retries == 0 or ( self.results.failed == 0 and not retry_errors):
1906                break
1907
1908        self.show_brief()
1909
1910    def update_counting_before_pipeline(self):
1911        '''
1912        Updating counting before pipeline is necessary because statically filterd
1913        test instance never enter the pipeline. While some pipeline output needs
1914        the static filter stats. So need to prepare them before pipline starts.
1915        '''
1916        for instance in self.instances.values():
1917            if instance.status == TwisterStatus.FILTER and instance.reason != 'runtime filter':
1918                self.results.filtered_static_increment()
1919                self.results.filtered_configs_increment()
1920                self.results.filtered_cases_increment(len(instance.testsuite.testcases))
1921                self.results.cases_increment(len(instance.testsuite.testcases))
1922            elif instance.status == TwisterStatus.ERROR:
1923                self.results.error_increment()
1924
1925    def show_brief(self):
1926        logger.info(
1927            f"{len(self.suites)} test scenarios ({len(self.instances)} configurations) selected,"
1928            f" {self.results.filtered_configs} configurations filtered"
1929            f" ({self.results.filtered_static} by static filter,"
1930            f" {self.results.filtered_configs - self.results.filtered_static} at runtime)."
1931        )
1932
1933    def add_tasks_to_queue(
1934        self,
1935        pipeline,
1936        build_only=False,
1937        test_only=False,
1938        retry_build_errors=False
1939    ):
1940        for instance in self.instances.values():
1941            if build_only:
1942                instance.run = False
1943
1944            no_retry_statuses = [
1945                TwisterStatus.PASS,
1946                TwisterStatus.SKIP,
1947                TwisterStatus.FILTER,
1948                TwisterStatus.NOTRUN
1949            ]
1950            if not retry_build_errors:
1951                no_retry_statuses.append(TwisterStatus.ERROR)
1952
1953            if instance.status not in no_retry_statuses:
1954                logger.debug(f"adding {instance.name}")
1955                if instance.status != TwisterStatus.NONE:
1956                    instance.retries += 1
1957                instance.status = TwisterStatus.NONE
1958                # Previous states should be removed from the stats
1959                if self.results.iteration > 1:
1960                    ProjectBuilder._add_instance_testcases_to_status_counts(
1961                        instance,
1962                        self.results,
1963                        decrement=True
1964                    )
1965
1966                # Check if cmake package_helper script can be run in advance.
1967                instance.filter_stages = []
1968                if instance.testsuite.filter:
1969                    instance.filter_stages = self.get_cmake_filter_stages(
1970                        instance.testsuite.filter,
1971                        expr_parser.reserved.keys()
1972                    )
1973
1974                if test_only and instance.run:
1975                    pipeline.put({"op": "run", "test": instance})
1976                elif instance.filter_stages and "full" not in instance.filter_stages:
1977                    pipeline.put({"op": "filter", "test": instance})
1978                else:
1979                    cache_file = os.path.join(instance.build_dir, "CMakeCache.txt")
1980                    if os.path.exists(cache_file) and self.env.options.aggressive_no_clean:
1981                        pipeline.put({"op": "build", "test": instance})
1982                    else:
1983                        pipeline.put({"op": "cmake", "test": instance})
1984
1985
1986    def pipeline_mgr(self, pipeline, done_queue, lock, results):
1987        try:
1988            if sys.platform == 'linux':
1989                with self.jobserver.get_job():
1990                    while True:
1991                        try:
1992                            task = pipeline.get_nowait()
1993                        except queue.Empty:
1994                            break
1995                        else:
1996                            instance = task['test']
1997                            pb = ProjectBuilder(instance, self.env, self.jobserver)
1998                            pb.duts = self.duts
1999                            pb.process(pipeline, done_queue, task, lock, results)
2000                            if self.env.options.quit_on_failure and \
2001                                pb.instance.status in [TwisterStatus.FAIL, TwisterStatus.ERROR]:
2002                                with pipeline.mutex:
2003                                    pipeline.queue.clear()
2004                                break
2005
2006                    return True
2007            else:
2008                while True:
2009                    try:
2010                        task = pipeline.get_nowait()
2011                    except queue.Empty:
2012                        break
2013                    else:
2014                        instance = task['test']
2015                        pb = ProjectBuilder(instance, self.env, self.jobserver)
2016                        pb.duts = self.duts
2017                        pb.process(pipeline, done_queue, task, lock, results)
2018                        if self.env.options.quit_on_failure and \
2019                            pb.instance.status in [TwisterStatus.FAIL, TwisterStatus.ERROR]:
2020                            with pipeline.mutex:
2021                                pipeline.queue.clear()
2022                            break
2023                return True
2024        except Exception as e:
2025            logger.error(f"General exception: {e}\n{traceback.format_exc()}")
2026            sys.exit(1)
2027
2028    def execute(self, pipeline, done):
2029        lock = Lock()
2030        logger.info("Adding tasks to the queue...")
2031        self.add_tasks_to_queue(pipeline, self.options.build_only, self.options.test_only,
2032                                retry_build_errors=self.options.retry_build_errors)
2033        logger.info("Added initial list of jobs to queue")
2034
2035        processes = []
2036
2037        for _ in range(self.jobs):
2038            p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, self.results, ))
2039            processes.append(p)
2040            p.start()
2041        logger.debug(f"Launched {self.jobs} jobs")
2042
2043        try:
2044            for p in processes:
2045                p.join()
2046                if p.exitcode != 0:
2047                    logger.error(f"Process {p.pid} failed, aborting execution")
2048                    for proc in processes:
2049                        proc.terminate()
2050                    sys.exit(1)
2051        except KeyboardInterrupt:
2052            logger.info("Execution interrupted")
2053            for p in processes:
2054                p.terminate()
2055
2056    @staticmethod
2057    def get_cmake_filter_stages(filt, logic_keys):
2058        """Analyze filter expressions from test yaml
2059        and decide if dts and/or kconfig based filtering will be needed.
2060        """
2061        dts_required = False
2062        kconfig_required = False
2063        full_required = False
2064        filter_stages = []
2065
2066        # Compress args in expressions like "function('x', 'y')"
2067        # so they are not split when splitting by whitespaces
2068        filt = filt.replace(", ", ",")
2069        # Remove logic words
2070        for k in logic_keys:
2071            filt = filt.replace(f"{k} ", "")
2072        # Remove brackets
2073        filt = filt.replace("(", "")
2074        filt = filt.replace(")", "")
2075        # Splite by whitespaces
2076        filt = filt.split()
2077        for expression in filt:
2078            if expression.startswith("dt_"):
2079                dts_required = True
2080            elif expression.startswith("CONFIG"):
2081                kconfig_required = True
2082            else:
2083                full_required = True
2084
2085        if full_required:
2086            return ["full"]
2087        if dts_required:
2088            filter_stages.append("dts")
2089        if kconfig_required:
2090            filter_stages.append("kconfig")
2091
2092        return filter_stages
2093