1# vim: set syntax=python ts=4 :
2#
# Copyright (c) 2018-2022 Intel Corporation
4# Copyright 2022 NXP
5# SPDX-License-Identifier: Apache-2.0
6
7import logging
8import multiprocessing
9import os
10import pickle
11import queue
12import re
13import shutil
14import subprocess
15import sys
16import time
17import traceback
18import yaml
19from multiprocessing import Lock, Process, Value
20from multiprocessing.managers import BaseManager
21from typing import List
22from packaging import version
23
24from colorama import Fore
25from domains import Domains
26from twisterlib.cmakecache import CMakeCache
27from twisterlib.environment import canonical_zephyr_base
28from twisterlib.error import BuildError
29
30import elftools
31from elftools.elf.elffile import ELFFile
32from elftools.elf.sections import SymbolTableSection
33
34if version.parse(elftools.__version__) < version.parse('0.24'):
35    sys.exit("pyelftools is out of date, need version 0.24 or later")
36
37# Job server only works on Linux for now.
38if sys.platform == 'linux':
39    from twisterlib.jobserver import GNUMakeJobClient, GNUMakeJobServer, JobClient
40
41from twisterlib.log_helper import log_command
42from twisterlib.testinstance import TestInstance
43from twisterlib.testplan import change_skip_to_error_if_integration
44from twisterlib.harness import HarnessImporter, Pytest
45
46logger = logging.getLogger('twister')
47logger.setLevel(logging.DEBUG)
48import expr_parser
49
50
class ExecutionCounter(object):
    """Shared integer counters describing the progress of a twister run.

    Every counter is stored in its own ``multiprocessing.Value('i', ...)``
    and exposed as a property. A single read or a single write holds that
    value's lock. Note that ``counter.done += 1`` is a locked read followed
    by a separate locked write, not one atomic step.
    """

    def __init__(self, total=0):
        '''
        Most of the stats are at test instance level.
        Except that "_cases" and "_skipped_cases" are for cases of ALL test instances.

        total complete = done + skipped_filter
        total = yaml test scenarios * applicable platforms
        complete percentage = (done + skipped_filter) / total
        pass rate = passed / (total - skipped_configs)
        '''
        # initialized to number of test instances
        self._total = Value('i', total)

        # iteration
        self._iteration = Value('i', 0)

        # instances that go through the pipeline
        # updated by report_out()
        self._done = Value('i', 0)

        # instances that actually executed and passed
        # updated by report_out()
        self._passed = Value('i', 0)

        # updated by report_out() in pipeline
        self._failed = Value('i', 0)
        self._error = Value('i', 0)

        # static filter + runtime filter + build skipped
        # updated by update_counting_before_pipeline() and report_out()
        self._skipped_configs = Value('i', 0)

        # static filtered at yaml parsing time
        # updated by update_counting_before_pipeline()
        self._skipped_filter = Value('i', 0)

        # cmake filter + build skipped
        # updated by report_out()
        self._skipped_runtime = Value('i', 0)

        # updated in report_out
        self._cases = Value('i', 0)

        # updated by update_counting_before_pipeline() and report_out()
        self._skipped_cases = Value('i', 0)

        self.lock = Lock()

    def summary(self):
        """Print all counters to stdout, framed by two separator lines."""
        rule = "--------------------------------"
        report = [
            rule,
            f"Total test suites: {self.total}",  # actually test instances
            f"Total test cases: {self.cases}",
            f"Executed test cases: {self.cases - self.skipped_cases}",
            f"Skipped test cases: {self.skipped_cases}",
            f"Completed test suites: {self.done}",
            f"Passing test suites: {self.passed}",
            f"Failing test suites: {self.failed}",
            f"Skipped test suites: {self.skipped_configs}",
            f"Skipped test suites (runtime): {self.skipped_runtime}",
            f"Skipped test suites (filter): {self.skipped_filter}",
            f"Errors: {self.error}",
            rule,
        ]
        print("\n".join(report))

    def _locked_value(attr, writable=True):
        # Class-body helper (removed below): build a property that reads, and
        # optionally writes, the multiprocessing.Value stored in `attr` while
        # holding that value's own lock.
        def read(self):
            cell = getattr(self, attr)
            with cell.get_lock():
                return cell.value

        if not writable:
            return property(read)

        def write(self, value):
            cell = getattr(self, attr)
            with cell.get_lock():
                cell.value = value

        return property(read, write)

    cases = _locked_value('_cases')
    skipped_cases = _locked_value('_skipped_cases')
    error = _locked_value('_error')
    iteration = _locked_value('_iteration')
    done = _locked_value('_done')
    passed = _locked_value('_passed')
    skipped_configs = _locked_value('_skipped_configs')
    skipped_filter = _locked_value('_skipped_filter')
    skipped_runtime = _locked_value('_skipped_runtime')
    failed = _locked_value('_failed')
    # total is fixed at construction time, so it gets no setter
    total = _locked_value('_total', writable=False)

    del _locked_value
