1# vim: set syntax=python ts=4 :
2#
# Copyright (c) 2018-2022 Intel Corporation
4# Copyright 2022 NXP
5# SPDX-License-Identifier: Apache-2.0
6
7import logging
8import multiprocessing
9import os
10import pickle
11import queue
12import re
13import shutil
14import subprocess
15import sys
16import time
17import traceback
18import yaml
19from multiprocessing import Lock, Process, Value
20from multiprocessing.managers import BaseManager
21from typing import List
22from packaging import version
23
24from colorama import Fore
25from domains import Domains
26from twisterlib.cmakecache import CMakeCache
27from twisterlib.environment import canonical_zephyr_base
28from twisterlib.error import BuildError, ConfigurationError
29
30import elftools
31from elftools.elf.elffile import ELFFile
32from elftools.elf.sections import SymbolTableSection
33
34if version.parse(elftools.__version__) < version.parse('0.24'):
35    sys.exit("pyelftools is out of date, need version 0.24 or later")
36
37# Job server only works on Linux for now.
38if sys.platform == 'linux':
39    from twisterlib.jobserver import GNUMakeJobClient, GNUMakeJobServer, JobClient
40
41from twisterlib.log_helper import log_command
42from twisterlib.testinstance import TestInstance
43from twisterlib.environment import TwisterEnv
44from twisterlib.testsuite import TestSuite
45from twisterlib.platform import Platform
46from twisterlib.testplan import change_skip_to_error_if_integration
47from twisterlib.harness import HarnessImporter, Pytest
48
49try:
50    from yaml import CSafeLoader as SafeLoader
51except ImportError:
52    from yaml import SafeLoader
53
# Module-wide logger; kept at DEBUG here — effective verbosity is decided by
# the handlers attached elsewhere.
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
# NOTE(review): imported after logger configuration rather than at the top of
# the file — presumably intentional (expr_parser may depend on logging being
# set up at import time); confirm before moving it up with the other imports.
import expr_parser
57
58
def _locked_counter(name, readonly=False):
    """Build a property exposing the multiprocessing.Value stored at *name*.

    Both reads and writes are performed under the Value's own lock so that
    concurrent pipeline processes never observe torn updates. When
    *readonly* is True no setter is attached (assignment raises
    AttributeError, matching a getter-only property).
    """
    def _get(self):
        counter = getattr(self, name)
        with counter.get_lock():
            return counter.value

    def _set(self, value):
        counter = getattr(self, name)
        with counter.get_lock():
            counter.value = value

    return property(_get, None if readonly else _set)


class ExecutionCounter(object):
    def __init__(self, total=0):
        '''
        Most of the stats are at test instance level
        Except that "_cases" and "_skipped_cases" are for cases of ALL test instances

        total complete = done + skipped_filter
        total = yaml test scenarios * applicable platforms
        complete percentage = (done + skipped_filter) / total
        pass rate = passed / (total - skipped_configs)
        '''
        # instances that go through the pipeline
        # updated by report_out()
        self._done = Value('i', 0)

        # iteration
        self._iteration = Value('i', 0)

        # instances that actually executed and passed
        # updated by report_out()
        self._passed = Value('i', 0)

        # static filter + runtime filter + build skipped
        # updated by update_counting_before_pipeline() and report_out()
        self._skipped_configs = Value('i', 0)

        # cmake filter + build skipped
        # updated by report_out()
        self._skipped_runtime = Value('i', 0)

        # static filtered at yaml parsing time
        # updated by update_counting_before_pipeline()
        self._skipped_filter = Value('i', 0)

        # updated by update_counting_before_pipeline() and report_out()
        self._skipped_cases = Value('i', 0)

        # updated by report_out() in pipeline
        self._error = Value('i', 0)
        self._failed = Value('i', 0)

        # initialized to number of test instances
        self._total = Value('i', total)

        # updated in report_out
        self._cases = Value('i', 0)
        self.lock = Lock()

    def summary(self):
        print("--------------------------------")
        print(f"Total test suites: {self.total}") # actually test instances
        print(f"Total test cases: {self.cases}")
        print(f"Executed test cases: {self.cases - self.skipped_cases}")
        print(f"Skipped test cases: {self.skipped_cases}")
        print(f"Completed test suites: {self.done}")
        print(f"Passing test suites: {self.passed}")
        print(f"Failing test suites: {self.failed}")
        print(f"Skipped test suites: {self.skipped_configs}")
        print(f"Skipped test suites (runtime): {self.skipped_runtime}")
        print(f"Skipped test suites (filter): {self.skipped_filter}")
        print(f"Errors: {self.error}")
        print("--------------------------------")

    # Every counter is a multiprocessing.Value guarded by a lock-taking
    # property; the factory replaces ten identical getter/setter pairs.
    cases = _locked_counter('_cases')
    skipped_cases = _locked_counter('_skipped_cases')
    error = _locked_counter('_error')
    iteration = _locked_counter('_iteration')
    done = _locked_counter('_done')
    passed = _locked_counter('_passed')
    skipped_configs = _locked_counter('_skipped_configs')
    skipped_filter = _locked_counter('_skipped_filter')
    skipped_runtime = _locked_counter('_skipped_runtime')
    failed = _locked_counter('_failed')
    # 'total' is fixed at construction time, so no setter is exposed.
    total = _locked_counter('_total', readonly=True)
