# vim: set syntax=python ts=4 :
#
# Copyright (c) 2018-2024 Intel Corporation
# Copyright 2022 NXP
# SPDX-License-Identifier: Apache-2.0
6
7import logging
8import multiprocessing
9import os
10import pathlib
11import pickle
12import queue
13import re
14import shutil
15import subprocess
16import sys
17import time
18import traceback
19from math import log10
20from multiprocessing import Lock, Process, Value
21from multiprocessing.managers import BaseManager
22
23import elftools
24import yaml
25from colorama import Fore
26from elftools.elf.elffile import ELFFile
27from elftools.elf.sections import SymbolTableSection
28from packaging import version
29from twisterlib.cmakecache import CMakeCache
30from twisterlib.environment import canonical_zephyr_base
31from twisterlib.error import BuildError, ConfigurationError, StatusAttributeError
32from twisterlib.statuses import TwisterStatus
33
34if version.parse(elftools.__version__) < version.parse('0.24'):
35    sys.exit("pyelftools is out of date, need version 0.24 or later")
36
37# Job server only works on Linux for now.
38if sys.platform == 'linux':
39    from twisterlib.jobserver import GNUMakeJobClient, GNUMakeJobServer, JobClient
40
41from twisterlib.environment import ZEPHYR_BASE
42
43sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/build_helpers"))
44from domains import Domains
45from twisterlib.environment import TwisterEnv
46from twisterlib.harness import HarnessImporter, Pytest
47from twisterlib.log_helper import log_command
48from twisterlib.platform import Platform
49from twisterlib.testinstance import TestInstance
50from twisterlib.testplan import change_skip_to_error_if_integration
51from twisterlib.testsuite import TestSuite
52
53try:
54    from yaml import CSafeLoader as SafeLoader
55except ImportError:
56    from yaml import SafeLoader
57
58import expr_parser
59from anytree import Node, RenderTree
60
61logger = logging.getLogger('twister')
62logger.setLevel(logging.DEBUG)
63
64
65class ExecutionCounter:
66    def __init__(self, total=0):
        '''
        Most statistics are tracked at the test instance level; the case
        statistics aggregate the test cases of ALL test instances.

        total = yaml test scenarios * applicable platforms
        done := instances that reached the report_out stage of the pipeline
        done = filtered_configs + passed + failed + error
        completed = done - filtered_static
        filtered_configs = filtered_runtime + filtered_static

        pass rate = passed / (total - filtered_configs)
        case pass rate = passed_cases / (cases - filtered_cases - skipped_cases)
        '''
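        # Illustrative example (hypothetical numbers): with total=100,
        # filtered_static=10, filtered_runtime=5 and passed=70, we get
        # filtered_configs = 15 and a pass rate of 70 / (100 - 15), i.e. ~0.82.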
80        # instances that go through the pipeline
81        # updated by report_out()
82        self._done = Value('i', 0)
83
84        # iteration
85        self._iteration = Value('i', 0)
86
87        # instances that actually executed and passed
88        # updated by report_out()
89        self._passed = Value('i', 0)
90
91        # instances that are built but not runnable
92        # updated by report_out()
93        self._notrun = Value('i', 0)
94
95        # static filter + runtime filter + build skipped
96        # updated by update_counting_before_pipeline() and report_out()
97        self._filtered_configs = Value('i', 0)
98
99        # cmake filter + build skipped
100        # updated by report_out()
101        self._filtered_runtime = Value('i', 0)
102
103        # static filtered at yaml parsing time
104        # updated by update_counting_before_pipeline()
105        self._filtered_static = Value('i', 0)
106
107        # updated by report_out() in pipeline
108        self._error = Value('i', 0)
109        self._failed = Value('i', 0)
110        self._skipped = Value('i', 0)
111
112        # initialized to number of test instances
113        self._total = Value('i', total)
114
115        #######################################
116        # TestCase counters for all instances #
117        #######################################
118        # updated in report_out
119        self._cases = Value('i', 0)
120
121        # updated by update_counting_before_pipeline() and report_out()
122        self._skipped_cases = Value('i', 0)
123        self._filtered_cases = Value('i', 0)
124
125        # updated by report_out() in pipeline
126        self._passed_cases = Value('i', 0)
127        self._notrun_cases = Value('i', 0)
128        self._failed_cases = Value('i', 0)
129        self._error_cases = Value('i', 0)
130        self._blocked_cases = Value('i', 0)
131
132        # Incorrect statuses
133        self._none_cases = Value('i', 0)
134        self._started_cases = Value('i', 0)
135
136        self._warnings = Value('i', 0)
137
138        self.lock = Lock()
139
140    @staticmethod
141    def _find_number_length(n):
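        '''
        Return the number of characters needed to print n in decimal,
        including the sign for negative values, e.g. 0 -> 1, 42 -> 2, -7 -> 2.
        '''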
142        if n > 0:
143            length = int(log10(n))+1
144        elif n == 0:
145            length = 1
146        else:
147            length = int(log10(-n))+2
148        return length
149
150    def summary(self):
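        '''Print a tree-shaped summary of the suite and case counters using anytree.'''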
151        selected_cases = self.cases - self.filtered_cases
152        selected_configs = self.done - self.filtered_static - self.filtered_runtime
153
154
155        root = Node("Summary")
156
157        Node(f"Total test suites: {self.total}", parent=root)
158        processed_suites = Node(f"Processed test suites: {self.done}", parent=root)
159        filtered_suites = Node(
160            f"Filtered test suites: {self.filtered_configs}",
161            parent=processed_suites
162        )
163        Node(f"Filtered test suites (static): {self.filtered_static}", parent=filtered_suites)
164        Node(f"Filtered test suites (at runtime): {self.filtered_runtime}", parent=filtered_suites)
165        selected_suites = Node(f"Selected test suites: {selected_configs}", parent=processed_suites)
166        Node(f"Skipped test suites: {self.skipped}", parent=selected_suites)
167        Node(f"Passed test suites: {self.passed}", parent=selected_suites)
168        Node(f"Built only test suites: {self.notrun}", parent=selected_suites)
169        Node(f"Failed test suites: {self.failed}", parent=selected_suites)
170        Node(f"Errors in test suites: {self.error}", parent=selected_suites)
171
172        total_cases = Node(f"Total test cases: {self.cases}", parent=root)
173        Node(f"Filtered test cases: {self.filtered_cases}", parent=total_cases)
174        selected_cases_node = Node(f"Selected test cases: {selected_cases}", parent=total_cases)
175        Node(f"Passed test cases: {self.passed_cases}", parent=selected_cases_node)
176        Node(f"Skipped test cases: {self.skipped_cases}", parent=selected_cases_node)
177        Node(f"Built only test cases: {self.notrun_cases}", parent=selected_cases_node)
178        Node(f"Blocked test cases: {self.blocked_cases}", parent=selected_cases_node)
179        Node(f"Failed test cases: {self.failed_cases}", parent=selected_cases_node)
180        error_cases_node = Node(
181            f"Errors in test cases: {self.error_cases}",
182            parent=selected_cases_node
183        )
184
185        if self.none_cases or self.started_cases:
186            Node(
187                "The following test case statuses should not appear in a proper execution",
188                parent=error_cases_node
189            )
190        if self.none_cases:
191            Node(f"Statusless test cases: {self.none_cases}", parent=error_cases_node)
192        if self.started_cases:
193            Node(f"Test cases only started: {self.started_cases}", parent=error_cases_node)
194
195        for pre, _, node in RenderTree(root):
196            print(f"{pre}{node.name}")
197
198    @property
199    def warnings(self):
200        with self._warnings.get_lock():
201            return self._warnings.value
202
203    @warnings.setter
204    def warnings(self, value):
205        with self._warnings.get_lock():
206            self._warnings.value = value
207
208    def warnings_increment(self, value=1):
209        with self._warnings.get_lock():
210            self._warnings.value += value
211
212    @property
213    def cases(self):
214        with self._cases.get_lock():
215            return self._cases.value
216
217    @cases.setter
218    def cases(self, value):
219        with self._cases.get_lock():
220            self._cases.value = value
221
222    def cases_increment(self, value=1):
223        with self._cases.get_lock():
224            self._cases.value += value
225
226    @property
227    def skipped_cases(self):
228        with self._skipped_cases.get_lock():
229            return self._skipped_cases.value
230
231    @skipped_cases.setter
232    def skipped_cases(self, value):
233        with self._skipped_cases.get_lock():
234            self._skipped_cases.value = value
235
236    def skipped_cases_increment(self, value=1):
237        with self._skipped_cases.get_lock():
238            self._skipped_cases.value += value
239
240    @property
241    def filtered_cases(self):
242        with self._filtered_cases.get_lock():
243            return self._filtered_cases.value
244
245    @filtered_cases.setter
246    def filtered_cases(self, value):
247        with self._filtered_cases.get_lock():
248            self._filtered_cases.value = value
249
250    def filtered_cases_increment(self, value=1):
251        with self._filtered_cases.get_lock():
252            self._filtered_cases.value += value
253
254    @property
255    def passed_cases(self):
256        with self._passed_cases.get_lock():
257            return self._passed_cases.value
258
259    @passed_cases.setter
260    def passed_cases(self, value):
261        with self._passed_cases.get_lock():
262            self._passed_cases.value = value
263
264    def passed_cases_increment(self, value=1):
265        with self._passed_cases.get_lock():
266            self._passed_cases.value += value
267
268    @property
269    def notrun_cases(self):
270        with self._notrun_cases.get_lock():
271            return self._notrun_cases.value
272
    @notrun_cases.setter
    def notrun_cases(self, value):
        with self._notrun_cases.get_lock():
            self._notrun_cases.value = value
277
278    def notrun_cases_increment(self, value=1):
279        with self._notrun_cases.get_lock():
280            self._notrun_cases.value += value
281
282    @property
283    def failed_cases(self):
284        with self._failed_cases.get_lock():
285            return self._failed_cases.value
286
287    @failed_cases.setter
288    def failed_cases(self, value):
289        with self._failed_cases.get_lock():
290            self._failed_cases.value = value
291
292    def failed_cases_increment(self, value=1):
293        with self._failed_cases.get_lock():
294            self._failed_cases.value += value
295
296    @property
297    def error_cases(self):
298        with self._error_cases.get_lock():
299            return self._error_cases.value
300
301    @error_cases.setter
302    def error_cases(self, value):
303        with self._error_cases.get_lock():
304            self._error_cases.value = value
305
306    def error_cases_increment(self, value=1):
307        with self._error_cases.get_lock():
308            self._error_cases.value += value
309
310    @property
311    def blocked_cases(self):
312        with self._blocked_cases.get_lock():
313            return self._blocked_cases.value
314
315    @blocked_cases.setter
316    def blocked_cases(self, value):
317        with self._blocked_cases.get_lock():
318            self._blocked_cases.value = value
319
320    def blocked_cases_increment(self, value=1):
321        with self._blocked_cases.get_lock():
322            self._blocked_cases.value += value
323
324    @property
325    def none_cases(self):
326        with self._none_cases.get_lock():
327            return self._none_cases.value
328
329    @none_cases.setter
330    def none_cases(self, value):
331        with self._none_cases.get_lock():
332            self._none_cases.value = value
333
334    def none_cases_increment(self, value=1):
335        with self._none_cases.get_lock():
336            self._none_cases.value += value
337
338    @property
339    def started_cases(self):
340        with self._started_cases.get_lock():
341            return self._started_cases.value
342
343    @started_cases.setter
344    def started_cases(self, value):
345        with self._started_cases.get_lock():
346            self._started_cases.value = value
347
348    def started_cases_increment(self, value=1):
349        with self._started_cases.get_lock():
350            self._started_cases.value += value
351
352    @property
353    def skipped(self):
354        with self._skipped.get_lock():
355            return self._skipped.value
356
357    @skipped.setter
358    def skipped(self, value):
359        with self._skipped.get_lock():
360            self._skipped.value = value
361
362    def skipped_increment(self, value=1):
363        with self._skipped.get_lock():
364            self._skipped.value += value
365
366    @property
367    def error(self):
368        with self._error.get_lock():
369            return self._error.value
370
371    @error.setter
372    def error(self, value):
373        with self._error.get_lock():
374            self._error.value = value
375
376    def error_increment(self, value=1):
377        with self._error.get_lock():
378            self._error.value += value
379
380    @property
381    def iteration(self):
382        with self._iteration.get_lock():
383            return self._iteration.value
384
385    @iteration.setter
386    def iteration(self, value):
387        with self._iteration.get_lock():
388            self._iteration.value = value
389
390    def iteration_increment(self, value=1):
391        with self._iteration.get_lock():
392            self._iteration.value += value
393
394    @property
395    def done(self):
396        with self._done.get_lock():
397            return self._done.value
398
399    @done.setter
400    def done(self, value):
401        with self._done.get_lock():
402            self._done.value = value
403
404    def done_increment(self, value=1):
405        with self._done.get_lock():
406            self._done.value += value
407
408    @property
409    def passed(self):
410        with self._passed.get_lock():
411            return self._passed.value
412
413    @passed.setter
414    def passed(self, value):
415        with self._passed.get_lock():
416            self._passed.value = value
417
418    def passed_increment(self, value=1):
419        with self._passed.get_lock():
420            self._passed.value += value
421
422    @property
423    def notrun(self):
424        with self._notrun.get_lock():
425            return self._notrun.value
426
427    @notrun.setter
428    def notrun(self, value):
429        with self._notrun.get_lock():
430            self._notrun.value = value
431
432    def notrun_increment(self, value=1):
433        with self._notrun.get_lock():
434            self._notrun.value += value
435
436    @property
437    def filtered_configs(self):
438        with self._filtered_configs.get_lock():
439            return self._filtered_configs.value
440
441    @filtered_configs.setter
442    def filtered_configs(self, value):
443        with self._filtered_configs.get_lock():
444            self._filtered_configs.value = value
445
446    def filtered_configs_increment(self, value=1):
447        with self._filtered_configs.get_lock():
448            self._filtered_configs.value += value
449
450    @property
451    def filtered_static(self):
452        with self._filtered_static.get_lock():
453            return self._filtered_static.value
454
455    @filtered_static.setter
456    def filtered_static(self, value):
457        with self._filtered_static.get_lock():
458            self._filtered_static.value = value
459
460    def filtered_static_increment(self, value=1):
461        with self._filtered_static.get_lock():
462            self._filtered_static.value += value
463
464    @property
465    def filtered_runtime(self):
466        with self._filtered_runtime.get_lock():
467            return self._filtered_runtime.value
468
469    @filtered_runtime.setter
470    def filtered_runtime(self, value):
471        with self._filtered_runtime.get_lock():
472            self._filtered_runtime.value = value
473
474    def filtered_runtime_increment(self, value=1):
475        with self._filtered_runtime.get_lock():
476            self._filtered_runtime.value += value
477
478    @property
479    def failed(self):
480        with self._failed.get_lock():
481            return self._failed.value
482
483    @failed.setter
484    def failed(self, value):
485        with self._failed.get_lock():
486            self._failed.value = value
487
488    def failed_increment(self, value=1):
489        with self._failed.get_lock():
490            self._failed.value += value
491
492    @property
493    def total(self):
494        with self._total.get_lock():
495            return self._total.value
496
497    @total.setter
498    def total(self, value):
499        with self._total.get_lock():
500            self._total.value = value
501
502    def total_increment(self, value=1):
503        with self._total.get_lock():
504            self._total.value += value
505
506class CMake:
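    """Wrapper around the CMake configure and build steps of a test build."""
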
    config_re = re.compile(r'(CONFIG_[A-Za-z0-9_]+)="?([^"]*)"?$')
    dt_re = re.compile(r'([A-Za-z0-9_]+)="?([^"]*)"?$')
509
510    def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver):
511
512        self.cwd = None
513        self.capture_output = True
514
515        self.defconfig = {}
516        self.cmake_cache = {}
517
518        self.instance = None
519        self.testsuite = testsuite
520        self.platform = platform
521        self.source_dir = source_dir
522        self.build_dir = build_dir
523        self.log = "build.log"
524
525        self.default_encoding = sys.getdefaultencoding()
526        self.jobserver = jobserver
527
528    def parse_generated(self, filter_stages=None):
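        # Base implementation keeps no filter data; FilterBuilder overrides this
        # to parse the generated .config and CMake cache.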
529        self.defconfig = {}
530        return {}
531
532    def run_build(self, args=None):
533        if args is None:
534            args = []
535
536        logger.debug(f"Building {self.source_dir} for {self.platform.name}")
537
538        cmake_args = []
539        cmake_args.extend(args)
540        cmake = shutil.which('cmake')
541        cmd = [cmake] + cmake_args
542        kwargs = dict()
543
544        if self.capture_output:
545            kwargs['stdout'] = subprocess.PIPE
546            # CMake sends the output of message() to stderr unless it's STATUS
547            kwargs['stderr'] = subprocess.STDOUT
548
549        if self.cwd:
550            kwargs['cwd'] = self.cwd
551
552        start_time = time.time()
553        if sys.platform == 'linux':
554            p = self.jobserver.popen(cmd, **kwargs)
555        else:
556            p = subprocess.Popen(cmd, **kwargs)
557        logger.debug(f'Running {" ".join(cmd)}')
558
559        out, _ = p.communicate()
560
561        ret = {}
562        duration = time.time() - start_time
563        self.instance.build_time += duration
564        if p.returncode == 0:
565            msg = (
566                f"Finished building {self.source_dir} for {self.platform.name}"
567                f" in {duration:.2f} seconds"
568            )
569            logger.debug(msg)
570
571            if not self.instance.run:
572                self.instance.status = TwisterStatus.NOTRUN
573                self.instance.add_missing_case_status(TwisterStatus.NOTRUN, "Test was built only")
574            else:
575                self.instance.status = TwisterStatus.PASS
576            ret = {"returncode": p.returncode}
577
578            if out:
579                log_msg = out.decode(self.default_encoding)
580                with open(
581                    os.path.join(self.build_dir, self.log),
582                    "a",
583                    encoding=self.default_encoding
584                ) as log:
585                    log.write(log_msg)
586            else:
587                return None
588        else:
589            # A real error occurred, raise an exception
590            log_msg = ""
591            if out:
592                log_msg = out.decode(self.default_encoding)
593                with open(
594                    os.path.join(self.build_dir, self.log),
595                    "a",
596                    encoding=self.default_encoding
597                ) as log:
598                    log.write(log_msg)
599
600            if log_msg:
601                overflow_found = re.findall(
602                    "region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM|dram\\d_\\d_seg)' overflowed by",
603                    log_msg
604                )
605                imgtool_overflow_found = re.findall(
606                    r"Error: Image size \(.*\) \+ trailer \(.*\) exceeds requested size",
607                    log_msg
608                )
609                if overflow_found and not self.options.overflow_as_errors:
610                    logger.debug(f"Test skipped due to {overflow_found[0]} Overflow")
611                    self.instance.status = TwisterStatus.SKIP
612                    self.instance.reason = f"{overflow_found[0]} overflow"
613                    change_skip_to_error_if_integration(self.options, self.instance)
614                elif imgtool_overflow_found and not self.options.overflow_as_errors:
615                    self.instance.status = TwisterStatus.SKIP
616                    self.instance.reason = "imgtool overflow"
617                    change_skip_to_error_if_integration(self.options, self.instance)
618                else:
619                    self.instance.status = TwisterStatus.ERROR
620                    self.instance.reason = "Build failure"
621
622            ret = {
623                "returncode": p.returncode
624            }
625
626        return ret
627
628    def run_cmake(self, args="", filter_stages=None):
629        if filter_stages is None:
630            filter_stages = []
631
632        if not self.options.disable_warnings_as_errors:
633            warnings_as_errors = 'y'
634            gen_edt_args = "--edtlib-Werror"
635        else:
636            warnings_as_errors = 'n'
637            gen_edt_args = ""
638
639        warning_command = 'CONFIG_COMPILER_WARNINGS_AS_ERRORS'
640        if self.instance.sysbuild:
641            warning_command = 'SB_' + warning_command
642
643        logger.debug(f"Running cmake on {self.source_dir} for {self.platform.name}")
644        cmake_args = [
645            f'-B{self.build_dir}',
646            f'-DTC_RUNID={self.instance.run_id}',
647            f'-DTC_NAME={self.instance.testsuite.name}',
648            f'-D{warning_command}={warnings_as_errors}',
649            f'-DEXTRA_GEN_EDT_ARGS={gen_edt_args}',
650            f'-G{self.env.generator}',
651            f'-DPython3_EXECUTABLE={pathlib.Path(sys.executable).as_posix()}'
652        ]
653
654        if self.instance.testsuite.harness == 'bsim':
655            cmake_args.extend([
656                '-DCMAKE_EXPORT_COMPILE_COMMANDS=ON',
657                '-DCONFIG_ASSERT=y',
658                '-DCONFIG_COVERAGE=y'
659            ])
660
        # If needed, run CMake using the package_helper script first, to only run
        # a subset of all CMake modules. Its output is used to filter test cases,
        # and the full CMake configuration is then run for the test cases that
        # should be built.
665        if filter_stages:
666            cmake_filter_args = [
667                f'-DMODULES={",".join(filter_stages)}',
668                f'-P{canonical_zephyr_base}/cmake/package_helper.cmake',
669            ]
670
671        if self.instance.sysbuild and not filter_stages:
672            logger.debug(f"Building {self.source_dir} using sysbuild")
673            source_args = [
674                f'-S{canonical_zephyr_base}/share/sysbuild',
675                f'-DAPP_DIR={self.source_dir}'
676            ]
677        else:
678            source_args = [
679                f'-S{self.source_dir}'
680            ]
681        cmake_args.extend(source_args)
682
683        cmake_args.extend(args)
684
685        cmake_opts = [f'-DBOARD={self.platform.name}']
686        cmake_args.extend(cmake_opts)
687
688        if self.instance.testsuite.required_snippets:
689            cmake_opts = [
690                '-DSNIPPET={}'.format(';'.join(self.instance.testsuite.required_snippets))
691            ]
692            cmake_args.extend(cmake_opts)
693
694        cmake = shutil.which('cmake')
695        cmd = [cmake] + cmake_args
696
697        if filter_stages:
698            cmd += cmake_filter_args
699
700        kwargs = dict()
701
702        log_command(logger, "Calling cmake", cmd)
703
704        if self.capture_output:
705            kwargs['stdout'] = subprocess.PIPE
706            # CMake sends the output of message() to stderr unless it's STATUS
707            kwargs['stderr'] = subprocess.STDOUT
708
709        if self.cwd:
710            kwargs['cwd'] = self.cwd
711
712        start_time = time.time()
713        if sys.platform == 'linux':
714            p = self.jobserver.popen(cmd, **kwargs)
715        else:
716            p = subprocess.Popen(cmd, **kwargs)
717        out, _ = p.communicate()
718
719        duration = time.time() - start_time
720        self.instance.build_time += duration
721
722        if p.returncode == 0:
723            filter_results = self.parse_generated(filter_stages)
724            msg = (
725                f"Finished running cmake {self.source_dir} for {self.platform.name}"
726                f" in {duration:.2f} seconds"
727            )
728            logger.debug(msg)
729            ret = {
730                    'returncode': p.returncode,
731                    'filter': filter_results
732                    }
733        else:
734            self.instance.status = TwisterStatus.ERROR
735            self.instance.reason = "CMake build failure"
736
737            for tc in self.instance.testcases:
738                tc.status = self.instance.status
739
740            logger.error(f"CMake build failure: {self.source_dir} for {self.platform.name}")
741            ret = {"returncode": p.returncode}
742
743        if out:
744            os.makedirs(self.build_dir, exist_ok=True)
745            with open(
746                os.path.join(self.build_dir, self.log),
747                "a",
748                encoding=self.default_encoding
749            ) as log:
750                log_msg = out.decode(self.default_encoding)
751                log.write(log_msg)
752
753        return ret
754
755
756class FilterBuilder(CMake):
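    """CMake wrapper that parses generated build artifacts to evaluate testsuite filters."""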
757
758    def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver):
759        super().__init__(testsuite, platform, source_dir, build_dir, jobserver)
760
761        self.log = "config-twister.log"
762
763    def parse_generated(self, filter_stages=None):
764        if filter_stages is None:
765            filter_stages = []
766
767        if self.platform.name == "unit_testing":
768            return {}
769
770        if self.instance.sysbuild and not filter_stages:
771            # Load domain yaml to get default domain build directory
772            domain_path = os.path.join(self.build_dir, "domains.yaml")
773            domains = Domains.from_file(domain_path)
774            logger.debug(f"Loaded sysbuild domain data from {domain_path}")
775            self.instance.domains = domains
776            domain_build = domains.get_default_domain().build_dir
777            cmake_cache_path = os.path.join(domain_build, "CMakeCache.txt")
778            defconfig_path = os.path.join(domain_build, "zephyr", ".config")
779            edt_pickle = os.path.join(domain_build, "zephyr", "edt.pickle")
780        else:
781            cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt")
            # .config is only available after the Kconfig stage of CMake.
            # If only devicetree-based filtering is required, the package helper
            # call will not produce a .config file.
784            if not filter_stages or "kconfig" in filter_stages:
785                defconfig_path = os.path.join(self.build_dir, "zephyr", ".config")
            # Devicetree is processed before Kconfig, so edt.pickle is available
            # regardless of the chosen filter stages.
788            edt_pickle = os.path.join(self.build_dir, "zephyr", "edt.pickle")
789
790
791        if not filter_stages or "kconfig" in filter_stages:
792            with open(defconfig_path) as fp:
793                defconfig = {}
794                for line in fp.readlines():
795                    m = self.config_re.match(line)
796                    if not m:
797                        if line.strip() and not line.startswith("#"):
798                            sys.stderr.write(f"Unrecognized line {line}\n")
799                        continue
800                    defconfig[m.group(1)] = m.group(2).strip()
801
802            self.defconfig = defconfig
803
804        cmake_conf = {}
805        try:
806            cache = CMakeCache.from_file(cmake_cache_path)
807        except FileNotFoundError:
808            cache = {}
809
810        for k in iter(cache):
811            cmake_conf[k.name] = k.value
812
813        self.cmake_cache = cmake_conf
814
815        filter_data = {
816            "ARCH": self.platform.arch,
817            "PLATFORM": self.platform.name
818        }
819        filter_data.update(os.environ)
820        if not filter_stages or "kconfig" in filter_stages:
821            filter_data.update(self.defconfig)
822        filter_data.update(self.cmake_cache)
823
824        # Verify that twister's arguments support sysbuild.
825        # Twister sysbuild flashing currently only works with west,
826        # so --west-flash must be passed.
827        if (
828            self.instance.sysbuild
829            and self.env.options.device_testing
830            and self.env.options.west_flash is None
831        ):
832            logger.warning("Sysbuild test will be skipped. West must be used for flashing.")
833            return {os.path.join(self.platform.name, self.testsuite.name): True}
834
835        if self.testsuite and self.testsuite.filter:
836            try:
837                if os.path.exists(edt_pickle):
838                    with open(edt_pickle, 'rb') as f:
839                        edt = pickle.load(f)
840                else:
841                    edt = None
842                ret = expr_parser.parse(self.testsuite.filter, filter_data, edt)
843
844            except (ValueError, SyntaxError) as se:
845                sys.stderr.write(f"Failed processing {self.testsuite.yamlfile}\n")
846                raise se
847
848            if not ret:
849                return {os.path.join(self.platform.name, self.testsuite.name): True}
850            else:
851                return {os.path.join(self.platform.name, self.testsuite.name): False}
852        else:
853            self.platform.filter_data = filter_data
854            return filter_data
855
856
857class ProjectBuilder(FilterBuilder):
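    """Drives a single test instance through the pipeline: cmake, build, run, report and cleanup."""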
858
859    def __init__(self, instance: TestInstance, env: TwisterEnv, jobserver, **kwargs):
860        super().__init__(
861            instance.testsuite,
862            instance.platform,
863            instance.testsuite.source_dir,
864            instance.build_dir,
865            jobserver
866        )
867
868        self.log = "build.log"
869        self.instance = instance
870        self.filtered_tests = 0
871        self.options = env.options
872        self.env = env
873        self.duts = None
874
875    @property
876    def trace(self) -> bool:
877        return self.options.verbose > 2
878
879    def log_info(self, filename, inline_logs, log_testcases=False):
880        filename = os.path.abspath(os.path.realpath(filename))
881        if inline_logs:
882            logger.info(f"{filename:-^100}")
883
884            try:
885                with open(filename) as fp:
886                    data = fp.read()
887            except Exception as e:
888                data = f"Unable to read log data ({e!s})\n"
889
890            # Remove any coverage data from the dumped logs
891            data = re.sub(
892                r"GCOV_COVERAGE_DUMP_START.*GCOV_COVERAGE_DUMP_END",
893                "GCOV_COVERAGE_DUMP_START\n...\nGCOV_COVERAGE_DUMP_END",
894                data,
895                flags=re.DOTALL,
896            )
897            logger.error(data)
898
899            logger.info(f"{filename:-^100}")
900
901            if log_testcases:
902                for tc in self.instance.testcases:
903                    if not tc.reason:
904                        continue
905                    logger.info(
906                        f"\n{str(tc.name).center(100, '_')}\n"
907                        f"{tc.reason}\n"
908                        f"{100*'_'}\n"
909                        f"{tc.output}"
910                    )
911        else:
912            logger.error("see: " + Fore.YELLOW + filename + Fore.RESET)
913
914    def log_info_file(self, inline_logs):
915        build_dir = self.instance.build_dir
916        h_log = f"{build_dir}/handler.log"
917        he_log = f"{build_dir}/handler_stderr.log"
918        b_log = f"{build_dir}/build.log"
919        v_log = f"{build_dir}/valgrind.log"
920        d_log = f"{build_dir}/device.log"
921        pytest_log = f"{build_dir}/twister_harness.log"
922
923        if os.path.exists(v_log) and "Valgrind" in self.instance.reason:
924            self.log_info(f"{v_log}", inline_logs)
925        elif os.path.exists(pytest_log) and os.path.getsize(pytest_log) > 0:
926            self.log_info(f"{pytest_log}", inline_logs, log_testcases=True)
927        elif os.path.exists(h_log) and os.path.getsize(h_log) > 0:
928            self.log_info(f"{h_log}", inline_logs)
929        elif os.path.exists(he_log) and os.path.getsize(he_log) > 0:
930            self.log_info(f"{he_log}", inline_logs)
931        elif os.path.exists(d_log) and os.path.getsize(d_log) > 0:
932            self.log_info(f"{d_log}", inline_logs)
933        else:
934            self.log_info(f"{b_log}", inline_logs)
935
936
937    def _add_to_pipeline(self, pipeline, op: str, additionals: dict=None):
938        if additionals is None:
939            additionals = {}
940        try:
941            if op:
942                task = dict({'op': op, 'test': self.instance}, **additionals)
943                pipeline.put(task)
944        # Only possible RuntimeError source here is a mutation of the pipeline during iteration.
945        # If that happens, we ought to consider the whole pipeline corrupted.
946        except RuntimeError as e:
947            logger.error(f"RuntimeError: {e}")
948            traceback.print_exc()
949
950
951    def process(self, pipeline, done, message, lock, results):
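        '''
        Handle one pipeline message for this test instance.

        The message's 'op' field selects the stage: 'filter' and 'cmake' run CMake,
        'build' compiles, 'gather_metrics' collects build metrics, 'run' executes
        the binary, 'report' publishes results and 'cleanup' prunes artifacts.
        Each stage queues the follow-up operation back onto the pipeline.
        '''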
952        next_op = None
953        additionals = {}
954
955        op = message.get('op')
956
957        self.instance.setup_handler(self.env)
958
959        if op == "filter":
960            try:
961                ret = self.cmake(filter_stages=self.instance.filter_stages)
962                if self.instance.status in [TwisterStatus.FAIL, TwisterStatus.ERROR]:
963                    next_op = 'report'
964                else:
965                    # Here we check the dt/kconfig filter results coming from running cmake
966                    if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]:
967                        logger.debug(f"filtering {self.instance.name}")
968                        self.instance.status = TwisterStatus.FILTER
969                        self.instance.reason = "runtime filter"
970                        results.filtered_runtime_increment()
971                        self.instance.add_missing_case_status(TwisterStatus.FILTER)
972                        next_op = 'report'
973                    else:
974                        next_op = 'cmake'
975            except StatusAttributeError as sae:
976                logger.error(str(sae))
977                self.instance.status = TwisterStatus.ERROR
978                reason = 'Incorrect status assignment'
979                self.instance.reason = reason
980                self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason)
981                next_op = 'report'
982            finally:
983                self._add_to_pipeline(pipeline, next_op)
984
        # The build process: call CMake and build with the configured generator
986        elif op == "cmake":
987            try:
988                ret = self.cmake()
989                if self.instance.status in [TwisterStatus.FAIL, TwisterStatus.ERROR]:
990                    next_op = 'report'
991                elif self.options.cmake_only:
992                    if self.instance.status == TwisterStatus.NONE:
993                        logger.debug(f"CMake only: PASS {self.instance.name}")
994                        self.instance.status = TwisterStatus.NOTRUN
995                        self.instance.add_missing_case_status(TwisterStatus.NOTRUN, 'CMake only')
996                    next_op = 'report'
997                else:
998                    # Here we check the runtime filter results coming from running cmake
999                    if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]:
1000                        logger.debug(f"filtering {self.instance.name}")
1001                        self.instance.status = TwisterStatus.FILTER
1002                        self.instance.reason = "runtime filter"
1003                        results.filtered_runtime_increment()
1004                        self.instance.add_missing_case_status(TwisterStatus.FILTER)
1005                        next_op = 'report'
1006                    else:
1007                        next_op = 'build'
1008            except StatusAttributeError as sae:
1009                logger.error(str(sae))
1010                self.instance.status = TwisterStatus.ERROR
1011                reason = 'Incorrect status assignment'
1012                self.instance.reason = reason
1013                self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason)
1014                next_op = 'report'
1015            finally:
1016                self._add_to_pipeline(pipeline, next_op)
1017
1018        elif op == "build":
1019            try:
1020                logger.debug(f"build test: {self.instance.name}")
1021                ret = self.build()
1022                if not ret:
1023                    self.instance.status = TwisterStatus.ERROR
1024                    self.instance.reason = "Build Failure"
1025                    next_op = 'report'
1026                else:
1027                    # Count skipped cases during build, for example
1028                    # due to ram/rom overflow.
                    if self.instance.status == TwisterStatus.SKIP:
1030                        results.skipped_increment()
1031                        self.instance.add_missing_case_status(
1032                            TwisterStatus.SKIP,
1033                            self.instance.reason
1034                        )
1035
1036                    if ret.get('returncode', 1) > 0:
1037                        self.instance.add_missing_case_status(
1038                            TwisterStatus.BLOCK,
1039                            self.instance.reason
1040                        )
1041                        next_op = 'report'
1042                    else:
1043                        if self.instance.testsuite.harness in ['ztest', 'test']:
1044                            logger.debug(
1045                                f"Determine test cases for test instance: {self.instance.name}"
1046                            )
1047                            try:
1048                                self.determine_testcases(results)
1049                                next_op = 'gather_metrics'
1050                            except BuildError as e:
1051                                logger.error(str(e))
1052                                self.instance.status = TwisterStatus.ERROR
1053                                self.instance.reason = str(e)
1054                                next_op = 'report'
1055                        else:
1056                            next_op = 'gather_metrics'
1057            except StatusAttributeError as sae:
1058                logger.error(str(sae))
1059                self.instance.status = TwisterStatus.ERROR
1060                reason = 'Incorrect status assignment'
1061                self.instance.reason = reason
1062                self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason)
1063                next_op = 'report'
1064            finally:
1065                self._add_to_pipeline(pipeline, next_op)
1066
1067        elif op == "gather_metrics":
1068            try:
1069                ret = self.gather_metrics(self.instance)
1070                if not ret or ret.get('returncode', 1) > 0:
1071                    self.instance.status = TwisterStatus.ERROR
1072                    self.instance.reason = "Build Failure at gather_metrics."
1073                    next_op = 'report'
1074                elif self.instance.run and self.instance.handler.ready:
1075                    next_op = 'run'
1076                else:
1077                    if self.instance.status == TwisterStatus.NOTRUN:
                        run_conditions = (
1079                            f"(run:{self.instance.run},"
1080                            f" handler.ready:{self.instance.handler.ready})"
1081                        )
1082                        logger.debug(f"Instance {self.instance.name} can't run {run_conditions}")
1083                        self.instance.add_missing_case_status(
1084                            TwisterStatus.NOTRUN,
1085                            "Nowhere to run"
1086                        )
1087                    next_op = 'report'
1088            except StatusAttributeError as sae:
1089                logger.error(str(sae))
1090                self.instance.status = TwisterStatus.ERROR
1091                reason = 'Incorrect status assignment'
1092                self.instance.reason = reason
1093                self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason)
1094                next_op = 'report'
1095            finally:
1096                self._add_to_pipeline(pipeline, next_op)
1097
1098        # Run the generated binary using one of the supported handlers
1099        elif op == "run":
1100            try:
1101                logger.debug(f"run test: {self.instance.name}")
1102                self.run()
1103                logger.debug(f"run status: {self.instance.name} {self.instance.status}")
1104
                # Clear handler references that cannot be pickled (thread, DUTs)
1106                self.instance.handler.thread = None
1107                self.instance.handler.duts = None
1108
1109                next_op = 'report'
1110                additionals = {
1111                    "status": self.instance.status,
1112                    "reason": self.instance.reason
1113                }
1114            except StatusAttributeError as sae:
1115                logger.error(str(sae))
1116                self.instance.status = TwisterStatus.ERROR
1117                reason = 'Incorrect status assignment'
1118                self.instance.reason = reason
1119                self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason)
1120                next_op = 'report'
1121                additionals = {}
1122            finally:
1123                self._add_to_pipeline(pipeline, next_op, additionals)
1124
1125        # Report results and output progress to screen
1126        elif op == "report":
1127            try:
1128                with lock:
1129                    done.put(self.instance)
1130                    self.report_out(results)
1131
1132                if not self.options.coverage:
1133                    if self.options.prep_artifacts_for_testing:
1134                        next_op = 'cleanup'
1135                        additionals = {"mode": "device"}
1136                    elif self.options.runtime_artifact_cleanup == "pass" and \
1137                        self.instance.status in [TwisterStatus.PASS, TwisterStatus.NOTRUN]:
1138                        next_op = 'cleanup'
1139                        additionals = {"mode": "passed"}
1140                    elif self.options.runtime_artifact_cleanup == "all":
1141                        next_op = 'cleanup'
1142                        additionals = {"mode": "all"}
1143            except StatusAttributeError as sae:
1144                logger.error(str(sae))
1145                self.instance.status = TwisterStatus.ERROR
1146                reason = 'Incorrect status assignment'
1147                self.instance.reason = reason
1148                self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason)
1149                next_op = None
1150                additionals = {}
1151            finally:
1152                self._add_to_pipeline(pipeline, next_op, additionals)
1153
1154        elif op == "cleanup":
1155            try:
1156                mode = message.get("mode")
1157                if mode == "device":
1158                    self.cleanup_device_testing_artifacts()
1159                elif (
1160                    mode == "passed"
1161                    or (mode == "all" and self.instance.reason != "CMake build failure")
1162                ):
1163                    self.cleanup_artifacts()
1164            except StatusAttributeError as sae:
1165                logger.error(str(sae))
1166                self.instance.status = TwisterStatus.ERROR
1167                reason = 'Incorrect status assignment'
1168                self.instance.reason = reason
1169                self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason)
1170
1171    def demangle(self, symbol_name):
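        '''
        Demangle a C++ symbol name via c++filt; non-mangled names (and any
        c++filt failure) return the original symbol_name unchanged.
        '''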
1172        if symbol_name[:2] == '_Z':
1173            try:
1174                cpp_filt = subprocess.run(
1175                    'c++filt',
1176                    input=symbol_name,
1177                    text=True,
1178                    check=True,
1179                    capture_output=True
1180                )
1181                if self.trace:
1182                    logger.debug(f"Demangle: '{symbol_name}'==>'{cpp_filt.stdout}'")
1183                return cpp_filt.stdout.strip()
1184            except Exception as e:
1185                logger.error(f"Failed to demangle '{symbol_name}': {e}")
1186        return symbol_name
1187
1188    def determine_testcases(self, results):
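        '''
        Inspect the built ELF's symbol table for new-ztest unit test symbols and
        rebuild this instance's test case list from what was actually compiled in,
        preserving any previously recorded statuses and reasons.
        '''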
1189        logger.debug(f"Determine test cases for test suite: {self.instance.testsuite.id}")
1190
1191        new_ztest_unit_test_regex = re.compile(r"z_ztest_unit_test__([^\s]+?)__([^\s]*)")
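        # For example, a (hypothetical) symbol 'z_ztest_unit_test__mysuite__test_foo'
        # maps to ztest suite 'mysuite' and test function 'test_foo'.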
1192        detected_cases = []
1193
1194        elf_file = self.instance.get_elf_file()
1195        with open(elf_file, "rb") as elf_fp:
1196            elf = ELFFile(elf_fp)
1197
1198            for section in elf.iter_sections():
1199                if isinstance(section, SymbolTableSection):
1200                    for sym in section.iter_symbols():
                        # Only the new ztest framework is handled here, because
                        # only it exposes individual test functions as symbols.
1203                        m_ = new_ztest_unit_test_regex.search(sym.name)
1204                        if not m_:
1205                            continue
1206                        # Demangle C++ symbols
1207                        m_ = new_ztest_unit_test_regex.search(self.demangle(sym.name))
1208                        if not m_:
1209                            continue
1210                        # The 1st capture group is new ztest suite name.
1211                        # The 2nd capture group is new ztest unit test name.
1212                        new_ztest_suite = m_[1]
1213                        if new_ztest_suite not in self.instance.testsuite.ztest_suite_names:
1214                            logger.warning(
1215                                f"Unexpected Ztest suite '{new_ztest_suite}' "
1216                                f"not present in: {self.instance.testsuite.ztest_suite_names}"
1217                            )
1218                        test_func_name = m_[2].replace("test_", "", 1)
1219                        testcase_id = self.instance.compose_case_name(
1220                            f"{new_ztest_suite}.{test_func_name}"
1221                        )
1222                        detected_cases.append(testcase_id)
1223
1224        logger.debug(
1225            f"Test instance {self.instance.name} already has {len(self.instance.testcases)} cases."
1226        )
1227        if detected_cases:
1228            logger.debug(f"Detected Ztest cases: [{', '.join(detected_cases)}] in {elf_file}")
1229            tc_keeper = {
1230                tc.name: {'status': tc.status, 'reason': tc.reason}
1231                for tc in self.instance.testcases
1232            }
1233            self.instance.testcases.clear()
1234            self.instance.testsuite.testcases.clear()
1235
1236            for testcase_id in detected_cases:
1237                testcase = self.instance.add_testcase(name=testcase_id)
1238                self.instance.testsuite.add_testcase(name=testcase_id)
1239
1240                # Keep previous statuses and reasons
1241                tc_info = tc_keeper.get(testcase_id, {})
1242                if not tc_info and self.trace:
                    # This also happens when Ztest uses macros, e.g. DEFINE_TEST_VARIANT
1244                    logger.debug(f"Ztest case '{testcase_id}' discovered for "
1245                                 f"'{self.instance.testsuite.source_dir_rel}' "
1246                                 f"with {list(tc_keeper)}")
1247                testcase.status = tc_info.get('status', TwisterStatus.NONE)
1248                testcase.reason = tc_info.get('reason')
1249
1250
1251    def cleanup_artifacts(self, additional_keep: list[str] = None):
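        '''
        Remove build artifacts, keeping only an allow-list of logs, configuration
        and the files needed for --test-only reruns, plus any additional_keep entries.
        '''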
1252        if additional_keep is None:
1253            additional_keep = []
1254        logger.debug(f"Cleaning up {self.instance.build_dir}")
1255        allow = [
1256            os.path.join('zephyr', '.config'),
1257            'handler.log',
1258            'handler_stderr.log',
1259            'build.log',
1260            'device.log',
1261            'recording.csv',
1262            'rom.json',
1263            'ram.json',
            # The files below are also needed to make --test-only work
1265            'Makefile',
1266            'CMakeCache.txt',
1267            'build.ninja',
1268            os.path.join('CMakeFiles', 'rules.ninja')
1269            ]
1270
1271        allow += additional_keep
1272
1273        if self.options.runtime_artifact_cleanup == 'all':
1274            allow += [os.path.join('twister', 'testsuite_extra.conf')]
1275
1276        allow = [os.path.join(self.instance.build_dir, file) for file in allow]
1277
1278        for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False):
1279            for name in filenames:
1280                path = os.path.join(dirpath, name)
1281                if path not in allow:
1282                    os.remove(path)
1283            # Remove empty directories and symbolic links to directories
1284            for dir in dirnames:
1285                path = os.path.join(dirpath, dir)
1286                if os.path.islink(path):
1287                    os.remove(path)
1288                elif not os.listdir(path):
1289                    os.rmdir(path)
1290
1291    def cleanup_device_testing_artifacts(self):
1292        logger.debug(f"Cleaning up for Device Testing {self.instance.build_dir}")
1293
1294        files_to_keep = self._get_binaries()
1295        files_to_keep.append(os.path.join('zephyr', 'runners.yaml'))
1296
1297        if self.instance.sysbuild:
1298            files_to_keep.append('domains.yaml')
1299            for domain in self.instance.domains.get_domains():
1300                files_to_keep += self._get_artifact_allow_list_for_domain(domain.name)
1301
1302        self.cleanup_artifacts(files_to_keep)
1303
1304        self._sanitize_files()
1305
1306    def _get_artifact_allow_list_for_domain(self, domain: str) -> list[str]:
1307        """
1308        Return a list of files needed to test a given domain.
1309        """
1310        allow = [
1311            os.path.join(domain, 'build.ninja'),
1312            os.path.join(domain, 'CMakeCache.txt'),
1313            os.path.join(domain, 'CMakeFiles', 'rules.ninja'),
1314            os.path.join(domain, 'Makefile'),
1315            os.path.join(domain, 'zephyr', '.config'),
1316            os.path.join(domain, 'zephyr', 'runners.yaml')
1317            ]
1318        return allow
1319
1320    def _get_binaries(self) -> list[str]:
        """
        Get the list of binary paths (absolute or relative to
        self.instance.build_dir), based on information from platform.binaries
        or runners.yaml. If none are found, fall back to the default binaries
        such as "zephyr/zephyr.hex".
        """
1327        binaries: list[str] = []
1328
1329        platform = self.instance.platform
1330        if platform.binaries:
1331            for binary in platform.binaries:
1332                binaries.append(os.path.join('zephyr', binary))
1333
1334        # Get binaries for a single-domain build
1335        binaries += self._get_binaries_from_runners()
1336        # Get binaries in the case of a multiple-domain build
1337        if self.instance.sysbuild:
1338            for domain in self.instance.domains.get_domains():
1339                binaries += self._get_binaries_from_runners(domain.name)
1340
        # If no binaries were found in platform.binaries or runners.yaml, use the defaults
1342        if len(binaries) == 0:
1343            binaries = [
1344                os.path.join('zephyr', 'zephyr.hex'),
1345                os.path.join('zephyr', 'zephyr.bin'),
1346                os.path.join('zephyr', 'zephyr.elf'),
1347                os.path.join('zephyr', 'zephyr.exe'),
1348            ]
1349        return binaries
1350
1351    def _get_binaries_from_runners(self, domain='') -> list[str]:
        """
        Get the list of binary paths (absolute or relative to
        self.instance.build_dir) from a runners.yaml file. May be used for
        multiple-domain builds by passing in one domain at a time.
        """
1357
1358        runners_file_path: str = os.path.join(self.instance.build_dir,
1359                                              domain, 'zephyr', 'runners.yaml')
1360        if not os.path.exists(runners_file_path):
1361            return []
1362
1363        with open(runners_file_path) as file:
1364            runners_content: dict = yaml.load(file, Loader=SafeLoader)
1365
1366        if 'config' not in runners_content:
1367            return []
1368
1369        runners_config: dict = runners_content['config']
1370        binary_keys: list[str] = ['elf_file', 'hex_file', 'bin_file']
1371
1372        binaries: list[str] = []
1373        for binary_key in binary_keys:
1374            binary_path = runners_config.get(binary_key)
1375            if binary_path is None:
1376                continue
1377            if os.path.isabs(binary_path):
1378                binaries.append(binary_path)
1379            else:
1380                binaries.append(os.path.join(domain, 'zephyr', binary_path))
1381
1382        return binaries
1383
1384    def _sanitize_files(self):
        """
        Sanitize files so that they can be flashed from a different
        computer/system.
        """
1389        self._sanitize_runners_file()
1390        self._sanitize_zephyr_base_from_files()
1391
1392    def _sanitize_runners_file(self):
        """
        Replace absolute paths of binary files with relative ones. The base
        directory for those files is f"{self.instance.build_dir}/zephyr".
        """
1397        runners_dir_path: str = os.path.join(self.instance.build_dir, 'zephyr')
1398        runners_file_path: str = os.path.join(runners_dir_path, 'runners.yaml')
1399        if not os.path.exists(runners_file_path):
1400            return
1401
1402        with open(runners_file_path) as file:
1403            runners_content_text = file.read()
1404            runners_content_yaml: dict = yaml.load(runners_content_text, Loader=SafeLoader)
1405
1406        if 'config' not in runners_content_yaml:
1407            return
1408
1409        runners_config: dict = runners_content_yaml['config']
1410        binary_keys: list[str] = ['elf_file', 'hex_file', 'bin_file']
1411
1412        for binary_key in binary_keys:
1413            binary_path = runners_config.get(binary_key)
1414            # sanitize only paths which exist and are absolute
1415            if binary_path is None or not os.path.isabs(binary_path):
1416                continue
1417            binary_path_relative = os.path.relpath(binary_path, start=runners_dir_path)
1418            runners_content_text = runners_content_text.replace(binary_path, binary_path_relative)
1419
1420        with open(runners_file_path, 'w') as file:
1421            file.write(runners_content_text)
1422
1423    def _sanitize_zephyr_base_from_files(self):
1424        """
1425        Remove Zephyr base paths from selected files.
1426        """
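        # Example with a hypothetical checkout: if canonical_zephyr_base is
        # '/home/user/zephyrproject/zephyr', every occurrence of
        # '/home/user/zephyrproject/zephyr/' in the files below is dropped, e.g.
        # '/home/user/zephyrproject/zephyr/samples/hello_world' becomes 'samples/hello_world'.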
1427        files_to_sanitize = [
1428            'CMakeCache.txt',
1429            os.path.join('zephyr', 'runners.yaml'),
1430        ]
1431        for file_path in files_to_sanitize:
1432            file_path = os.path.join(self.instance.build_dir, file_path)
1433            if not os.path.exists(file_path):
1434                continue
1435
1436            with open(file_path) as file:
1437                data = file.read()
1438
            # ensure canonical_zephyr_base ends with a trailing slash so that only
            # complete directory prefixes are removed:
1440            path_to_remove = os.path.join(canonical_zephyr_base, "")
1441            data = data.replace(path_to_remove, "")
1442
1443            with open(file_path, 'w') as file:
1444                file.write(data)
1445
1446    @staticmethod
1447    def _add_instance_testcases_to_status_counts(instance, results, decrement=False):
1448        increment_value = -1 if decrement else 1
1449        for tc in instance.testcases:
1450            match tc.status:
1451                case TwisterStatus.PASS:
1452                    results.passed_cases_increment(increment_value)
1453                case TwisterStatus.NOTRUN:
1454                    results.notrun_cases_increment(increment_value)
1455                case TwisterStatus.BLOCK:
1456                    results.blocked_cases_increment(increment_value)
1457                case TwisterStatus.SKIP:
1458                    results.skipped_cases_increment(increment_value)
1459                case TwisterStatus.FILTER:
1460                    results.filtered_cases_increment(increment_value)
1461                case TwisterStatus.ERROR:
1462                    results.error_cases_increment(increment_value)
1463                case TwisterStatus.FAIL:
1464                    results.failed_cases_increment(increment_value)
1465                # Statuses that should not appear.
1466                # Crashing Twister at this point would be counterproductive,
1467                # but having those statuses in this part of processing is an error.
1468                case TwisterStatus.NONE:
1469                    results.none_cases_increment(increment_value)
                    logger.warning(f'A None status detected in instance {instance.name},'
                                   f' test case {tc.name}.')
1472                    results.warnings_increment(1)
1473                case TwisterStatus.STARTED:
1474                    results.started_cases_increment(increment_value)
                    logger.warning(f'A started status detected in instance {instance.name},'
                                   f' test case {tc.name}.')
1477                    results.warnings_increment(1)
1478                case _:
1479                    logger.warning(
1480                        f'An unknown status "{tc.status}" detected in instance {instance.name},'
1481                        f' test case {tc.name}.'
1482                    )
1483                    results.warnings_increment(1)
1484
1485
1486    def report_out(self, results):
1487        total_to_do = results.total - results.filtered_static
1488        total_tests_width = len(str(total_to_do))
1489        results.done_increment()
1490        instance = self.instance
1491        if results.iteration == 1:
1492            results.cases_increment(len(instance.testcases))
1493
1494        self._add_instance_testcases_to_status_counts(instance, results)
1495
1496        status = (
1497            f'{TwisterStatus.get_color(instance.status)}{str.upper(instance.status)}{Fore.RESET}'
1498        )
1499
1500        if instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
1501            if instance.status == TwisterStatus.ERROR:
1502                results.error_increment()
1503            else:
1504                results.failed_increment()
1505            if self.options.verbose:
1506                status += " " + instance.reason
1507            else:
1508                logger.error(
1509                    f"{instance.platform.name:<25} {instance.testsuite.name:<50}"
1510                    f" {status}: {instance.reason}"
1511                )
1512            if not self.options.verbose:
1513                self.log_info_file(self.options.inline_logs)
1514        elif instance.status == TwisterStatus.SKIP:
1515            results.skipped_increment()
1516        elif instance.status == TwisterStatus.FILTER:
1517            results.filtered_configs_increment()
1518        elif instance.status == TwisterStatus.PASS:
1519            results.passed_increment()
1520        elif instance.status == TwisterStatus.NOTRUN:
1521            results.notrun_increment()
1522        else:
1523            logger.debug(f"Unknown status = {instance.status}")
1524            status = Fore.YELLOW + "UNKNOWN" + Fore.RESET
1525
1526        if self.options.verbose:
1527            if self.options.cmake_only:
1528                more_info = "cmake"
1529            elif instance.status in [TwisterStatus.SKIP, TwisterStatus.FILTER]:
1530                more_info = instance.reason
1531            else:
1532                if instance.handler.ready and instance.run:
1533                    more_info = instance.handler.type_str
1534                    htime = instance.execution_time
1535                    if instance.dut:
1536                        more_info += f": {instance.dut},"
1537                    if htime:
1538                        more_info += f" {htime:.3f}s"
1539                else:
1540                    more_info = "build"
1541
                if (instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]
                        and hasattr(self.instance.handler, 'seed')
                        and self.instance.handler.seed is not None):
1545                    more_info += "/seed: " + str(self.options.seed)
1546            logger.info(
1547                f"{results.done - results.filtered_static:>{total_tests_width}}/{total_to_do}"
1548                f" {instance.platform.name:<25} {instance.testsuite.name:<50}"
1549                f" {status} ({more_info})"
1550            )
1551
1552            if self.options.verbose > 1:
1553                for tc in self.instance.testcases:
1554                    color = TwisterStatus.get_color(tc.status)
1555                    logger.info(f'    {" ":<{total_tests_width+25+4}} {tc.name:<75} '
1556                                f'{color}{str.upper(tc.status.value):<12}{Fore.RESET}'
1557                                f'{" " + tc.reason if tc.reason else ""}')
1558
1559            if instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
1560                self.log_info_file(self.options.inline_logs)
1561        else:
1562            completed_perc = 0
1563            if total_to_do > 0:
1564                completed_perc = int(
1565                    (float(results.done - results.filtered_static) / total_to_do) * 100
1566                )
1567
1568            unfiltered = results.done - results.filtered_static
1569            complete_section = (
1570                f"{TwisterStatus.get_color(TwisterStatus.PASS)}"
1571                f"{unfiltered:>4}/{total_to_do:>4}"
1572                f"{Fore.RESET}  {completed_perc:>2}%"
1573            )
1574            notrun_section = (
1575                f"{TwisterStatus.get_color(TwisterStatus.NOTRUN)}{results.notrun:>4}{Fore.RESET}"
1576            )
1577            filtered_section_color = (
1578                TwisterStatus.get_color(TwisterStatus.SKIP)
1579                if results.filtered_configs > 0
1580                else Fore.RESET
1581            )
1582            filtered_section = (
1583                f"{filtered_section_color}{results.filtered_configs:>4}{Fore.RESET}"
1584            )
1585            failed_section_color = (
1586                TwisterStatus.get_color(TwisterStatus.FAIL) if results.failed > 0 else Fore.RESET
1587            )
1588            failed_section = (
1589                f"{failed_section_color}{results.failed:>4}{Fore.RESET}"
1590            )
1591            error_section_color = (
1592                TwisterStatus.get_color(TwisterStatus.ERROR) if results.error > 0 else Fore.RESET
1593            )
1594            error_section = (
1595                f"{error_section_color}{results.error:>4}{Fore.RESET}"
1596            )
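            # Single carriage-return progress line; rendered roughly as (numbers hypothetical):
            # INFO    - Total complete:  120/ 450  26%  built (not run):   12, filtered:   30, failed:    1, error:    0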
1597            sys.stdout.write(
1598                f"INFO    - Total complete: {complete_section}"
1599                f"  built (not run): {notrun_section},"
1600                f" filtered: {filtered_section},"
1601                f" failed: {failed_section},"
1602                f" error: {error_section}\r"
1603            )
1604        sys.stdout.flush()
1605
1606    @staticmethod
1607    def cmake_assemble_args(extra_args, handler, extra_conf_files, extra_overlay_confs,
1608                            extra_dtc_overlay_files, cmake_extra_args,
1609                            build_dir):
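        # Illustrative assembly (values hypothetical): with
        #   extra_args = ['CONFIG_FOO=y', 'SNIPPET=bar'] and extra_conf_files = ['extra.conf'],
        # the returned list looks roughly like
        #   ['-DCONFIG_FOO=y', <cmake_extra_args as -D...>, '-DSNIPPET=bar', '-DCONF_FILE=extra.conf'].
        # CONFIG_* options and cmake_extra_args keep their quoting; other args have double quotes stripped.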
1610        # Retain quotes around config options
1611        config_options = [arg for arg in extra_args if arg.startswith("CONFIG_")]
1612        args = [arg for arg in extra_args if not arg.startswith("CONFIG_")]
1613
1614        args_expanded = ["-D{}".format(a.replace('"', '\"')) for a in config_options]
1615
1616        if handler.ready:
1617            args.extend(handler.args)
1618
1619        if extra_conf_files:
1620            args.append(f"CONF_FILE=\"{';'.join(extra_conf_files)}\"")
1621
1622        if extra_dtc_overlay_files:
1623            args.append(f"DTC_OVERLAY_FILE=\"{';'.join(extra_dtc_overlay_files)}\"")
1624
1625        # merge overlay files into one variable
1626        overlays = extra_overlay_confs.copy()
1627
1628        additional_overlay_path = os.path.join(
1629            build_dir, "twister", "testsuite_extra.conf"
1630        )
1631        if os.path.exists(additional_overlay_path):
1632            overlays.append(additional_overlay_path)
1633
1634        if overlays:
1635            args.append(f"OVERLAY_CONFIG=\"{' '.join(overlays)}\"")
1636
1637        # Build the final argument list
1638        args_expanded.extend(["-D{}".format(a.replace('"', '\"')) for a in cmake_extra_args])
1639        args_expanded.extend(["-D{}".format(a.replace('"', '')) for a in args])
1640
1641        return args_expanded
1642
1643    def cmake(self, filter_stages=None):
1644        if filter_stages is None:
1645            filter_stages = []
1646        args = []
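        # extra_args entries may be conditional (example values hypothetical):
        #   "platform:native_sim:CONFIG_FOO=y" is applied only when building for native_sim,
        #   "arch:arm:CONFIG_BAR=y" only for the arm architecture,
        #   "simulation:qemu:CONFIG_BAZ=y" only for the qemu simulator;
        # unprefixed entries are applied unconditionally.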
1647        for va in self.testsuite.extra_args.copy():
1648            cond_args = va.split(":")
1649            if cond_args[0] == "arch" and len(cond_args) == 3:
1650                if self.instance.platform.arch == cond_args[1]:
1651                    args.append(cond_args[2])
1652            elif cond_args[0] == "platform" and len(cond_args) == 3:
1653                if self.instance.platform.name == cond_args[1]:
1654                    args.append(cond_args[2])
1655            elif cond_args[0] == "simulation" and len(cond_args) == 3:
1656                if self.instance.platform.simulation == cond_args[1]:
1657                    args.append(cond_args[2])
1658            else:
1659                if cond_args[0] in ["arch", "platform", "simulation"]:
1660                    logger.warning(f"Unexpected extra_args: {va}")
1661                args.append(va)
1662
1663
1664        args = self.cmake_assemble_args(
1665            args,
1666            self.instance.handler,
1667            self.testsuite.extra_conf_files,
1668            self.testsuite.extra_overlay_confs,
1669            self.testsuite.extra_dtc_overlay_files,
1670            self.options.extra_args, # CMake extra args
1671            self.instance.build_dir,
1672        )
        return self.run_cmake(args, filter_stages)
1674
1675    def build(self):
1676        harness = HarnessImporter.get_harness(self.instance.testsuite.harness.capitalize())
1677        build_result = self.run_build(['--build', self.build_dir])
1678        try:
1679            if harness:
1680                harness.instance = self.instance
1681                harness.build()
1682        except ConfigurationError as error:
1683            self.instance.status = TwisterStatus.ERROR
1684            self.instance.reason = str(error)
1685            logger.error(self.instance.reason)
1686            return
1687        return build_result
1688
1689    def run(self):
1690
1691        instance = self.instance
1692
1693        if instance.handler.ready:
1694            logger.debug(f"Reset instance status from '{instance.status}' to None before run.")
1695            instance.status = TwisterStatus.NONE
1696
1697            if instance.handler.type_str == "device":
1698                instance.handler.duts = self.duts
1699
            if self.options.seed is not None and instance.platform.name.startswith("native_"):
                self.parse_generated()
                if ('CONFIG_FAKE_ENTROPY_NATIVE_POSIX' in self.defconfig and
                        self.defconfig['CONFIG_FAKE_ENTROPY_NATIVE_POSIX'] == 'y'):
                    instance.handler.seed = self.options.seed
1705
1706            if self.options.extra_test_args and instance.platform.arch == "posix":
1707                instance.handler.extra_test_args = self.options.extra_test_args
1708
1709            harness = HarnessImporter.get_harness(instance.testsuite.harness.capitalize())
1710            try:
1711                harness.configure(instance)
1712            except ConfigurationError as error:
1713                instance.status = TwisterStatus.ERROR
1714                instance.reason = str(error)
1715                logger.error(instance.reason)
1716                return
1718            if isinstance(harness, Pytest):
1719                harness.pytest_run(instance.handler.get_test_timeout())
1720            else:
1721                instance.handler.handle(harness)
1722
1723        sys.stdout.flush()
1724
1725    def gather_metrics(self, instance: TestInstance):
1726        build_result = {"returncode": 0}
1727        if self.options.create_rom_ram_report:
1728            build_result = self.run_build(['--build', self.build_dir, "--target", "footprint"])
1729        if self.options.enable_size_report and not self.options.cmake_only:
1730            self.calc_size(instance=instance, from_buildlog=self.options.footprint_from_buildlog)
1731        else:
1732            instance.metrics["used_ram"] = 0
1733            instance.metrics["used_rom"] = 0
1734            instance.metrics["available_rom"] = 0
1735            instance.metrics["available_ram"] = 0
1736            instance.metrics["unrecognized"] = []
1737        return build_result
1738
1739    @staticmethod
1740    def calc_size(instance: TestInstance, from_buildlog: bool):
1741        if instance.status not in [TwisterStatus.ERROR, TwisterStatus.FAIL, TwisterStatus.SKIP]:
1742            if instance.platform.type not in ["native", "qemu", "unit"]:
1743                generate_warning = bool(instance.platform.type == "mcu")
1744                size_calc = instance.calculate_sizes(
1745                    from_buildlog=from_buildlog,
1746                    generate_warning=generate_warning
1747                )
1748                instance.metrics["used_ram"] = size_calc.get_used_ram()
1749                instance.metrics["used_rom"] = size_calc.get_used_rom()
1750                instance.metrics["available_rom"] = size_calc.get_available_rom()
1751                instance.metrics["available_ram"] = size_calc.get_available_ram()
1752                instance.metrics["unrecognized"] = size_calc.unrecognized_sections()
1753            else:
1754                instance.metrics["used_ram"] = 0
1755                instance.metrics["used_rom"] = 0
1756                instance.metrics["available_rom"] = 0
1757                instance.metrics["available_ram"] = 0
1758                instance.metrics["unrecognized"] = []
1759            instance.metrics["handler_time"] = instance.execution_time
1760
1761class TwisterRunner:
1762
1763    def __init__(self, instances, suites, env=None) -> None:
1764        self.pipeline = None
1765        self.options = env.options
1766        self.env = env
1767        self.instances = instances
1768        self.suites = suites
1769        self.duts = None
1770        self.jobs = 1
1771        self.results = None
1772        self.jobserver = None
1773
1774    def run(self):
1775
1776        retries = self.options.retry_failed + 1
1777
1778        BaseManager.register('LifoQueue', queue.LifoQueue)
1779        manager = BaseManager()
1780        manager.start()
1781
1782        self.results = ExecutionCounter(total=len(self.instances))
1783        self.iteration = 0
1784        pipeline = manager.LifoQueue()
1785        done_queue = manager.LifoQueue()
1786
1787        # Set number of jobs
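        # (e.g., on a hypothetical 8-core host: the --jobs value if given,
        #  16 for --build-only, otherwise 8)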
1788        if self.options.jobs:
1789            self.jobs = self.options.jobs
1790        elif self.options.build_only:
1791            self.jobs = multiprocessing.cpu_count() * 2
1792        else:
1793            self.jobs = multiprocessing.cpu_count()
1794
1795        if sys.platform == "linux":
1796            if os.name == 'posix':
1797                self.jobserver = GNUMakeJobClient.from_environ(jobs=self.options.jobs)
1798                if not self.jobserver:
1799                    self.jobserver = GNUMakeJobServer(self.jobs)
1800                elif self.jobserver.jobs:
1801                    self.jobs = self.jobserver.jobs
1802            # TODO: Implement this on windows/mac also
1803            else:
1804                self.jobserver = JobClient()
1805
1806            logger.info(f"JOBS: {self.jobs}")
1807
1808        self.update_counting_before_pipeline()
1809
1810        while True:
1811            self.results.iteration_increment()
1812
1813            if self.results.iteration > 1:
1814                logger.info(f"{self.results.iteration} Iteration:")
1815                time.sleep(self.options.retry_interval)  # waiting for the system to settle down
                self.results.done = self.results.total - self.results.failed
                self.results.failed = 0
                if self.options.retry_build_errors:
                    # exclude errored instances from 'done' before clearing the error counter
                    self.results.done -= self.results.error
                    self.results.error = 0
1821            else:
1822                self.results.done = self.results.filtered_static
1823
1824            self.execute(pipeline, done_queue)
1825
1826            while True:
1827                try:
1828                    inst = done_queue.get_nowait()
1829                except queue.Empty:
1830                    break
1831                else:
1832                    inst.metrics.update(self.instances[inst.name].metrics)
1833                    inst.metrics["handler_time"] = inst.execution_time
1834                    inst.metrics["unrecognized"] = []
1835                    self.instances[inst.name] = inst
1836
1837            print("")
1838
1839            retry_errors = False
1840            if self.results.error and self.options.retry_build_errors:
1841                retry_errors = True
1842
            retries -= 1
            if retries == 0 or (self.results.failed == 0 and not retry_errors):
1845                break
1846
1847        self.show_brief()
1848
1849    def update_counting_before_pipeline(self):
1850        '''
        Update counting before the pipeline starts, because statically filtered
        test instances never enter the pipeline, yet some pipeline output needs
        the static filter stats. So prepare those stats before the pipeline starts.
1854        '''
1855        for instance in self.instances.values():
1856            if instance.status == TwisterStatus.FILTER and instance.reason != 'runtime filter':
1857                self.results.filtered_static_increment()
1858                self.results.filtered_configs_increment()
1859                self.results.filtered_cases_increment(len(instance.testsuite.testcases))
1860                self.results.cases_increment(len(instance.testsuite.testcases))
1861            elif instance.status == TwisterStatus.ERROR:
1862                self.results.error_increment()
1863
1864    def show_brief(self):
1865        logger.info(
1866            f"{len(self.suites)} test scenarios ({len(self.instances)} configurations) selected,"
1867            f" {self.results.filtered_configs} configurations filtered"
1868            f" ({self.results.filtered_static} by static filter,"
1869            f" {self.results.filtered_configs - self.results.filtered_static} at runtime)."
1870        )
1871
1872    def add_tasks_to_queue(
1873        self,
1874        pipeline,
1875        build_only=False,
1876        test_only=False,
1877        retry_build_errors=False
1878    ):
1879        for instance in self.instances.values():
1880            if build_only:
1881                instance.run = False
1882
1883            no_retry_statuses = [
1884                TwisterStatus.PASS,
1885                TwisterStatus.SKIP,
1886                TwisterStatus.FILTER,
1887                TwisterStatus.NOTRUN
1888            ]
1889            if not retry_build_errors:
1890                no_retry_statuses.append(TwisterStatus.ERROR)
1891
1892            if instance.status not in no_retry_statuses:
1893                logger.debug(f"adding {instance.name}")
1894                if instance.status != TwisterStatus.NONE:
1895                    instance.retries += 1
1896                instance.status = TwisterStatus.NONE
1897                # Previous states should be removed from the stats
1898                if self.results.iteration > 1:
1899                    ProjectBuilder._add_instance_testcases_to_status_counts(
1900                        instance,
1901                        self.results,
1902                        decrement=True
1903                    )
1904
1905                # Check if cmake package_helper script can be run in advance.
1906                instance.filter_stages = []
1907                if instance.testsuite.filter:
1908                    instance.filter_stages = self.get_cmake_filter_stages(
1909                        instance.testsuite.filter,
1910                        expr_parser.reserved.keys()
1911                    )
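                # Dispatch to the first pipeline stage: "run" when only re-running
                # already built tests, "filter" when a partial (dts/kconfig) cmake
                # filter can run first, "build" when an existing CMakeCache.txt may be
                # reused (aggressive_no_clean), and "cmake" otherwise.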
1912
1913                if test_only and instance.run:
1914                    pipeline.put({"op": "run", "test": instance})
1915                elif instance.filter_stages and "full" not in instance.filter_stages:
1916                    pipeline.put({"op": "filter", "test": instance})
1917                else:
1918                    cache_file = os.path.join(instance.build_dir, "CMakeCache.txt")
1919                    if os.path.exists(cache_file) and self.env.options.aggressive_no_clean:
1920                        pipeline.put({"op": "build", "test": instance})
1921                    else:
1922                        pipeline.put({"op": "cmake", "test": instance})
1923
1924
1925    def pipeline_mgr(self, pipeline, done_queue, lock, results):
1926        try:
1927            if sys.platform == 'linux':
1928                with self.jobserver.get_job():
1929                    while True:
1930                        try:
1931                            task = pipeline.get_nowait()
1932                        except queue.Empty:
1933                            break
1934                        else:
1935                            instance = task['test']
1936                            pb = ProjectBuilder(instance, self.env, self.jobserver)
1937                            pb.duts = self.duts
1938                            pb.process(pipeline, done_queue, task, lock, results)
1939                            if self.env.options.quit_on_failure and \
1940                                pb.instance.status in [TwisterStatus.FAIL, TwisterStatus.ERROR]:
1941                                with pipeline.mutex:
1942                                    pipeline.queue.clear()
1943                                break
1944
1945                    return True
1946            else:
1947                while True:
1948                    try:
1949                        task = pipeline.get_nowait()
1950                    except queue.Empty:
1951                        break
1952                    else:
1953                        instance = task['test']
1954                        pb = ProjectBuilder(instance, self.env, self.jobserver)
1955                        pb.duts = self.duts
1956                        pb.process(pipeline, done_queue, task, lock, results)
1957                        if self.env.options.quit_on_failure and \
1958                            pb.instance.status in [TwisterStatus.FAIL, TwisterStatus.ERROR]:
1959                            with pipeline.mutex:
1960                                pipeline.queue.clear()
1961                            break
1962                return True
1963        except Exception as e:
1964            logger.error(f"General exception: {e}")
1965            sys.exit(1)
1966
1967    def execute(self, pipeline, done):
1968        lock = Lock()
1969        logger.info("Adding tasks to the queue...")
1970        self.add_tasks_to_queue(pipeline, self.options.build_only, self.options.test_only,
1971                                retry_build_errors=self.options.retry_build_errors)
1972        logger.info("Added initial list of jobs to queue")
1973
1974        processes = []
1975
1976        for _ in range(self.jobs):
1977            p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, self.results, ))
1978            processes.append(p)
1979            p.start()
1980        logger.debug(f"Launched {self.jobs} jobs")
1981
1982        try:
1983            for p in processes:
1984                p.join()
1985                if p.exitcode != 0:
1986                    logger.error(f"Process {p.pid} failed, aborting execution")
1987                    for proc in processes:
1988                        proc.terminate()
1989                    sys.exit(1)
1990        except KeyboardInterrupt:
1991            logger.info("Execution interrupted")
1992            for p in processes:
1993                p.terminate()
1994
1995    @staticmethod
1996    def get_cmake_filter_stages(filt, logic_keys):
        """Analyze filter expressions from the test yaml
        and decide whether dts- and/or kconfig-based filtering will be needed.
1999        """
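        # Example (hypothetical filter expression): "dt_compat_enabled('foo') and CONFIG_BAR"
        # yields ["dts", "kconfig"]; any token that is neither dt_* nor CONFIG*
        # forces a ["full"] filter run instead.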
2000        dts_required = False
2001        kconfig_required = False
2002        full_required = False
2003        filter_stages = []
2004
        # Compress args in expressions like "function('x', 'y')"
        # so they are not broken apart when splitting on whitespace
2007        filt = filt.replace(", ", ",")
2008        # Remove logic words
2009        for k in logic_keys:
2010            filt = filt.replace(f"{k} ", "")
2011        # Remove brackets
2012        filt = filt.replace("(", "")
2013        filt = filt.replace(")", "")
        # Split on whitespace
2015        filt = filt.split()
2016        for expression in filt:
2017            if expression.startswith("dt_"):
2018                dts_required = True
2019            elif expression.startswith("CONFIG"):
2020                kconfig_required = True
2021            else:
2022                full_required = True
2023
2024        if full_required:
2025            return ["full"]
2026        if dts_required:
2027            filter_stages.append("dts")
2028        if kconfig_required:
2029            filter_stages.append("kconfig")
2030
2031        return filter_stages
2032