218
class CMake:
    """Wrapper around the ``cmake`` executable for one test suite/platform.

    The class runs ``cmake`` as a subprocess (through the job server when
    ``sys.platform == 'linux'``), appends the captured output to
    ``<build_dir>/<log>`` and records the outcome on ``self.instance``.

    ``run_build()`` and ``run_cmake()`` read ``self.instance``,
    ``self.options`` and ``self.env``. ``__init__`` only initialises
    ``instance`` to ``None`` and does not set the other two, so a subclass
    has to provide all three before calling those methods.
    """

    config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
    dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')

    def __init__(self, testsuite, platform, source_dir, build_dir, jobserver):
        """Store the build parameters; nothing is executed here.

        :param testsuite: test suite object (``sysbuild`` is read from it)
        :param platform: platform object (``name`` is read from it)
        :param source_dir: directory passed to cmake with ``-S``
        :param build_dir: directory passed to cmake with ``-B``; also where
            the log file is written
        :param jobserver: object whose ``popen()`` is used on Linux
        """
        self.cwd = None
        self.capture_output = True

        self.defconfig = {}
        self.cmake_cache = {}

        self.instance = None
        self.testsuite = testsuite
        self.platform = platform
        self.source_dir = source_dir
        self.build_dir = build_dir
        self.log = "build.log"

        self.default_encoding = sys.getdefaultencoding()
        self.jobserver = jobserver

    def parse_generated(self, filter_stages=None):
        """Reset ``self.defconfig`` and return an empty filter result.

        Hook called by ``run_cmake()`` after a successful cmake run;
        subclasses override it to parse the generated files.
        """
        self.defconfig = {}
        return {}

    def _cmake_communicate(self, cmd):
        """Run *cmd*, wait for it to finish and return ``(returncode, out)``.

        With ``self.capture_output`` set, stdout and stderr are merged into a
        single pipe and ``out`` holds the captured bytes; otherwise ``out``
        is whatever ``communicate()`` returns for stdout (``None`` for
        ``subprocess.Popen``).
        """
        kwargs = {}
        if self.capture_output:
            kwargs['stdout'] = subprocess.PIPE
            # CMake sends the output of message() to stderr unless it's STATUS
            kwargs['stderr'] = subprocess.STDOUT

        if self.cwd:
            kwargs['cwd'] = self.cwd

        if sys.platform == 'linux':
            p = self.jobserver.popen(cmd, **kwargs)
        else:
            p = subprocess.Popen(cmd, **kwargs)

        out, _ = p.communicate()
        return p.returncode, out

    def _cmake_append_log(self, out, create_dir=False):
        """Decode *out*, append it to ``<build_dir>/<log>`` and return the text.

        :param create_dir: create ``build_dir`` first if it does not exist
        """
        text = out.decode(self.default_encoding)
        if create_dir:
            os.makedirs(self.build_dir, exist_ok=True)
        log_path = os.path.join(self.build_dir, self.log)
        with open(log_path, "a", encoding=self.default_encoding) as log:
            log.write(text)
        return text

    def run_build(self, args=None):
        """Run ``cmake`` with *args* (typically a ``--build`` invocation).

        :param args: iterable of cmake command-line arguments, or ``None``
        :return: on exit code 0 a dict with ``msg``, ``returncode`` and
            ``instance``; on failure a dict with ``returncode`` and
            ``instance``. ``None`` is returned when cmake exited with 0 but
            no output was captured; ``ProjectBuilder.process()`` treats a
            falsy result as a build failure.

        On success the instance status becomes ``"passed"``. On failure with
        captured output the status becomes ``"skipped"`` for a linker-region
        or imgtool overflow (unless ``options.overflow_as_errors`` is set)
        and ``"error"`` otherwise. A failure without any captured output
        leaves the instance status untouched.
        """
        logger.debug("Building %s for %s", self.source_dir, self.platform.name)

        cmd = [shutil.which('cmake')]
        if args is not None:
            cmd.extend(args)

        returncode, out = self._cmake_communicate(cmd)

        if returncode == 0:
            msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)

            self.instance.status = "passed"
            if not self.instance.run:
                self.instance.add_missing_case_status("skipped", "Test was built only")

            if not out:
                # Existing contract: a silent successful build yields None.
                return None

            self._cmake_append_log(out)
            return {'msg': msg, "returncode": returncode, "instance": self.instance}

        # A real error occurred; classify it from the build output.
        log_msg = self._cmake_append_log(out) if out else ""

        if log_msg:
            overflow_found = re.findall("region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM|dram0_1_seg)' overflowed by", log_msg)
            imgtool_overflow_found = re.findall(r"Error: Image size \(.*\) \+ trailer \(.*\) exceeds requested size", log_msg)
            if overflow_found and not self.options.overflow_as_errors:
                logger.debug("Test skipped due to {} Overflow".format(overflow_found[0]))
                self.instance.status = "skipped"
                self.instance.reason = "{} overflow".format(overflow_found[0])
                change_skip_to_error_if_integration(self.options, self.instance)
            elif imgtool_overflow_found and not self.options.overflow_as_errors:
                self.instance.status = "skipped"
                self.instance.reason = "imgtool overflow"
                change_skip_to_error_if_integration(self.options, self.instance)
            else:
                self.instance.status = "error"
                self.instance.reason = "Build failure"

        return {
            "returncode": returncode,
            "instance": self.instance,
        }

    def run_cmake(self, args="", filter_stages=None):
        """Run the cmake configuration step for this instance.

        :param args: iterable of extra cmake arguments appended after the
            source arguments
        :param filter_stages: list of cmake module names; when non-empty,
            cmake is run through ``package_helper.cmake`` with only those
            modules so that the output can be used for filtering
        :return: ``{'msg': ..., 'filter': ...}`` on success, where
            ``filter`` is the result of ``parse_generated()``;
            ``{'returncode': ...}`` on failure, in which case the instance
            and all its test cases are marked ``"error"``
        """
        if filter_stages is None:
            filter_stages = []

        if not self.options.disable_warnings_as_errors:
            warnings_as_errors = 'y'
            gen_defines_args = "--edtlib-Werror"
        else:
            warnings_as_errors = 'n'
            gen_defines_args = ""

        logger.debug("Running cmake on %s for %s", self.source_dir, self.platform.name)
        cmake_args = [
            f'-B{self.build_dir}',
            f'-DTC_RUNID={self.instance.run_id}',
            f'-DCONFIG_COMPILER_WARNINGS_AS_ERRORS={warnings_as_errors}',
            f'-DEXTRA_GEN_DEFINES_ARGS={gen_defines_args}',
            f'-G{self.env.generator}'
        ]

        if self.testsuite.sysbuild and not filter_stages:
            logger.debug("Building %s using sysbuild", self.source_dir)
            cmake_args.extend([
                f'-S{canonical_zephyr_base}/share/sysbuild',
                f'-DAPP_DIR={self.source_dir}'
            ])
        else:
            cmake_args.append(f'-S{self.source_dir}')

        cmake_args.extend(args or [])
        cmake_args.append('-DBOARD={}'.format(self.platform.name))

        if self.instance.testsuite.required_snippets:
            cmake_args.append(
                '-DSNIPPET={}'.format(';'.join(self.instance.testsuite.required_snippets))
            )

        cmd = [shutil.which('cmake')] + cmake_args

        # If needed, run CMake using the package_helper script first, to only run
        # a subset of all cmake modules. This output will be used to filter
        # testcases, and the full CMake configuration will be run for
        # testcases that should be built
        if filter_stages:
            cmd += [
                f'-DMODULES={",".join(filter_stages)}',
                f'-P{canonical_zephyr_base}/cmake/package_helper.cmake',
            ]

        log_command(logger, "Calling cmake", cmd)

        returncode, out = self._cmake_communicate(cmd)

        if returncode == 0:
            filter_results = self.parse_generated(filter_stages)
            msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)
            logger.debug(msg)
            results = {'msg': msg, 'filter': filter_results}
        else:
            self.instance.status = "error"
            self.instance.reason = "Cmake build failure"

            for tc in self.instance.testcases:
                tc.status = self.instance.status

            logger.error("Cmake build failure: %s for %s", self.source_dir, self.platform.name)
            results = {"returncode": returncode}

        if out:
            self._cmake_append_log(out, create_dir=True)

        return results