226
class CMake:
    """Run cmake (configure + build) for one testsuite/platform combination.

    NOTE: ``self.instance``, ``self.options`` and ``self.env`` are populated
    by the ProjectBuilder subclass before run_cmake()/run_build() are called;
    this base class only stores what it is given.
    """

    # CONFIG_<SYM>=<value> lines from a generated zephyr/.config
    config_re = re.compile(r'(CONFIG_[A-Za-z0-9_]+)[=]"?([^"]*)"?$')
    # Generic NAME=value pairs (devicetree-derived defines)
    dt_re = re.compile(r'([A-Za-z0-9_]+)[=]"?([^"]*)"?$')

    def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver):

        self.cwd = None              # working directory for the cmake process
        self.capture_output = True   # capture stdout+stderr into self.log

        self.defconfig = {}          # parsed zephyr/.config contents
        self.cmake_cache = {}        # parsed CMakeCache.txt contents

        self.instance = None         # set later by ProjectBuilder
        self.testsuite = testsuite
        self.platform = platform
        self.source_dir = source_dir
        self.build_dir = build_dir
        self.log = "build.log"

        self.default_encoding = sys.getdefaultencoding()
        self.jobserver = jobserver

    def parse_generated(self, filter_stages=None):
        """Parse generated build artifacts; the base class has none.

        FilterBuilder overrides this to extract kconfig/devicetree/cmake
        cache data used for runtime filtering.
        """
        self.defconfig = {}
        return {}

    def run_build(self, args=None):
        """Run the build step with the caller-supplied cmake arguments.

        Returns {'returncode': rc}, or None when a successful build produced
        no captured output. Updates instance status/reason; ram/rom and
        imgtool overflows are classified as "skipped" rather than errors
        unless --overflow-as-errors was given.
        """

        logger.debug("Building %s for %s" % (self.source_dir, self.platform.name))

        # Avoid a shared mutable default: copy the caller's args (if any).
        cmake_args = list(args or [])
        cmake = shutil.which('cmake')
        cmd = [cmake] + cmake_args
        kwargs = {}

        if self.capture_output:
            kwargs['stdout'] = subprocess.PIPE
            # CMake sends the output of message() to stderr unless it's STATUS
            kwargs['stderr'] = subprocess.STDOUT

        if self.cwd:
            kwargs['cwd'] = self.cwd

        start_time = time.time()
        # On Linux the build goes through the GNU make jobserver so the total
        # job count stays bounded across parallel twister workers.
        if sys.platform == 'linux':
            p = self.jobserver.popen(cmd, **kwargs)
        else:
            p = subprocess.Popen(cmd, **kwargs)
        logger.debug(f'Running {" ".join(cmd)}')

        out, _ = p.communicate()

        ret = {}
        duration = time.time() - start_time
        self.instance.build_time += duration
        if p.returncode == 0:
            msg = f"Finished building {self.source_dir} for {self.platform.name} in {duration:.2f} seconds"
            logger.debug(msg)

            self.instance.status = "passed"
            if not self.instance.run:
                self.instance.add_missing_case_status("skipped", "Test was built only")
            ret = {"returncode": p.returncode}

            if out:
                log_msg = out.decode(self.default_encoding)
                with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
                    log.write(log_msg)
            else:
                # NOTE(review): a successful build with no captured output
                # returns None, which callers treat as a build error.
                # Preserved as-is — confirm intent before changing.
                return None
        else:
            # A real error occurred; inspect the captured log to classify it.
            log_msg = ""
            if out:
                log_msg = out.decode(self.default_encoding)
                with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
                    log.write(log_msg)

            if log_msg:
                overflow_found = re.findall("region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM|dram\\d_\\d_seg)' overflowed by", log_msg)
                imgtool_overflow_found = re.findall(r"Error: Image size \(.*\) \+ trailer \(.*\) exceeds requested size", log_msg)
                if overflow_found and not self.options.overflow_as_errors:
                    logger.debug("Test skipped due to {} Overflow".format(overflow_found[0]))
                    self.instance.status = "skipped"
                    self.instance.reason = "{} overflow".format(overflow_found[0])
                    change_skip_to_error_if_integration(self.options, self.instance)
                elif imgtool_overflow_found and not self.options.overflow_as_errors:
                    self.instance.status = "skipped"
                    self.instance.reason = "imgtool overflow"
                    change_skip_to_error_if_integration(self.options, self.instance)
                else:
                    self.instance.status = "error"
                    self.instance.reason = "Build failure"

            ret = {
                "returncode": p.returncode
            }

        return ret

    def run_cmake(self, args=None, filter_stages=None):
        """Run the cmake configure step.

        Returns {'returncode': rc} plus, on success, a 'filter' entry with
        the results of parse_generated(). ``args`` previously defaulted to
        the empty string; None/""/[] are all accepted as "no extra args".
        """
        filter_stages = filter_stages or []

        if not self.options.disable_warnings_as_errors:
            warnings_as_errors = 'y'
            gen_defines_args = "--edtlib-Werror"
        else:
            warnings_as_errors = 'n'
            gen_defines_args = ""

        warning_command = 'CONFIG_COMPILER_WARNINGS_AS_ERRORS'
        if self.instance.sysbuild:
            # Sysbuild namespaces the option with an SB_ prefix.
            warning_command = 'SB_' + warning_command

        logger.debug("Running cmake on %s for %s" % (self.source_dir, self.platform.name))
        cmake_args = [
            f'-B{self.build_dir}',
            f'-DTC_RUNID={self.instance.run_id}',
            f'-D{warning_command}={warnings_as_errors}',
            f'-DEXTRA_GEN_DEFINES_ARGS={gen_defines_args}',
            f'-G{self.env.generator}'
        ]

        # If needed, run CMake using the package_helper script first, to only run
        # a subset of all cmake modules. This output will be used to filter
        # testcases, and the full CMake configuration will be run for
        # testcases that should be built
        if filter_stages:
            cmake_filter_args = [
                f'-DMODULES={",".join(filter_stages)}',
                f'-P{canonical_zephyr_base}/cmake/package_helper.cmake',
            ]

        if self.instance.sysbuild and not filter_stages:
            logger.debug("Building %s using sysbuild" % (self.source_dir))
            source_args = [
                f'-S{canonical_zephyr_base}/share/sysbuild',
                f'-DAPP_DIR={self.source_dir}'
            ]
        else:
            source_args = [
                f'-S{self.source_dir}'
            ]
        cmake_args.extend(source_args)

        cmake_args.extend(args or [])

        cmake_opts = ['-DBOARD={}'.format(self.platform.name)]
        cmake_args.extend(cmake_opts)

        if self.instance.testsuite.required_snippets:
            cmake_opts = ['-DSNIPPET={}'.format(';'.join(self.instance.testsuite.required_snippets))]
            cmake_args.extend(cmake_opts)

        cmake = shutil.which('cmake')
        cmd = [cmake] + cmake_args

        if filter_stages:
            cmd += cmake_filter_args

        kwargs = {}

        log_command(logger, "Calling cmake", cmd)

        if self.capture_output:
            kwargs['stdout'] = subprocess.PIPE
            # CMake sends the output of message() to stderr unless it's STATUS
            kwargs['stderr'] = subprocess.STDOUT

        if self.cwd:
            kwargs['cwd'] = self.cwd

        start_time = time.time()
        if sys.platform == 'linux':
            p = self.jobserver.popen(cmd, **kwargs)
        else:
            p = subprocess.Popen(cmd, **kwargs)
        out, _ = p.communicate()

        duration = time.time() - start_time
        self.instance.build_time += duration

        if p.returncode == 0:
            filter_results = self.parse_generated(filter_stages)
            msg = f"Finished running cmake {self.source_dir} for {self.platform.name} in {duration:.2f} seconds"
            logger.debug(msg)
            ret = {
                    'returncode': p.returncode,
                    'filter': filter_results
                    }
        else:
            self.instance.status = "error"
            self.instance.reason = "Cmake build failure"

            for tc in self.instance.testcases:
                tc.status = self.instance.status

            logger.error("Cmake build failure: %s for %s" % (self.source_dir, self.platform.name))
            ret = {"returncode": p.returncode}

        if out:
            os.makedirs(self.build_dir, exist_ok=True)
            with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
                log_msg = out.decode(self.default_encoding)
                log.write(log_msg)

        return ret