413
414
class FilterBuilder(CMake):
    """CMake runner that evaluates a test suite's filter expression.

    After cmake has run, ``parse_generated()`` loads the generated Kconfig
    ``.config``, the CMake cache and (if present) the pickled devicetree,
    and evaluates ``testsuite.filter`` against them.

    ``parse_generated()`` reads ``self.instance`` (sysbuild case) and
    ``self.env``, neither of which is set by this class's ``__init__``.
    """

    def __init__(self, testsuite, platform, source_dir, build_dir, jobserver):
        super().__init__(testsuite, platform, source_dir, build_dir, jobserver)

        self.log = "config-twister.log"

    def parse_generated(self, filter_stages=None):
        """Parse the cmake output and decide whether the instance is filtered.

        :param filter_stages: list of cmake modules that were run through
            package_helper; empty or ``None`` means a full cmake run
        :return: one of
            * ``{}`` for the ``unit_testing`` platform;
            * ``{"<platform.name>/<testsuite.name>": bool}`` where ``True``
              means the instance must be filtered out (the key is built with
              ``os.path.join``; presumably it equals the instance name that
              the caller looks up);
            * the complete filter data dict when the test suite has no
              filter expression (it is also stored in
              ``platform.filter_data``).
        :raises ValueError, SyntaxError: re-raised from the expression
            parser after naming the offending yaml file on stderr
        """
        if filter_stages is None:
            filter_stages = []

        if self.platform.name == "unit_testing":
            return {}

        # .config is only available after kconfig stage in cmake. If only dt based
        # filtration is required package helper call won't produce .config
        kconfig_ran = not filter_stages or "kconfig" in filter_stages

        if self.testsuite.sysbuild and not filter_stages:
            # Load domain yaml to get default domain build directory
            domain_path = os.path.join(self.build_dir, "domains.yaml")
            domains = Domains.from_file(domain_path)
            logger.debug("Loaded sysbuild domain data from %s" % (domain_path))
            self.instance.domains = domains
            artifacts_dir = domains.get_default_domain().build_dir
        else:
            artifacts_dir = self.build_dir

        cmake_cache_path = os.path.join(artifacts_dir, "CMakeCache.txt")
        defconfig_path = os.path.join(artifacts_dir, "zephyr", ".config")
        # dt is compiled before kconfig, so edt_pickle is available regardless of
        # choice of filter stages
        edt_pickle = os.path.join(artifacts_dir, "zephyr", "edt.pickle")

        if kconfig_ran:
            defconfig = {}
            with open(defconfig_path, "r") as fp:
                for line in fp:
                    m = self.config_re.match(line)
                    if m:
                        defconfig[m.group(1)] = m.group(2).strip()
                    elif line.strip() and not line.startswith("#"):
                        sys.stderr.write("Unrecognized line %s\n" % line)
            self.defconfig = defconfig

        try:
            cache = CMakeCache.from_file(cmake_cache_path)
        except FileNotFoundError:
            cache = {}
        self.cmake_cache = {entry.name: entry.value for entry in cache}

        # Later updates win: environment, then Kconfig, then the CMake cache.
        filter_data = {
            "ARCH": self.platform.arch,
            "PLATFORM": self.platform.name
        }
        filter_data.update(os.environ)
        if kconfig_ran:
            filter_data.update(self.defconfig)
        filter_data.update(self.cmake_cache)

        def verdict(filtered_out):
            # Result shape expected by the caller: {instance key: filtered?}
            key = os.path.join(self.platform.name, self.testsuite.name)
            return {key: filtered_out}

        if self.testsuite.sysbuild and self.env.options.device_testing:
            # Verify that twister's arguments support sysbuild.
            # Twister sysbuild flashing currently only works with west, so
            # --west-flash must be passed. Additionally, erasing the DUT
            # before each test with --west-flash=--erase will inherently not
            # work with sysbuild.
            if self.env.options.west_flash is None:
                logger.warning("Sysbuild test will be skipped. " +
                    "West must be used for flashing.")
                return verdict(True)
            elif "--erase" in self.env.options.west_flash:
                logger.warning("Sysbuild test will be skipped, " +
                    "--erase is not supported with --west-flash")
                return verdict(True)

        if not (self.testsuite and self.testsuite.filter):
            self.platform.filter_data = filter_data
            return filter_data

        try:
            if os.path.exists(edt_pickle):
                # NOTE: pickle.load() can execute arbitrary code from a crafted
                # file; this relies on the build directory being trusted.
                with open(edt_pickle, 'rb') as f:
                    edt = pickle.load(f)
            else:
                edt = None
            res = expr_parser.parse(self.testsuite.filter, filter_data, edt)
        except (ValueError, SyntaxError):
            sys.stderr.write(
                "Failed processing %s\n" % self.testsuite.yamlfile)
            raise

        return verdict(not res)
516
517
518class ProjectBuilder(FilterBuilder):
519
520    def __init__(self, instance, env, jobserver, **kwargs):
521        super().__init__(instance.testsuite, instance.platform, instance.testsuite.source_dir, instance.build_dir, jobserver)
522
523        self.log = "build.log"
524        self.instance = instance
525        self.filtered_tests = 0
526        self.options = env.options
527        self.env = env
528        self.duts = None
529
530    @staticmethod
531    def log_info(filename, inline_logs):
532        filename = os.path.abspath(os.path.realpath(filename))
533        if inline_logs:
534            logger.info("{:-^100}".format(filename))
535
536            try:
537                with open(filename) as fp:
538                    data = fp.read()
539            except Exception as e:
540                data = "Unable to read log data (%s)\n" % (str(e))
541
542            logger.error(data)
543
544            logger.info("{:-^100}".format(filename))
545        else:
546            logger.error("see: " + Fore.YELLOW + filename + Fore.RESET)
547
548    def log_info_file(self, inline_logs):
549        build_dir = self.instance.build_dir
550        h_log = "{}/handler.log".format(build_dir)
551        b_log = "{}/build.log".format(build_dir)
552        v_log = "{}/valgrind.log".format(build_dir)
553        d_log = "{}/device.log".format(build_dir)
554
555        if os.path.exists(v_log) and "Valgrind" in self.instance.reason:
556            self.log_info("{}".format(v_log), inline_logs)
557        elif os.path.exists(h_log) and os.path.getsize(h_log) > 0:
558            self.log_info("{}".format(h_log), inline_logs)
559        elif os.path.exists(d_log) and os.path.getsize(d_log) > 0:
560            self.log_info("{}".format(d_log), inline_logs)
561        else:
562            self.log_info("{}".format(b_log), inline_logs)
563
564
565    def process(self, pipeline, done, message, lock, results):
566        op = message.get('op')
567
568        self.instance.setup_handler(self.env)
569
570        if op == "filter":
571            res = self.cmake(filter_stages=self.instance.filter_stages)
572            if self.instance.status in ["failed", "error"]:
573                pipeline.put({"op": "report", "test": self.instance})
574            else:
575                # Here we check the dt/kconfig filter results coming from running cmake
576                if self.instance.name in res['filter'] and res['filter'][self.instance.name]:
577                    logger.debug("filtering %s" % self.instance.name)
578                    self.instance.status = "filtered"
579                    self.instance.reason = "runtime filter"
580                    results.skipped_runtime += 1
581                    self.instance.add_missing_case_status("skipped")
582                    pipeline.put({"op": "report", "test": self.instance})
583                else:
584                    pipeline.put({"op": "cmake", "test": self.instance})
585
586        # The build process, call cmake and build with configured generator
587        if op == "cmake":
588            res = self.cmake()
589            if self.instance.status in ["failed", "error"]:
590                pipeline.put({"op": "report", "test": self.instance})
591            elif self.options.cmake_only:
592                if self.instance.status is None:
593                    self.instance.status = "passed"
594                pipeline.put({"op": "report", "test": self.instance})
595            else:
596                # Here we check the runtime filter results coming from running cmake
597                if self.instance.name in res['filter'] and res['filter'][self.instance.name]:
598                    logger.debug("filtering %s" % self.instance.name)
599                    self.instance.status = "filtered"
600                    self.instance.reason = "runtime filter"
601                    results.skipped_runtime += 1
602                    self.instance.add_missing_case_status("skipped")
603                    pipeline.put({"op": "report", "test": self.instance})
604                else:
605                    pipeline.put({"op": "build", "test": self.instance})
606
607        elif op == "build":
608            logger.debug("build test: %s" % self.instance.name)
609            res = self.build()
610            if not res:
611                self.instance.status = "error"
612                self.instance.reason = "Build Failure"
613                pipeline.put({"op": "report", "test": self.instance})
614            else:
615                # Count skipped cases during build, for example
616                # due to ram/rom overflow.
617                if  self.instance.status == "skipped":
618                    results.skipped_runtime += 1
619                    self.instance.add_missing_case_status("skipped", self.instance.reason)
620
621                if res.get('returncode', 1) > 0:
622                    self.instance.add_missing_case_status("blocked", self.instance.reason)
623                    pipeline.put({"op": "report", "test": self.instance})
624                else:
625                    logger.debug(f"Determine test cases for test instance: {self.instance.name}")
626                    try:
627                        self.determine_testcases(results)
628                        pipeline.put({"op": "gather_metrics", "test": self.instance})
629                    except BuildError as e:
630                        logger.error(str(e))
631                        self.instance.status = "error"
632                        self.instance.reason = str(e)
633                        pipeline.put({"op": "report", "test": self.instance})
634
635        elif op == "gather_metrics":
636            self.gather_metrics(self.instance)
637            if self.instance.run and self.instance.handler.ready:
638                pipeline.put({"op": "run", "test": self.instance})
639            else:
640                pipeline.put({"op": "report", "test": self.instance})
641
642        # Run the generated binary using one of the supported handlers
643        elif op == "run":
644            logger.debug("run test: %s" % self.instance.name)
645            self.run()
646            logger.debug(f"run status: {self.instance.name} {self.instance.status}")
647            try:
648                # to make it work with pickle
649                self.instance.handler.thread = None
650                self.instance.handler.duts = None
651                pipeline.put({
652                    "op": "report",
653                    "test": self.instance,
654                    "status": self.instance.status,
655                    "reason": self.instance.reason
656                    }
657                )
658            except RuntimeError as e:
659                logger.error(f"RuntimeError: {e}")
660                traceback.print_exc()
661
662        # Report results and output progress to screen
663        elif op == "report":
664            with lock:
665                done.put(self.instance)
666                self.report_out(results)
667
668            if not self.options.coverage:
669                if self.options.prep_artifacts_for_testing:
670                    pipeline.put({"op": "cleanup", "mode": "device", "test": self.instance})
671                elif self.options.runtime_artifact_cleanup == "pass" and self.instance.status == "passed":
672                    pipeline.put({"op": "cleanup", "mode": "passed", "test": self.instance})
673                elif self.options.runtime_artifact_cleanup == "all":
674                    pipeline.put({"op": "cleanup", "mode": "all", "test": self.instance})
675
676        elif op == "cleanup":
677            mode = message.get("mode")
678            if mode == "device":
679                self.cleanup_device_testing_artifacts()
680            elif mode == "passed" or (mode == "all" and self.instance.reason != "Cmake build failure"):
681                self.cleanup_artifacts()
682
683    def determine_testcases(self, results):
684        yaml_testsuite_name = self.instance.testsuite.id
685        logger.debug(f"Determine test cases for test suite: {yaml_testsuite_name}")
686
687        elf_file = self.instance.get_elf_file()
688        elf = ELFFile(open(elf_file, "rb"))
689
690        logger.debug(f"Test instance {self.instance.name} already has {len(self.instance.testcases)} cases.")
691        new_ztest_unit_test_regex = re.compile(r"z_ztest_unit_test__([^\s]+?)__([^\s]*)")
692        detected_cases = []
693        for section in elf.iter_sections():
694            if isinstance(section, SymbolTableSection):
695                for sym in section.iter_symbols():
696                    # It is only meant for new ztest fx because only new ztest fx exposes test functions
697                    # precisely.
698
699                    # The 1st capture group is new ztest suite name.
700                    # The 2nd capture group is new ztest unit test name.
701                    matches = new_ztest_unit_test_regex.findall(sym.name)
702                    if matches:
703                        for m in matches:
704                            # new_ztest_suite = m[0] # not used for now
705                            test_func_name = m[1].replace("test_", "")
706                            testcase_id = f"{yaml_testsuite_name}.{test_func_name}"
707                            detected_cases.append(testcase_id)
708
709        if detected_cases:
710            logger.debug(f"{', '.join(detected_cases)} in {elf_file}")
711            self.instance.testcases.clear()
712            self.instance.testsuite.testcases.clear()
713
714            # When the old regex-based test case collection is fully deprecated,
715            # this will be the sole place where test cases get added to the test instance.
716            # Then we can further include the new_ztest_suite info in the testcase_id.
717
718            for testcase_id in detected_cases:
719                self.instance.add_testcase(name=testcase_id)
720                self.instance.testsuite.add_testcase(name=testcase_id)
721
722
723    def cleanup_artifacts(self, additional_keep=[]):
724        logger.debug("Cleaning up {}".format(self.instance.build_dir))
725        allow = [
726            os.path.join('zephyr', '.config'),
727            'handler.log',
728            'build.log',
729            'device.log',
730            'recording.csv',
731            # below ones are needed to make --test-only work as well
732            'Makefile',
733            'CMakeCache.txt',
734            'build.ninja',
735            os.path.join('CMakeFiles', 'rules.ninja')
736            ]
737
738        allow += additional_keep
739
740        if self.options.runtime_artifact_cleanup == 'all':
741            allow += [os.path.join('twister', 'testsuite_extra.conf')]
742
743        allow = [os.path.join(self.instance.build_dir, file) for file in allow]
744
745        for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False):
746            for name in filenames:
747                path = os.path.join(dirpath, name)
748                if path not in allow:
749                    os.remove(path)
750            # Remove empty directories and symbolic links to directories
751            for dir in dirnames:
752                path = os.path.join(dirpath, dir)
753                if os.path.islink(path):
754                    os.remove(path)
755                elif not os.listdir(path):
756                    os.rmdir(path)
757
758    def cleanup_device_testing_artifacts(self):
759        logger.debug("Cleaning up for Device Testing {}".format(self.instance.build_dir))
760
761        files_to_keep = self._get_binaries()
762        files_to_keep.append(os.path.join('zephyr', 'runners.yaml'))
763
764        self.cleanup_artifacts(files_to_keep)
765
766        self._sanitize_files()
767
768    def _get_binaries(self) -> List[str]:
769        """
770        Get list of binaries paths (absolute or relative to the
771        self.instance.build_dir), basing on information from platform.binaries
772        or runners.yaml. If they are not found take default binaries like
773        "zephyr/zephyr.hex" etc.
774        """
775        binaries: List[str] = []
776
777        platform = self.instance.platform
778        if platform.binaries:
779            for binary in platform.binaries:
780                binaries.append(os.path.join('zephyr', binary))
781
782        binaries += self._get_binaries_from_runners()
783
784        # if binaries was not found in platform.binaries and runners.yaml take default ones
785        if len(binaries) == 0:
786            binaries = [
787                os.path.join('zephyr', 'zephyr.hex'),
788                os.path.join('zephyr', 'zephyr.bin'),
789                os.path.join('zephyr', 'zephyr.elf'),
790                os.path.join('zephyr', 'zephyr.exe'),
791            ]
792        return binaries
793
794    def _get_binaries_from_runners(self) -> List[str]:
795        """
796        Get list of binaries paths (absolute or relative to the
797        self.instance.build_dir) from runners.yaml file.
798        """
799        runners_file_path: str = os.path.join(self.instance.build_dir, 'zephyr', 'runners.yaml')
800        if not os.path.exists(runners_file_path):
801            return []
802
803        with open(runners_file_path, 'r') as file:
804            runners_content: dict = yaml.safe_load(file)
805
806        if 'config' not in runners_content:
807            return []
808
809        runners_config: dict = runners_content['config']
810        binary_keys: List[str] = ['elf_file', 'hex_file', 'bin_file']
811
812        binaries: List[str] = []
813        for binary_key in binary_keys:
814            binary_path = runners_config.get(binary_key)
815            if binary_path is None:
816                continue
817            if os.path.isabs(binary_path):
818                binaries.append(binary_path)
819            else:
820                binaries.append(os.path.join('zephyr', binary_path))
821
822        return binaries
823
824    def _sanitize_files(self):
825        """
826        Sanitize files to make it possible to flash those file on different
827        computer/system.
828        """
829        self._sanitize_runners_file()
830        self._sanitize_zephyr_base_from_files()
831
832    def _sanitize_runners_file(self):
833        """
834        Replace absolute paths of binary files for relative ones. The base
835        directory for those files is f"{self.instance.build_dir}/zephyr"
836        """
837        runners_dir_path: str = os.path.join(self.instance.build_dir, 'zephyr')
838        runners_file_path: str = os.path.join(runners_dir_path, 'runners.yaml')
839        if not os.path.exists(runners_file_path):
840            return
841
842        with open(runners_file_path, 'rt') as file:
843            runners_content_text = file.read()
844            runners_content_yaml: dict = yaml.safe_load(runners_content_text)
845
846        if 'config' not in runners_content_yaml:
847            return
848
849        runners_config: dict = runners_content_yaml['config']
850        binary_keys: List[str] = ['elf_file', 'hex_file', 'bin_file']
851
852        for binary_key in binary_keys:
853            binary_path = runners_config.get(binary_key)
854            # sanitize only paths which exist and are absolute
855            if binary_path is None or not os.path.isabs(binary_path):
856                continue
857            binary_path_relative = os.path.relpath(binary_path, start=runners_dir_path)
858            runners_content_text = runners_content_text.replace(binary_path, binary_path_relative)
859
860        with open(runners_file_path, 'wt') as file:
861            file.write(runners_content_text)
862
863    def _sanitize_zephyr_base_from_files(self):
864        """
865        Remove Zephyr base paths from selected files.
866        """
867        files_to_sanitize = [
868            'CMakeCache.txt',
869            os.path.join('zephyr', 'runners.yaml'),
870        ]
871        for file_path in files_to_sanitize:
872            file_path = os.path.join(self.instance.build_dir, file_path)
873            if not os.path.exists(file_path):
874                continue
875
876            with open(file_path, "rt") as file:
877                data = file.read()
878
879            # add trailing slash at the end of canonical_zephyr_base if it does not exist:
880            path_to_remove = os.path.join(canonical_zephyr_base, "")
881            data = data.replace(path_to_remove, "")
882
883            with open(file_path, "wt") as file:
884                file.write(data)
885
    def report_out(self, results):
        """
        Update the shared counters for self.instance and report its outcome.

        :param results: counter object; this method reads ``total`` and
            ``iteration`` and updates ``done``, ``cases``, ``error``,
            ``failed``, ``passed``, ``skipped_configs`` and ``skipped_cases``.

        With ``self.options.verbose`` set, one line per instance is logged;
        otherwise a single progress line ending in a carriage return is
        written to stdout.
        """
        total_to_do = results.total
        total_tests_width = len(str(total_to_do))
        results.done += 1
        instance = self.instance
        # test cases are only added to the total during the first iteration
        if results.iteration == 1:
            results.cases += len(instance.testcases)

        if instance.status in ["error", "failed"]:
            if instance.status == "error":
                results.error += 1
                txt = " ERROR "
            else:
                results.failed += 1
                txt = " FAILED "
            # `status` is only bound in verbose mode here; the non-verbose
            # path below never reads it.
            if self.options.verbose:
                status = Fore.RED + txt + Fore.RESET + instance.reason
            else:
                logger.error(
                    "{:<25} {:<50} {}{}{}: {}".format(
                        instance.platform.name,
                        instance.testsuite.name,
                        Fore.RED,
                        txt,
                        Fore.RESET,
                        instance.reason))
            # in verbose mode log_info_file() is called further down, after
            # the per-instance line has been logged
            if not self.options.verbose:
                self.log_info_file(self.options.inline_logs)
        elif instance.status in ["skipped", "filtered"]:
            status = Fore.YELLOW + "SKIPPED" + Fore.RESET
            results.skipped_configs += 1
            # test cases skipped at the test instance level
            results.skipped_cases += len(instance.testsuite.testcases)
        elif instance.status == "passed":
            status = Fore.GREEN + "PASSED" + Fore.RESET
            results.passed += 1
            for case in instance.testcases:
                # test cases skipped at the test case level
                if case.status == 'skipped':
                    results.skipped_cases += 1
        else:
            # any other status value is reported as UNKNOWN and does not
            # change the error/failed/passed/skipped counters
            logger.debug(f"Unknown status = {instance.status}")
            status = Fore.YELLOW + "UNKNOWN" + Fore.RESET

        if self.options.verbose:
            # more_info is the text shown in parentheses at the end of the line
            if self.options.cmake_only:
                more_info = "cmake"
            elif instance.status in ["skipped", "filtered"]:
                more_info = instance.reason
            else:
                if instance.handler.ready and instance.run:
                    more_info = instance.handler.type_str
                    htime = instance.execution_time
                    if instance.dut:
                        more_info += f": {instance.dut},"
                    if htime:
                        more_info += " {:.3f}s".format(htime)
                else:
                    more_info = "build"

                # the seed is appended only for unsuccessful statuses and only
                # when the handler carries a non-None `seed` attribute; the
                # value printed is taken from self.options.seed
                if ( instance.status in ["error", "failed", "timeout", "flash_error"]
                     and hasattr(self.instance.handler, 'seed')
                     and self.instance.handler.seed is not None ):
                    more_info += "/seed: " + str(self.options.seed)
            logger.info("{:>{}}/{} {:<25} {:<50} {} ({})".format(
                results.done, total_tests_width, total_to_do , instance.platform.name,
                instance.testsuite.name, status, more_info))

            if instance.status in ["error", "failed", "timeout"]:
                self.log_info_file(self.options.inline_logs)
        else:
            # guard against division by zero when there is nothing to do
            completed_perc = 0
            if total_to_do > 0:
                completed_perc = int((float(results.done) / total_to_do) * 100)

            # the trailing "\r" makes successive progress lines overwrite each other
            sys.stdout.write("INFO    - Total complete: %s%4d/%4d%s  %2d%%  skipped: %s%4d%s, failed: %s%4d%s, error: %s%4d%s\r" % (
                Fore.GREEN,
                results.done,
                total_to_do,
                Fore.RESET,
                completed_perc,
                Fore.YELLOW if results.skipped_configs > 0 else Fore.RESET,
                results.skipped_configs,
                Fore.RESET,
                Fore.RED if results.failed > 0 else Fore.RESET,
                results.failed,
                Fore.RESET,
                Fore.RED if results.error > 0 else Fore.RESET,
                results.error,
                Fore.RESET
                )
                )
        sys.stdout.flush()