434
435
class FilterBuilder(CMake):
    """CMake wrapper whose configure stage is used only to collect the
    kconfig/devicetree/cmake-cache data needed for runtime test filtering."""

    def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver):
        super().__init__(testsuite, platform, source_dir, build_dir, jobserver)

        # Separate log so filter-stage cmake output does not mix with build.log.
        self.log = "config-twister.log"

    def parse_generated(self, filter_stages=[]):
        """Collect filter inputs produced by the cmake configure stage.

        Returns either a {<platform>/<suite>: bool} verdict (True means
        "filter this instance out") when the testsuite declares a filter
        expression, or the raw filter_data dict otherwise.
        """

        # Unit tests build with the host toolchain; nothing to parse.
        if self.platform.name == "unit_testing":
            return {}

        if self.instance.sysbuild and not filter_stages:
            # Load domain yaml to get default domain build directory
            domain_path = os.path.join(self.build_dir, "domains.yaml")
            domains = Domains.from_file(domain_path)
            logger.debug("Loaded sysbuild domain data from %s" % (domain_path))
            self.instance.domains = domains
            domain_build = domains.get_default_domain().build_dir
            cmake_cache_path = os.path.join(domain_build, "CMakeCache.txt")
            defconfig_path = os.path.join(domain_build, "zephyr", ".config")
            edt_pickle = os.path.join(domain_build, "zephyr", "edt.pickle")
        else:
            cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt")
            # .config is only available after kconfig stage in cmake. If only dt based filtration is required
            # package helper call won't produce .config
            if not filter_stages or "kconfig" in filter_stages:
                defconfig_path = os.path.join(self.build_dir, "zephyr", ".config")
            # dt is compiled before kconfig, so edt_pickle is available regardless of choice of filter stages
            edt_pickle = os.path.join(self.build_dir, "zephyr", "edt.pickle")


        # NOTE: defconfig_path is only bound when this condition holds — the
        # same condition that guarded its assignment above.
        if not filter_stages or "kconfig" in filter_stages:
            with open(defconfig_path, "r") as fp:
                defconfig = {}
                for line in fp.readlines():
                    m = self.config_re.match(line)
                    if not m:
                        if line.strip() and not line.startswith("#"):
                            sys.stderr.write("Unrecognized line %s\n" % line)
                        continue
                    defconfig[m.group(1)] = m.group(2).strip()

            self.defconfig = defconfig

        cmake_conf = {}
        try:
            cache = CMakeCache.from_file(cmake_cache_path)
        except FileNotFoundError:
            # The cache may legitimately be absent when only a subset of
            # cmake stages was run.
            cache = {}

        for k in iter(cache):
            cmake_conf[k.name] = k.value

        self.cmake_cache = cmake_conf

        # Filter expressions may reference arch/platform, environment
        # variables, kconfig symbols and cmake cache entries.
        filter_data = {
            "ARCH": self.platform.arch,
            "PLATFORM": self.platform.name
        }
        filter_data.update(os.environ)
        if not filter_stages or "kconfig" in filter_stages:
            filter_data.update(self.defconfig)
        filter_data.update(self.cmake_cache)

        if self.instance.sysbuild and self.env.options.device_testing:
            # Verify that twister's arguments support sysbuild.
            # Twister sysbuild flashing currently only works with west, so
            # --west-flash must be passed.
            if self.env.options.west_flash is None:
                logger.warning("Sysbuild test will be skipped. " +
                    "West must be used for flashing.")
                return {os.path.join(self.platform.name, self.testsuite.name): True}

        if self.testsuite and self.testsuite.filter:
            try:
                # The pickled EDT (devicetree model) enables dt_* filter
                # functions; it may be absent for some filter stages.
                if os.path.exists(edt_pickle):
                    with open(edt_pickle, 'rb') as f:
                        edt = pickle.load(f)
                else:
                    edt = None
                ret = expr_parser.parse(self.testsuite.filter, filter_data, edt)

            except (ValueError, SyntaxError) as se:
                sys.stderr.write(
                    "Failed processing %s\n" % self.testsuite.yamlfile)
                raise se

            # True in the returned mapping means "filtered out".
            if not ret:
                return {os.path.join(self.platform.name, self.testsuite.name): True}
            else:
                return {os.path.join(self.platform.name, self.testsuite.name): False}
        else:
            # No filter expression: expose the collected data on the platform.
            self.platform.filter_data = filter_data
            return filter_data