979
980    @staticmethod
981    def cmake_assemble_args(extra_args, handler, extra_conf_files, extra_overlay_confs,
982                            extra_dtc_overlay_files, cmake_extra_args,
983                            build_dir):
984        # Retain quotes around config options
985        config_options = [arg for arg in extra_args if arg.startswith("CONFIG_")]
986        args = [arg for arg in extra_args if not arg.startswith("CONFIG_")]
987
988        args_expanded = ["-D{}".format(a.replace('"', '\"')) for a in config_options]
989
990        if handler.ready:
991            args.extend(handler.args)
992
993        if extra_conf_files:
994            args.append(f"CONF_FILE=\"{';'.join(extra_conf_files)}\"")
995
996        if extra_dtc_overlay_files:
997            args.append(f"DTC_OVERLAY_FILE=\"{';'.join(extra_dtc_overlay_files)}\"")
998
999        # merge overlay files into one variable
1000        overlays = extra_overlay_confs.copy()
1001
1002        additional_overlay_path = os.path.join(
1003            build_dir, "twister", "testsuite_extra.conf"
1004        )
1005        if os.path.exists(additional_overlay_path):
1006            overlays.append(additional_overlay_path)
1007
1008        if overlays:
1009            args.append("OVERLAY_CONFIG=\"%s\"" % (" ".join(overlays)))
1010
1011        # Build the final argument list
1012        args_expanded.extend(["-D{}".format(a.replace('"', '\"')) for a in cmake_extra_args])
1013        args_expanded.extend(["-D{}".format(a.replace('"', '')) for a in args])
1014
1015        return args_expanded
1016
1017    def cmake(self, filter_stages=[]):
1018        args = self.cmake_assemble_args(
1019            self.testsuite.extra_args.copy(), # extra_args from YAML
1020            self.instance.handler,
1021            self.testsuite.extra_conf_files,
1022            self.testsuite.extra_overlay_confs,
1023            self.testsuite.extra_dtc_overlay_files,
1024            self.options.extra_args, # CMake extra args
1025            self.instance.build_dir,
1026        )
1027
1028        res = self.run_cmake(args,filter_stages)
1029        return res
1030
1031    def build(self):
1032        res = self.run_build(['--build', self.build_dir])
1033        return res
1034
1035    def run(self):
1036
1037        instance = self.instance
1038
1039        if instance.handler.ready:
1040            logger.debug(f"Reset instance status from '{instance.status}' to None before run.")
1041            instance.status = None
1042
1043            if instance.handler.type_str == "device":
1044                instance.handler.duts = self.duts
1045
1046            if(self.options.seed is not None and instance.platform.name.startswith("native_posix")):
1047                self.parse_generated()
1048                if('CONFIG_FAKE_ENTROPY_NATIVE_POSIX' in self.defconfig and
1049                    self.defconfig['CONFIG_FAKE_ENTROPY_NATIVE_POSIX'] == 'y'):
1050                    instance.handler.seed = self.options.seed
1051
1052            if self.options.extra_test_args and instance.platform.arch == "posix":
1053                instance.handler.extra_test_args = self.options.extra_test_args
1054
1055            harness = HarnessImporter.get_harness(instance.testsuite.harness.capitalize())
1056            harness.configure(instance)
1057            if isinstance(harness, Pytest):
1058                harness.pytest_run(instance.handler.get_test_timeout())
1059            else:
1060                instance.handler.handle(harness)
1061
1062        sys.stdout.flush()
1063
1064    def gather_metrics(self, instance: TestInstance):
1065        if self.options.enable_size_report and not self.options.cmake_only:
1066            self.calc_size(instance=instance, from_buildlog=self.options.footprint_from_buildlog)
1067        else:
1068            instance.metrics["used_ram"] = 0
1069            instance.metrics["used_rom"] = 0
1070            instance.metrics["available_rom"] = 0
1071            instance.metrics["available_ram"] = 0
1072            instance.metrics["unrecognized"] = []
1073
1074    @staticmethod
1075    def calc_size(instance: TestInstance, from_buildlog: bool):
1076        if instance.status not in ["error", "failed", "skipped"]:
1077            if not instance.platform.type in ["native", "qemu", "unit"]:
1078                generate_warning = bool(instance.platform.type == "mcu")
1079                size_calc = instance.calculate_sizes(from_buildlog=from_buildlog, generate_warning=generate_warning)
1080                instance.metrics["used_ram"] = size_calc.get_used_ram()
1081                instance.metrics["used_rom"] = size_calc.get_used_rom()
1082                instance.metrics["available_rom"] = size_calc.get_available_rom()
1083                instance.metrics["available_ram"] = size_calc.get_available_ram()
1084                instance.metrics["unrecognized"] = size_calc.unrecognized_sections()
1085            else:
1086                instance.metrics["used_ram"] = 0
1087                instance.metrics["used_rom"] = 0
1088                instance.metrics["available_rom"] = 0
1089                instance.metrics["available_ram"] = 0
1090                instance.metrics["unrecognized"] = []
1091            instance.metrics["handler_time"] = instance.execution_time
1092
1093class TwisterRunner:
1094
1095    def __init__(self, instances, suites, env=None) -> None:
1096        self.pipeline = None
1097        self.options = env.options
1098        self.env = env
1099        self.instances = instances
1100        self.suites = suites
1101        self.duts = None
1102        self.jobs = 1
1103        self.results = None
1104        self.jobserver = None
1105
    def run(self):
        """
        Execute all test instances, repeating failed ones as configured.

        Sets up the shared queues and counters, decides the number of jobs,
        then loops: each iteration runs execute(), merges the instances found
        in the done queue back into self.instances, and stops when the retry
        budget is used up or nothing is left to retry. A brief summary is
        logged at the end.
        """

        # one initial pass plus the configured number of retries
        retries = self.options.retry_failed + 1

        # the queues are created through a manager so they can be shared with
        # the worker processes started in execute()
        BaseManager.register('LifoQueue', queue.LifoQueue)
        manager = BaseManager()
        manager.start()

        self.results = ExecutionCounter(total=len(self.instances))
        self.iteration = 0
        pipeline = manager.LifoQueue()
        done_queue = manager.LifoQueue()

        # Set number of jobs
        if self.options.jobs:
            self.jobs = self.options.jobs
        elif self.options.build_only:
            self.jobs = multiprocessing.cpu_count() * 2
        else:
            self.jobs = multiprocessing.cpu_count()

        # On platforms other than Linux this block is skipped entirely, so
        # self.jobserver keeps its previous value and "JOBS" is not logged.
        if sys.platform == "linux":
            if os.name == 'posix':
                self.jobserver = GNUMakeJobClient.from_environ(jobs=self.options.jobs)
                if not self.jobserver:
                    self.jobserver = GNUMakeJobServer(self.jobs)
                elif self.jobserver.jobs:
                    self.jobs = self.jobserver.jobs
            # TODO: Implement this on windows/mac also
            # NOTE(review): this else pairs with `if os.name == 'posix'`, so it
            # only runs when sys.platform is "linux" and os.name is not
            # 'posix' - confirm this nesting is intended.
            else:
                self.jobserver = JobClient()

            logger.info("JOBS: %d", self.jobs)

        self.update_counting_before_pipeline()

        while True:
            self.results.iteration += 1

            if self.results.iteration > 1:
                logger.info("%d Iteration:" % (self.results.iteration))
                time.sleep(self.options.retry_interval)  # waiting for the system to settle down
                # `done` must be computed before `failed`/`error` are reset,
                # since it is derived from their current values
                self.results.done = self.results.total - self.results.failed - self.results.error
                self.results.failed = 0
                if self.options.retry_build_errors:
                    self.results.error = 0
            else:
                # first iteration: statically filtered instances count as done
                self.results.done = self.results.skipped_filter

            self.execute(pipeline, done_queue)

            # drain the done queue and replace the stored instances with the
            # copies that went through the pipeline
            while True:
                try:
                    inst = done_queue.get_nowait()
                except queue.Empty:
                    break
                else:
                    inst.metrics.update(self.instances[inst.name].metrics)
                    inst.metrics["handler_time"] = inst.execution_time
                    inst.metrics["unrecognized"] = []
                    self.instances[inst.name] = inst

            print("")

            # errors are only retried when retry_build_errors is set
            retry_errors = False
            if self.results.error and self.options.retry_build_errors:
                retry_errors = True

            retries = retries - 1
            if retries == 0 or ( self.results.failed == 0 and not retry_errors):
                break

        self.show_brief()