531
532
533class ProjectBuilder(FilterBuilder):
534
535    def __init__(self, instance: TestInstance, env: TwisterEnv, jobserver, **kwargs):
536        super().__init__(instance.testsuite, instance.platform, instance.testsuite.source_dir, instance.build_dir, jobserver)
537
538        self.log = "build.log"
539        self.instance = instance
540        self.filtered_tests = 0
541        self.options = env.options
542        self.env = env
543        self.duts = None
544
545    def log_info(self, filename, inline_logs, log_testcases=False):
546        filename = os.path.abspath(os.path.realpath(filename))
547        if inline_logs:
548            logger.info("{:-^100}".format(filename))
549
550            try:
551                with open(filename) as fp:
552                    data = fp.read()
553            except Exception as e:
554                data = "Unable to read log data (%s)\n" % (str(e))
555
556            # Remove any coverage data from the dumped logs
557            data = re.sub(
558                r"GCOV_COVERAGE_DUMP_START.*GCOV_COVERAGE_DUMP_END",
559                "GCOV_COVERAGE_DUMP_START\n...\nGCOV_COVERAGE_DUMP_END",
560                data,
561                flags=re.DOTALL,
562            )
563            logger.error(data)
564
565            logger.info("{:-^100}".format(filename))
566
567            if log_testcases:
568                for tc in self.instance.testcases:
569                    if not tc.reason:
570                        continue
571                    logger.info(
572                        f"\n{str(tc.name).center(100, '_')}\n"
573                        f"{tc.reason}\n"
574                        f"{100*'_'}\n"
575                        f"{tc.output}"
576                    )
577        else:
578            logger.error("see: " + Fore.YELLOW + filename + Fore.RESET)
579
580    def log_info_file(self, inline_logs):
581        build_dir = self.instance.build_dir
582        h_log = "{}/handler.log".format(build_dir)
583        he_log = "{}/handler_stderr.log".format(build_dir)
584        b_log = "{}/build.log".format(build_dir)
585        v_log = "{}/valgrind.log".format(build_dir)
586        d_log = "{}/device.log".format(build_dir)
587        pytest_log = "{}/twister_harness.log".format(build_dir)
588
589        if os.path.exists(v_log) and "Valgrind" in self.instance.reason:
590            self.log_info("{}".format(v_log), inline_logs)
591        elif os.path.exists(pytest_log) and os.path.getsize(pytest_log) > 0:
592            self.log_info("{}".format(pytest_log), inline_logs, log_testcases=True)
593        elif os.path.exists(h_log) and os.path.getsize(h_log) > 0:
594            self.log_info("{}".format(h_log), inline_logs)
595        elif os.path.exists(he_log) and os.path.getsize(he_log) > 0:
596            self.log_info("{}".format(he_log), inline_logs)
597        elif os.path.exists(d_log) and os.path.getsize(d_log) > 0:
598            self.log_info("{}".format(d_log), inline_logs)
599        else:
600            self.log_info("{}".format(b_log), inline_logs)
601
602
    def process(self, pipeline, done, message, lock, results):
        """Execute one pipeline operation for this test instance.

        The pipeline is a queue-driven state machine: each message carries
        an 'op' naming the stage (filter -> cmake -> build -> gather_metrics
        -> run -> report -> cleanup) and each stage pushes its follow-up
        message back onto `pipeline`. `done` collects finished instances,
        `results` is the shared ExecutionCounter.
        """
        op = message.get('op')

        self.instance.setup_handler(self.env)

        # Restricted cmake run used purely to evaluate dt/kconfig filters.
        if op == "filter":
            ret = self.cmake(filter_stages=self.instance.filter_stages)
            if self.instance.status in ["failed", "error"]:
                pipeline.put({"op": "report", "test": self.instance})
            else:
                # Here we check the dt/kconfig filter results coming from running cmake
                if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]:
                    logger.debug("filtering %s" % self.instance.name)
                    self.instance.status = "filtered"
                    self.instance.reason = "runtime filter"
                    results.skipped_runtime += 1
                    self.instance.add_missing_case_status("skipped")
                    pipeline.put({"op": "report", "test": self.instance})
                else:
                    pipeline.put({"op": "cmake", "test": self.instance})

        # The build process, call cmake and build with configured generator
        elif op == "cmake":
            ret = self.cmake()
            if self.instance.status in ["failed", "error"]:
                pipeline.put({"op": "report", "test": self.instance})
            elif self.options.cmake_only:
                # --cmake-only: configuring successfully is a pass.
                if self.instance.status is None:
                    self.instance.status = "passed"
                pipeline.put({"op": "report", "test": self.instance})
            else:
                # Here we check the runtime filter results coming from running cmake
                if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]:
                    logger.debug("filtering %s" % self.instance.name)
                    self.instance.status = "filtered"
                    self.instance.reason = "runtime filter"
                    results.skipped_runtime += 1
                    self.instance.add_missing_case_status("skipped")
                    pipeline.put({"op": "report", "test": self.instance})
                else:
                    pipeline.put({"op": "build", "test": self.instance})

        elif op == "build":
            logger.debug("build test: %s" % self.instance.name)
            ret = self.build()
            if not ret:
                self.instance.status = "error"
                self.instance.reason = "Build Failure"
                pipeline.put({"op": "report", "test": self.instance})
            else:
                # Count skipped cases during build, for example
                # due to ram/rom overflow.
                if  self.instance.status == "skipped":
                    results.skipped_runtime += 1
                    self.instance.add_missing_case_status("skipped", self.instance.reason)

                if ret.get('returncode', 1) > 0:
                    self.instance.add_missing_case_status("blocked", self.instance.reason)
                    pipeline.put({"op": "report", "test": self.instance})
                else:
                    if self.instance.testsuite.harness in ['ztest', 'test']:
                        logger.debug(f"Determine test cases for test instance: {self.instance.name}")
                        try:
                            self.determine_testcases(results)
                            pipeline.put({"op": "gather_metrics", "test": self.instance})
                        except BuildError as e:
                            logger.error(str(e))
                            self.instance.status = "error"
                            self.instance.reason = str(e)
                            pipeline.put({"op": "report", "test": self.instance})
                    else:
                        pipeline.put({"op": "gather_metrics", "test": self.instance})

        elif op == "gather_metrics":
            ret = self.gather_metrics(self.instance)
            if not ret or ret.get('returncode', 1) > 0:
                self.instance.status = "error"
                self.instance.reason = "Build Failure at gather_metrics."
                pipeline.put({"op": "report", "test": self.instance})
            elif self.instance.run and self.instance.handler.ready:
                pipeline.put({"op": "run", "test": self.instance})
            else:
                pipeline.put({"op": "report", "test": self.instance})

        # Run the generated binary using one of the supported handlers
        elif op == "run":
            logger.debug("run test: %s" % self.instance.name)
            self.run()
            logger.debug(f"run status: {self.instance.name} {self.instance.status}")
            try:
                # to make it work with pickle
                self.instance.handler.thread = None
                self.instance.handler.duts = None
                pipeline.put({
                    "op": "report",
                    "test": self.instance,
                    "status": self.instance.status,
                    "reason": self.instance.reason
                    }
                )
            except RuntimeError as e:
                logger.error(f"RuntimeError: {e}")
                traceback.print_exc()

        # Report results and output progress to screen
        elif op == "report":
            with lock:
                done.put(self.instance)
                self.report_out(results)

            # Queue artifact cleanup according to the configured policy
            # (never when coverage data must be preserved).
            if not self.options.coverage:
                if self.options.prep_artifacts_for_testing:
                    pipeline.put({"op": "cleanup", "mode": "device", "test": self.instance})
                elif self.options.runtime_artifact_cleanup == "pass" and self.instance.status == "passed":
                    pipeline.put({"op": "cleanup", "mode": "passed", "test": self.instance})
                elif self.options.runtime_artifact_cleanup == "all":
                    pipeline.put({"op": "cleanup", "mode": "all", "test": self.instance})

        elif op == "cleanup":
            mode = message.get("mode")
            if mode == "device":
                self.cleanup_device_testing_artifacts()
            elif mode == "passed" or (mode == "all" and self.instance.reason != "Cmake build failure"):
                self.cleanup_artifacts()