1179
1180    def update_counting_before_pipeline(self):
1181        '''
1182        Updating counting before pipeline is necessary because statically filterd
1183        test instance never enter the pipeline. While some pipeline output needs
1184        the static filter stats. So need to prepare them before pipline starts.
1185        '''
1186        for instance in self.instances.values():
1187            if instance.status == 'filtered' and not instance.reason == 'runtime filter':
1188                self.results.skipped_filter += 1
1189                self.results.skipped_configs += 1
1190                self.results.skipped_cases += len(instance.testsuite.testcases)
1191                self.results.cases += len(instance.testsuite.testcases)
1192            elif instance.status == 'error':
1193                self.results.error += 1
1194
1195    def show_brief(self):
1196        logger.info("%d test scenarios (%d test instances) selected, "
1197                    "%d configurations skipped (%d by static filter, %d at runtime)." %
1198                    (len(self.suites), len(self.instances),
1199                    self.results.skipped_configs,
1200                    self.results.skipped_filter,
1201                    self.results.skipped_configs - self.results.skipped_filter))
1202
1203    def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False, retry_build_errors=False):
1204        for instance in self.instances.values():
1205            if build_only:
1206                instance.run = False
1207
1208            no_retry_statuses = ['passed', 'skipped', 'filtered']
1209            if not retry_build_errors:
1210                no_retry_statuses.append("error")
1211
1212            if instance.status not in no_retry_statuses:
1213                logger.debug(f"adding {instance.name}")
1214                if instance.status:
1215                    instance.retries += 1
1216                instance.status = None
1217
1218                # Check if cmake package_helper script can be run in advance.
1219                instance.filter_stages = []
1220                if instance.testsuite.filter:
1221                    instance.filter_stages = self.get_cmake_filter_stages(instance.testsuite.filter, expr_parser.reserved.keys())
1222                if test_only and instance.run:
1223                    pipeline.put({"op": "run", "test": instance})
1224                elif instance.filter_stages and "full" not in instance.filter_stages:
1225                    pipeline.put({"op": "filter", "test": instance})
1226                else:
1227                    pipeline.put({"op": "cmake", "test": instance})
1228
1229
1230    def pipeline_mgr(self, pipeline, done_queue, lock, results):
1231        if sys.platform == 'linux':
1232            with self.jobserver.get_job():
1233                while True:
1234                    try:
1235                        task = pipeline.get_nowait()
1236                    except queue.Empty:
1237                        break
1238                    else:
1239                        instance = task['test']
1240                        pb = ProjectBuilder(instance, self.env, self.jobserver)
1241                        pb.duts = self.duts
1242                        pb.process(pipeline, done_queue, task, lock, results)
1243
1244                return True
1245        else:
1246            while True:
1247                try:
1248                    task = pipeline.get_nowait()
1249                except queue.Empty:
1250                    break
1251                else:
1252                    instance = task['test']
1253                    pb = ProjectBuilder(instance, self.env, self.jobserver)
1254                    pb.duts = self.duts
1255                    pb.process(pipeline, done_queue, task, lock, results)
1256            return True
1257
1258    def execute(self, pipeline, done):
1259        lock = Lock()
1260        logger.info("Adding tasks to the queue...")
1261        self.add_tasks_to_queue(pipeline, self.options.build_only, self.options.test_only,
1262                                retry_build_errors=self.options.retry_build_errors)
1263        logger.info("Added initial list of jobs to queue")
1264
1265        processes = []
1266
1267        for job in range(self.jobs):
1268            logger.debug(f"Launch process {job}")
1269            p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, self.results, ))
1270            processes.append(p)
1271            p.start()
1272
1273        try:
1274            for p in processes:
1275                p.join()
1276        except KeyboardInterrupt:
1277            logger.info("Execution interrupted")
1278            for p in processes:
1279                p.terminate()
1280
1281    @staticmethod
1282    def get_cmake_filter_stages(filt, logic_keys):
1283        """ Analyze filter expressions from test yaml and decide if dts and/or kconfig based filtering will be needed."""
1284        dts_required = False
1285        kconfig_required = False
1286        full_required = False
1287        filter_stages = []
1288
1289        # Compress args in expressions like "function('x', 'y')" so they are not split when splitting by whitespaces
1290        filt = filt.replace(", ", ",")
1291        # Remove logic words
1292        for k in logic_keys:
1293            filt = filt.replace(f"{k} ", "")
1294        # Remove brackets
1295        filt = filt.replace("(", "")
1296        filt = filt.replace(")", "")
1297        # Splite by whitespaces
1298        filt = filt.split()
1299        for expression in filt:
1300            if expression.startswith("dt_"):
1301                dts_required = True
1302            elif expression.startswith("CONFIG"):
1303                kconfig_required = True
1304            else:
1305                full_required = True
1306
1307        if full_required:
1308            return ["full"]
1309        if dts_required:
1310            filter_stages.append("dts")
1311        if kconfig_required:
1312            filter_stages.append("kconfig")
1313
1314        return filter_stages
1315