727
728    def determine_testcases(self, results):
729        yaml_testsuite_name = self.instance.testsuite.id
730        logger.debug(f"Determine test cases for test suite: {yaml_testsuite_name}")
731
732        elf_file = self.instance.get_elf_file()
733        elf = ELFFile(open(elf_file, "rb"))
734
735        logger.debug(f"Test instance {self.instance.name} already has {len(self.instance.testcases)} cases.")
736        new_ztest_unit_test_regex = re.compile(r"z_ztest_unit_test__([^\s]+?)__([^\s]*)")
737        detected_cases = []
738        for section in elf.iter_sections():
739            if isinstance(section, SymbolTableSection):
740                for sym in section.iter_symbols():
741                    # It is only meant for new ztest fx because only new ztest fx exposes test functions
742                    # precisely.
743
744                    # The 1st capture group is new ztest suite name.
745                    # The 2nd capture group is new ztest unit test name.
746                    matches = new_ztest_unit_test_regex.findall(sym.name)
747                    if matches:
748                        for m in matches:
749                            # new_ztest_suite = m[0] # not used for now
750                            test_func_name = m[1].replace("test_", "", 1)
751                            testcase_id = f"{yaml_testsuite_name}.{test_func_name}"
752                            detected_cases.append(testcase_id)
753
754        if detected_cases:
755            logger.debug(f"{', '.join(detected_cases)} in {elf_file}")
756            self.instance.testcases.clear()
757            self.instance.testsuite.testcases.clear()
758
759            # When the old regex-based test case collection is fully deprecated,
760            # this will be the sole place where test cases get added to the test instance.
761            # Then we can further include the new_ztest_suite info in the testcase_id.
762
763            for testcase_id in detected_cases:
764                self.instance.add_testcase(name=testcase_id)
765                self.instance.testsuite.add_testcase(name=testcase_id)
766
767
768    def cleanup_artifacts(self, additional_keep: List[str] = []):
769        logger.debug("Cleaning up {}".format(self.instance.build_dir))
770        allow = [
771            os.path.join('zephyr', '.config'),
772            'handler.log',
773            'handler_stderr.log',
774            'build.log',
775            'device.log',
776            'recording.csv',
777            'rom.json',
778            'ram.json',
779            # below ones are needed to make --test-only work as well
780            'Makefile',
781            'CMakeCache.txt',
782            'build.ninja',
783            os.path.join('CMakeFiles', 'rules.ninja')
784            ]
785
786        allow += additional_keep
787
788        if self.options.runtime_artifact_cleanup == 'all':
789            allow += [os.path.join('twister', 'testsuite_extra.conf')]
790
791        allow = [os.path.join(self.instance.build_dir, file) for file in allow]
792
793        for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False):
794            for name in filenames:
795                path = os.path.join(dirpath, name)
796                if path not in allow:
797                    os.remove(path)
798            # Remove empty directories and symbolic links to directories
799            for dir in dirnames:
800                path = os.path.join(dirpath, dir)
801                if os.path.islink(path):
802                    os.remove(path)
803                elif not os.listdir(path):
804                    os.rmdir(path)
805
806    def cleanup_device_testing_artifacts(self):
807        logger.debug("Cleaning up for Device Testing {}".format(self.instance.build_dir))
808
809        files_to_keep = self._get_binaries()
810        files_to_keep.append(os.path.join('zephyr', 'runners.yaml'))
811
812        if self.instance.sysbuild:
813            files_to_keep.append('domains.yaml')
814            for domain in self.instance.domains.get_domains():
815                files_to_keep += self._get_artifact_allow_list_for_domain(domain.name)
816
817        self.cleanup_artifacts(files_to_keep)
818
819        self._sanitize_files()
820
821    def _get_artifact_allow_list_for_domain(self, domain: str) -> List[str]:
822        """
823        Return a list of files needed to test a given domain.
824        """
825        allow = [
826            os.path.join(domain, 'build.ninja'),
827            os.path.join(domain, 'CMakeCache.txt'),
828            os.path.join(domain, 'CMakeFiles', 'rules.ninja'),
829            os.path.join(domain, 'Makefile'),
830            os.path.join(domain, 'zephyr', '.config'),
831            os.path.join(domain, 'zephyr', 'runners.yaml')
832            ]
833        return allow
834
835    def _get_binaries(self) -> List[str]:
836        """
837        Get list of binaries paths (absolute or relative to the
838        self.instance.build_dir), basing on information from platform.binaries
839        or runners.yaml. If they are not found take default binaries like
840        "zephyr/zephyr.hex" etc.
841        """
842        binaries: List[str] = []
843
844        platform = self.instance.platform
845        if platform.binaries:
846            for binary in platform.binaries:
847                binaries.append(os.path.join('zephyr', binary))
848
849        # Get binaries for a single-domain build
850        binaries += self._get_binaries_from_runners()
851        # Get binaries in the case of a multiple-domain build
852        if self.instance.sysbuild:
853            for domain in self.instance.domains.get_domains():
854                binaries += self._get_binaries_from_runners(domain.name)
855
856        # if binaries was not found in platform.binaries and runners.yaml take default ones
857        if len(binaries) == 0:
858            binaries = [
859                os.path.join('zephyr', 'zephyr.hex'),
860                os.path.join('zephyr', 'zephyr.bin'),
861                os.path.join('zephyr', 'zephyr.elf'),
862                os.path.join('zephyr', 'zephyr.exe'),
863            ]
864        return binaries
865
866    def _get_binaries_from_runners(self, domain='') -> List[str]:
867        """
868        Get list of binaries paths (absolute or relative to the
869        self.instance.build_dir) from runners.yaml file. May be used for
870        multiple-domain builds by passing in one domain at a time.
871        """
872
873        runners_file_path: str = os.path.join(self.instance.build_dir,
874                                              domain, 'zephyr', 'runners.yaml')
875        if not os.path.exists(runners_file_path):
876            return []
877
878        with open(runners_file_path, 'r') as file:
879            runners_content: dict = yaml.load(file, Loader=SafeLoader)
880
881        if 'config' not in runners_content:
882            return []
883
884        runners_config: dict = runners_content['config']
885        binary_keys: List[str] = ['elf_file', 'hex_file', 'bin_file']
886
887        binaries: List[str] = []
888        for binary_key in binary_keys:
889            binary_path = runners_config.get(binary_key)
890            if binary_path is None:
891                continue
892            if os.path.isabs(binary_path):
893                binaries.append(binary_path)
894            else:
895                binaries.append(os.path.join(domain, 'zephyr', binary_path))
896
897        return binaries
898
899    def _sanitize_files(self):
900        """
901        Sanitize files to make it possible to flash those file on different
902        computer/system.
903        """
904        self._sanitize_runners_file()
905        self._sanitize_zephyr_base_from_files()
906
907    def _sanitize_runners_file(self):
908        """
909        Replace absolute paths of binary files for relative ones. The base
910        directory for those files is f"{self.instance.build_dir}/zephyr"
911        """
912        runners_dir_path: str = os.path.join(self.instance.build_dir, 'zephyr')
913        runners_file_path: str = os.path.join(runners_dir_path, 'runners.yaml')
914        if not os.path.exists(runners_file_path):
915            return
916
917        with open(runners_file_path, 'rt') as file:
918            runners_content_text = file.read()
919            runners_content_yaml: dict = yaml.load(runners_content_text, Loader=SafeLoader)
920
921        if 'config' not in runners_content_yaml:
922            return
923
924        runners_config: dict = runners_content_yaml['config']
925        binary_keys: List[str] = ['elf_file', 'hex_file', 'bin_file']
926
927        for binary_key in binary_keys:
928            binary_path = runners_config.get(binary_key)
929            # sanitize only paths which exist and are absolute
930            if binary_path is None or not os.path.isabs(binary_path):
931                continue
932            binary_path_relative = os.path.relpath(binary_path, start=runners_dir_path)
933            runners_content_text = runners_content_text.replace(binary_path, binary_path_relative)
934
935        with open(runners_file_path, 'wt') as file:
936            file.write(runners_content_text)
937
938    def _sanitize_zephyr_base_from_files(self):
939        """
940        Remove Zephyr base paths from selected files.
941        """
942        files_to_sanitize = [
943            'CMakeCache.txt',
944            os.path.join('zephyr', 'runners.yaml'),
945        ]
946        for file_path in files_to_sanitize:
947            file_path = os.path.join(self.instance.build_dir, file_path)
948            if not os.path.exists(file_path):
949                continue
950
951            with open(file_path, "rt") as file:
952                data = file.read()
953
954            # add trailing slash at the end of canonical_zephyr_base if it does not exist:
955            path_to_remove = os.path.join(canonical_zephyr_base, "")
956            data = data.replace(path_to_remove, "")
957
958            with open(file_path, "wt") as file:
959                file.write(data)
960
    def report_out(self, results):
        """
        Log the outcome of this test instance and update shared counters.

        Increments results.done/cases/failed/error/passed/skipped_* as
        appropriate for self.instance.status. In verbose mode one full line
        per instance is logged; otherwise a single carriage-return progress
        line is rewritten in place on stdout.

        @param results shared ExecutionCounter-style object with total,
               done, iteration, cases, failed, error, passed and skipped_*
               counters.
        """
        total_to_do = results.total
        total_tests_width = len(str(total_to_do))
        results.done += 1
        instance = self.instance
        # Count test cases only on the first iteration so retries do not
        # inflate the case totals.
        if results.iteration == 1:
            results.cases += len(instance.testcases)

        if instance.status in ["error", "failed"]:
            if instance.status == "error":
                results.error += 1
                txt = " ERROR "
            else:
                results.failed += 1
                txt = " FAILED "
            if self.options.verbose:
                status = Fore.RED + txt + Fore.RESET + instance.reason
            else:
                logger.error(
                    "{:<25} {:<50} {}{}{}: {}".format(
                        instance.platform.name,
                        instance.testsuite.name,
                        Fore.RED,
                        txt,
                        Fore.RESET,
                        instance.reason))
            if not self.options.verbose:
                self.log_info_file(self.options.inline_logs)
        elif instance.status in ["skipped", "filtered"]:
            status = Fore.YELLOW + "SKIPPED" + Fore.RESET
            results.skipped_configs += 1
            # test cases skipped at the test instance level
            results.skipped_cases += len(instance.testsuite.testcases)
        elif instance.status == "passed":
            status = Fore.GREEN + "PASSED" + Fore.RESET
            results.passed += 1
            for case in instance.testcases:
                # test cases skipped at the test case level
                if case.status == 'skipped':
                    results.skipped_cases += 1
        else:
            logger.debug(f"Unknown status = {instance.status}")
            status = Fore.YELLOW + "UNKNOWN" + Fore.RESET

        if self.options.verbose:
            if self.options.cmake_only:
                more_info = "cmake"
            elif instance.status in ["skipped", "filtered"]:
                more_info = instance.reason
            else:
                if instance.handler.ready and instance.run:
                    # Ran on a handler: show handler type, DUT and duration.
                    more_info = instance.handler.type_str
                    htime = instance.execution_time
                    if instance.dut:
                        more_info += f": {instance.dut},"
                    if htime:
                        more_info += " {:.3f}s".format(htime)
                else:
                    more_info = "build"

                # NOTE(review): the guard checks self.instance.handler.seed
                # but the printed value is self.options.seed — confirm the
                # two are always identical when the guard holds.
                if ( instance.status in ["error", "failed", "timeout", "flash_error"]
                     and hasattr(self.instance.handler, 'seed')
                     and self.instance.handler.seed is not None ):
                    more_info += "/seed: " + str(self.options.seed)
            logger.info("{:>{}}/{} {:<25} {:<50} {} ({})".format(
                results.done, total_tests_width, total_to_do , instance.platform.name,
                instance.testsuite.name, status, more_info))

            if instance.status in ["error", "failed", "timeout"]:
                self.log_info_file(self.options.inline_logs)
        else:
            # Non-verbose: one-line progress indicator, rewritten via '\r'.
            completed_perc = 0
            if total_to_do > 0:
                completed_perc = int((float(results.done) / total_to_do) * 100)

            sys.stdout.write("INFO    - Total complete: %s%4d/%4d%s  %2d%%  skipped: %s%4d%s, failed: %s%4d%s, error: %s%4d%s\r" % (
                Fore.GREEN,
                results.done,
                total_to_do,
                Fore.RESET,
                completed_perc,
                Fore.YELLOW if results.skipped_configs > 0 else Fore.RESET,
                results.skipped_configs,
                Fore.RESET,
                Fore.RED if results.failed > 0 else Fore.RESET,
                results.failed,
                Fore.RESET,
                Fore.RED if results.error > 0 else Fore.RESET,
                results.error,
                Fore.RESET
                )
                )
        sys.stdout.flush()
1054
1055    @staticmethod
1056    def cmake_assemble_args(extra_args, handler, extra_conf_files, extra_overlay_confs,
1057                            extra_dtc_overlay_files, cmake_extra_args,
1058                            build_dir):
1059        # Retain quotes around config options
1060        config_options = [arg for arg in extra_args if arg.startswith("CONFIG_")]
1061        args = [arg for arg in extra_args if not arg.startswith("CONFIG_")]
1062
1063        args_expanded = ["-D{}".format(a.replace('"', '\"')) for a in config_options]
1064
1065        if handler.ready:
1066            args.extend(handler.args)
1067
1068        if extra_conf_files:
1069            args.append(f"CONF_FILE=\"{';'.join(extra_conf_files)}\"")
1070
1071        if extra_dtc_overlay_files:
1072            args.append(f"DTC_OVERLAY_FILE=\"{';'.join(extra_dtc_overlay_files)}\"")
1073
1074        # merge overlay files into one variable
1075        overlays = extra_overlay_confs.copy()
1076
1077        additional_overlay_path = os.path.join(
1078            build_dir, "twister", "testsuite_extra.conf"
1079        )
1080        if os.path.exists(additional_overlay_path):
1081            overlays.append(additional_overlay_path)
1082
1083        if overlays:
1084            args.append("OVERLAY_CONFIG=\"%s\"" % (" ".join(overlays)))
1085
1086        # Build the final argument list
1087        args_expanded.extend(["-D{}".format(a.replace('"', '\"')) for a in cmake_extra_args])
1088        args_expanded.extend(["-D{}".format(a.replace('"', '')) for a in args])
1089
1090        return args_expanded
1091
1092    def cmake(self, filter_stages=[]):
1093        args = self.cmake_assemble_args(
1094            self.testsuite.extra_args.copy(), # extra_args from YAML
1095            self.instance.handler,
1096            self.testsuite.extra_conf_files,
1097            self.testsuite.extra_overlay_confs,
1098            self.testsuite.extra_dtc_overlay_files,
1099            self.options.extra_args, # CMake extra args
1100            self.instance.build_dir,
1101        )
1102        return self.run_cmake(args,filter_stages)
1103
1104    def build(self):
1105        harness = HarnessImporter.get_harness(self.instance.testsuite.harness.capitalize())
1106        build_result = self.run_build(['--build', self.build_dir])
1107        try:
1108            if harness:
1109                harness.instance = self.instance
1110                harness.build()
1111        except ConfigurationError as error:
1112            self.instance.status = "error"
1113            self.instance.reason = str(error)
1114            logger.error(self.instance.reason)
1115            return
1116        return build_result
1117
1118    def run(self):
1119
1120        instance = self.instance
1121
1122        if instance.handler.ready:
1123            logger.debug(f"Reset instance status from '{instance.status}' to None before run.")
1124            instance.status = None
1125
1126            if instance.handler.type_str == "device":
1127                instance.handler.duts = self.duts
1128
1129            if(self.options.seed is not None and instance.platform.name.startswith("native_")):
1130                self.parse_generated()
1131                if('CONFIG_FAKE_ENTROPY_NATIVE_POSIX' in self.defconfig and
1132                    self.defconfig['CONFIG_FAKE_ENTROPY_NATIVE_POSIX'] == 'y'):
1133                    instance.handler.seed = self.options.seed
1134
1135            if self.options.extra_test_args and instance.platform.arch == "posix":
1136                instance.handler.extra_test_args = self.options.extra_test_args
1137
1138            harness = HarnessImporter.get_harness(instance.testsuite.harness.capitalize())
1139            try:
1140                harness.configure(instance)
1141            except ConfigurationError as error:
1142                instance.status = "error"
1143                instance.reason = str(error)
1144                logger.error(instance.reason)
1145                return
1146            #
1147            if isinstance(harness, Pytest):
1148                harness.pytest_run(instance.handler.get_test_timeout())
1149            else:
1150                instance.handler.handle(harness)
1151
1152        sys.stdout.flush()
1153
1154    def gather_metrics(self, instance: TestInstance):
1155        build_result = {"returncode": 0}
1156        if self.options.create_rom_ram_report:
1157            build_result = self.run_build(['--build', self.build_dir, "--target", "footprint"])
1158        if self.options.enable_size_report and not self.options.cmake_only:
1159            self.calc_size(instance=instance, from_buildlog=self.options.footprint_from_buildlog)
1160        else:
1161            instance.metrics["used_ram"] = 0
1162            instance.metrics["used_rom"] = 0
1163            instance.metrics["available_rom"] = 0
1164            instance.metrics["available_ram"] = 0
1165            instance.metrics["unrecognized"] = []
1166        return build_result
1167
1168    @staticmethod
1169    def calc_size(instance: TestInstance, from_buildlog: bool):
1170        if instance.status not in ["error", "failed", "skipped"]:
1171            if not instance.platform.type in ["native", "qemu", "unit"]:
1172                generate_warning = bool(instance.platform.type == "mcu")
1173                size_calc = instance.calculate_sizes(from_buildlog=from_buildlog, generate_warning=generate_warning)
1174                instance.metrics["used_ram"] = size_calc.get_used_ram()
1175                instance.metrics["used_rom"] = size_calc.get_used_rom()
1176                instance.metrics["available_rom"] = size_calc.get_available_rom()
1177                instance.metrics["available_ram"] = size_calc.get_available_ram()
1178                instance.metrics["unrecognized"] = size_calc.unrecognized_sections()
1179            else:
1180                instance.metrics["used_ram"] = 0
1181                instance.metrics["used_rom"] = 0
1182                instance.metrics["available_rom"] = 0
1183                instance.metrics["available_ram"] = 0
1184                instance.metrics["unrecognized"] = []
1185            instance.metrics["handler_time"] = instance.execution_time
1186
class TwisterRunner:
    """
    Orchestrates building and running of all selected test instances.

    Fills a shared LIFO pipeline with per-instance tasks, spawns worker
    processes that consume them via ProjectBuilder, and retries failed
    (and optionally errored) instances as configured.
    """

    def __init__(self, instances, suites, env=None) -> None:
        # NOTE(review): env defaults to None but env.options is read
        # immediately — env is effectively a required argument; confirm.
        # Shared task queue; created in run().
        self.pipeline = None
        self.options = env.options
        self.env = env
        # Mapping of instance name -> TestInstance.
        self.instances = instances
        self.suites = suites
        # Device-under-test list, injected by the caller before run().
        self.duts = None
        # Number of parallel worker processes; refined in run().
        self.jobs = 1
        # ExecutionCounter shared with the workers; created in run().
        self.results = None
        self.jobserver = None

    def run(self):
        """
        Main entry point: set up the shared queues and job server, then
        loop over build/run iterations, retrying failures until the retry
        budget is exhausted or nothing is left to retry.
        """

        retries = self.options.retry_failed + 1

        # Manager-backed queues are shareable across worker processes.
        BaseManager.register('LifoQueue', queue.LifoQueue)
        manager = BaseManager()
        manager.start()

        self.results = ExecutionCounter(total=len(self.instances))
        self.iteration = 0
        pipeline = manager.LifoQueue()
        done_queue = manager.LifoQueue()

        # Set number of jobs
        if self.options.jobs:
            self.jobs = self.options.jobs
        elif self.options.build_only:
            # Builds are I/O heavy; oversubscribe the CPUs.
            self.jobs = multiprocessing.cpu_count() * 2
        else:
            self.jobs = multiprocessing.cpu_count()

        if sys.platform == "linux":
            if os.name == 'posix':
                # Prefer joining an external GNU make job server; fall back
                # to owning one ourselves.
                self.jobserver = GNUMakeJobClient.from_environ(jobs=self.options.jobs)
                if not self.jobserver:
                    self.jobserver = GNUMakeJobServer(self.jobs)
                elif self.jobserver.jobs:
                    self.jobs = self.jobserver.jobs
            # TODO: Implement this on windows/mac also
            else:
                self.jobserver = JobClient()

            logger.info("JOBS: %d", self.jobs)

        self.update_counting_before_pipeline()

        while True:
            self.results.iteration += 1

            if self.results.iteration > 1:
                # Retry pass: reset the failure counters so this iteration
                # counts afresh, keeping already-done work in 'done'.
                logger.info("%d Iteration:" % (self.results.iteration))
                time.sleep(self.options.retry_interval)  # waiting for the system to settle down
                self.results.done = self.results.total - self.results.failed - self.results.error
                self.results.failed = 0
                if self.options.retry_build_errors:
                    self.results.error = 0
            else:
                self.results.done = self.results.skipped_filter

            self.execute(pipeline, done_queue)

            # Drain the done queue, folding worker-side results back into
            # our instance map.
            while True:
                try:
                    inst = done_queue.get_nowait()
                except queue.Empty:
                    break
                else:
                    inst.metrics.update(self.instances[inst.name].metrics)
                    inst.metrics["handler_time"] = inst.execution_time
                    inst.metrics["unrecognized"] = []
                    self.instances[inst.name] = inst

            print("")

            retry_errors = False
            if self.results.error and self.options.retry_build_errors:
                retry_errors = True

            retries = retries - 1
            # Stop when the retry budget is spent or nothing failed.
            if retries == 0 or ( self.results.failed == 0 and not retry_errors):
                break

        self.show_brief()

    def update_counting_before_pipeline(self):
        '''
        Updating counting before the pipeline is necessary because statically
        filtered test instances never enter the pipeline. While some pipeline
        output needs the static filter stats. So need to prepare them before
        the pipeline starts.
        '''
        for instance in self.instances.values():
            if instance.status == 'filtered' and not instance.reason == 'runtime filter':
                self.results.skipped_filter += 1
                self.results.skipped_configs += 1
                self.results.skipped_cases += len(instance.testsuite.testcases)
                self.results.cases += len(instance.testsuite.testcases)
            elif instance.status == 'error':
                self.results.error += 1

    def show_brief(self):
        """Log a one-line summary of selection and skip statistics."""
        logger.info("%d test scenarios (%d test instances) selected, "
                    "%d configurations skipped (%d by static filter, %d at runtime)." %
                    (len(self.suites), len(self.instances),
                    self.results.skipped_configs,
                    self.results.skipped_filter,
                    self.results.skipped_configs - self.results.skipped_filter))

    def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False, retry_build_errors=False):
        """
        Enqueue one task per instance that still needs work, choosing the
        entry operation (run / filter / build / cmake) per instance.
        """
        for instance in self.instances.values():
            if build_only:
                instance.run = False

            no_retry_statuses = ['passed', 'skipped', 'filtered']
            if not retry_build_errors:
                no_retry_statuses.append("error")

            if instance.status not in no_retry_statuses:
                logger.debug(f"adding {instance.name}")
                # A non-None status here means this is a retry.
                if instance.status:
                    instance.retries += 1
                instance.status = None

                # Check if cmake package_helper script can be run in advance.
                instance.filter_stages = []
                if instance.testsuite.filter:
                    instance.filter_stages = self.get_cmake_filter_stages(instance.testsuite.filter, expr_parser.reserved.keys())

                if test_only and instance.run:
                    pipeline.put({"op": "run", "test": instance})
                elif instance.filter_stages and "full" not in instance.filter_stages:
                    pipeline.put({"op": "filter", "test": instance})
                else:
                    cache_file = os.path.join(instance.build_dir, "CMakeCache.txt")
                    if os.path.exists(cache_file) and self.env.options.aggressive_no_clean:
                        # Reuse the existing build tree; skip the cmake step.
                        pipeline.put({"op": "build", "test": instance})
                    else:
                        pipeline.put({"op": "cmake", "test": instance})


    def pipeline_mgr(self, pipeline, done_queue, lock, results):
        """
        Worker-process body: drain the pipeline, processing each task with
        a ProjectBuilder. On Linux each worker first acquires a job-server
        slot. Exits the process with status 1 on any unhandled exception.
        """
        try:
            if sys.platform == 'linux':
                with self.jobserver.get_job():
                    while True:
                        try:
                            task = pipeline.get_nowait()
                        except queue.Empty:
                            break
                        else:
                            instance = task['test']
                            pb = ProjectBuilder(instance, self.env, self.jobserver)
                            pb.duts = self.duts
                            pb.process(pipeline, done_queue, task, lock, results)

                    return True
            else:
                while True:
                    try:
                        task = pipeline.get_nowait()
                    except queue.Empty:
                        break
                    else:
                        instance = task['test']
                        pb = ProjectBuilder(instance, self.env, self.jobserver)
                        pb.duts = self.duts
                        pb.process(pipeline, done_queue, task, lock, results)
                return True
        except Exception as e:
            logger.error(f"General exception: {e}")
            sys.exit(1)

    def execute(self, pipeline, done):
        """
        Fill the pipeline and run self.jobs worker processes to completion.
        Aborts all workers (and the program) if any worker exits non-zero.
        """
        lock = Lock()
        logger.info("Adding tasks to the queue...")
        self.add_tasks_to_queue(pipeline, self.options.build_only, self.options.test_only,
                                retry_build_errors=self.options.retry_build_errors)
        logger.info("Added initial list of jobs to queue")

        processes = []

        for _ in range(self.jobs):
            p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, self.results, ))
            processes.append(p)
            p.start()
        logger.debug(f"Launched {self.jobs} jobs")

        try:
            for p in processes:
                p.join()
                if p.exitcode != 0:
                    logger.error(f"Process {p.pid} failed, aborting execution")
                    for proc in processes:
                        proc.terminate()
                    sys.exit(1)
        except KeyboardInterrupt:
            logger.info("Execution interrupted")
            for p in processes:
                p.terminate()

    @staticmethod
    def get_cmake_filter_stages(filt, logic_keys):
        """ Analyze filter expressions from test yaml and decide if dts and/or kconfig based filtering will be needed."""
        dts_required = False
        kconfig_required = False
        full_required = False
        filter_stages = []

        # Compress args in expressions like "function('x', 'y')" so they are not split when splitting by whitespaces
        filt = filt.replace(", ", ",")
        # Remove logic words
        for k in logic_keys:
            filt = filt.replace(f"{k} ", "")
        # Remove brackets
        filt = filt.replace("(", "")
        filt = filt.replace(")", "")
        # Split by whitespace
        filt = filt.split()
        for expression in filt:
            if expression.startswith("dt_"):
                dts_required = True
            elif expression.startswith("CONFIG"):
                kconfig_required = True
            else:
                full_required = True

        # Any unrecognized term forces full cmake-based filtering.
        if full_required:
            return ["full"]
        if dts_required:
            filter_stages.append("dts")
        if kconfig_required:
            filter_stages.append("kconfig")

        return filter_stages
1423