# vim: set syntax=python ts=4 : # # Copyright (c) 20180-2022 Intel Corporation # Copyright 2022 NXP # SPDX-License-Identifier: Apache-2.0 import logging import multiprocessing import os import pickle import queue import re import shutil import subprocess import sys import time import traceback import yaml from multiprocessing import Lock, Process, Value from multiprocessing.managers import BaseManager from typing import List from packaging import version from colorama import Fore from domains import Domains from twisterlib.cmakecache import CMakeCache from twisterlib.environment import canonical_zephyr_base from twisterlib.error import BuildError, ConfigurationError import elftools from elftools.elf.elffile import ELFFile from elftools.elf.sections import SymbolTableSection if version.parse(elftools.__version__) < version.parse('0.24'): sys.exit("pyelftools is out of date, need version 0.24 or later") # Job server only works on Linux for now. if sys.platform == 'linux': from twisterlib.jobserver import GNUMakeJobClient, GNUMakeJobServer, JobClient from twisterlib.log_helper import log_command from twisterlib.testinstance import TestInstance from twisterlib.environment import TwisterEnv from twisterlib.testsuite import TestSuite from twisterlib.platform import Platform from twisterlib.testplan import change_skip_to_error_if_integration from twisterlib.harness import HarnessImporter, Pytest logger = logging.getLogger('twister') logger.setLevel(logging.DEBUG) import expr_parser class ExecutionCounter(object): def __init__(self, total=0): ''' Most of the stats are at test instance level Except that "_cases" and "_skipped_cases" are for cases of ALL test instances total complete = done + skipped_filter total = yaml test scenarios * applicable platforms complete perctenage = (done + skipped_filter) / total pass rate = passed / (total - skipped_configs) ''' # instances that go through the pipeline # updated by report_out() self._done = Value('i', 0) # iteration self._iteration = Value('i', 0) # instances that actually executed and passed # updated by report_out() self._passed = Value('i', 0) # static filter + runtime filter + build skipped # updated by update_counting_before_pipeline() and report_out() self._skipped_configs = Value('i', 0) # cmake filter + build skipped # updated by report_out() self._skipped_runtime = Value('i', 0) # staic filtered at yaml parsing time # updated by update_counting_before_pipeline() self._skipped_filter = Value('i', 0) # updated by update_counting_before_pipeline() and report_out() self._skipped_cases = Value('i', 0) # updated by report_out() in pipeline self._error = Value('i', 0) self._failed = Value('i', 0) # initialized to number of test instances self._total = Value('i', total) # updated in report_out self._cases = Value('i', 0) self.lock = Lock() def summary(self): print("--------------------------------") print(f"Total test suites: {self.total}") # actually test instances print(f"Total test cases: {self.cases}") print(f"Executed test cases: {self.cases - self.skipped_cases}") print(f"Skipped test cases: {self.skipped_cases}") print(f"Completed test suites: {self.done}") print(f"Passing test suites: {self.passed}") print(f"Failing test suites: {self.failed}") print(f"Skipped test suites: {self.skipped_configs}") print(f"Skipped test suites (runtime): {self.skipped_runtime}") print(f"Skipped test suites (filter): {self.skipped_filter}") print(f"Errors: {self.error}") print("--------------------------------") @property def cases(self): with self._cases.get_lock(): return self._cases.value @cases.setter def cases(self, value): with self._cases.get_lock(): self._cases.value = value @property def skipped_cases(self): with self._skipped_cases.get_lock(): return self._skipped_cases.value @skipped_cases.setter def skipped_cases(self, value): with self._skipped_cases.get_lock(): self._skipped_cases.value = value @property def error(self): with self._error.get_lock(): return self._error.value @error.setter def error(self, value): with self._error.get_lock(): self._error.value = value @property def iteration(self): with self._iteration.get_lock(): return self._iteration.value @iteration.setter def iteration(self, value): with self._iteration.get_lock(): self._iteration.value = value @property def done(self): with self._done.get_lock(): return self._done.value @done.setter def done(self, value): with self._done.get_lock(): self._done.value = value @property def passed(self): with self._passed.get_lock(): return self._passed.value @passed.setter def passed(self, value): with self._passed.get_lock(): self._passed.value = value @property def skipped_configs(self): with self._skipped_configs.get_lock(): return self._skipped_configs.value @skipped_configs.setter def skipped_configs(self, value): with self._skipped_configs.get_lock(): self._skipped_configs.value = value @property def skipped_filter(self): with self._skipped_filter.get_lock(): return self._skipped_filter.value @skipped_filter.setter def skipped_filter(self, value): with self._skipped_filter.get_lock(): self._skipped_filter.value = value @property def skipped_runtime(self): with self._skipped_runtime.get_lock(): return self._skipped_runtime.value @skipped_runtime.setter def skipped_runtime(self, value): with self._skipped_runtime.get_lock(): self._skipped_runtime.value = value @property def failed(self): with self._failed.get_lock(): return self._failed.value @failed.setter def failed(self, value): with self._failed.get_lock(): self._failed.value = value @property def total(self): with self._total.get_lock(): return self._total.value class CMake: config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver): self.cwd = None self.capture_output = True self.defconfig = {} self.cmake_cache = {} self.instance = None self.testsuite = testsuite self.platform = platform self.source_dir = source_dir self.build_dir = build_dir self.log = "build.log" self.default_encoding = sys.getdefaultencoding() self.jobserver = jobserver def parse_generated(self, filter_stages=[]): self.defconfig = {} return {} def run_build(self, args=[]): logger.debug("Building %s for %s" % (self.source_dir, self.platform.name)) cmake_args = [] cmake_args.extend(args) cmake = shutil.which('cmake') cmd = [cmake] + cmake_args kwargs = dict() if self.capture_output: kwargs['stdout'] = subprocess.PIPE # CMake sends the output of message() to stderr unless it's STATUS kwargs['stderr'] = subprocess.STDOUT if self.cwd: kwargs['cwd'] = self.cwd start_time = time.time() if sys.platform == 'linux': p = self.jobserver.popen(cmd, **kwargs) else: p = subprocess.Popen(cmd, **kwargs) logger.debug(f'Running {"".join(cmd)}') out, _ = p.communicate() ret = {} duration = time.time() - start_time self.instance.build_time += duration if p.returncode == 0: msg = f"Finished building {self.source_dir} for {self.platform.name} in {duration:.2f} seconds" logger.debug(msg) self.instance.status = "passed" if not self.instance.run: self.instance.add_missing_case_status("skipped", "Test was built only") ret = {"returncode": p.returncode} if out: log_msg = out.decode(self.default_encoding) with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log: log.write(log_msg) else: return None else: # A real error occurred, raise an exception log_msg = "" if out: log_msg = out.decode(self.default_encoding) with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log: log.write(log_msg) if log_msg: overflow_found = re.findall("region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM|dram0_1_seg)' overflowed by", log_msg) imgtool_overflow_found = re.findall(r"Error: Image size \(.*\) \+ trailer \(.*\) exceeds requested size", log_msg) if overflow_found and not self.options.overflow_as_errors: logger.debug("Test skipped due to {} Overflow".format(overflow_found[0])) self.instance.status = "skipped" self.instance.reason = "{} overflow".format(overflow_found[0]) change_skip_to_error_if_integration(self.options, self.instance) elif imgtool_overflow_found and not self.options.overflow_as_errors: self.instance.status = "skipped" self.instance.reason = "imgtool overflow" change_skip_to_error_if_integration(self.options, self.instance) else: self.instance.status = "error" self.instance.reason = "Build failure" ret = { "returncode": p.returncode } return ret def run_cmake(self, args="", filter_stages=[]): if not self.options.disable_warnings_as_errors: warnings_as_errors = 'y' gen_defines_args = "--edtlib-Werror" else: warnings_as_errors = 'n' gen_defines_args = "" logger.debug("Running cmake on %s for %s" % (self.source_dir, self.platform.name)) cmake_args = [ f'-B{self.build_dir}', f'-DTC_RUNID={self.instance.run_id}', f'-DCONFIG_COMPILER_WARNINGS_AS_ERRORS={warnings_as_errors}', f'-DEXTRA_GEN_DEFINES_ARGS={gen_defines_args}', f'-G{self.env.generator}' ] # If needed, run CMake using the package_helper script first, to only run # a subset of all cmake modules. This output will be used to filter # testcases, and the full CMake configuration will be run for # testcases that should be built if filter_stages: cmake_filter_args = [ f'-DMODULES={",".join(filter_stages)}', f'-P{canonical_zephyr_base}/cmake/package_helper.cmake', ] if self.testsuite.sysbuild and not filter_stages: logger.debug("Building %s using sysbuild" % (self.source_dir)) source_args = [ f'-S{canonical_zephyr_base}/share/sysbuild', f'-DAPP_DIR={self.source_dir}' ] else: source_args = [ f'-S{self.source_dir}' ] cmake_args.extend(source_args) cmake_args.extend(args) cmake_opts = ['-DBOARD={}'.format(self.platform.name)] cmake_args.extend(cmake_opts) if self.instance.testsuite.required_snippets: cmake_opts = ['-DSNIPPET={}'.format(';'.join(self.instance.testsuite.required_snippets))] cmake_args.extend(cmake_opts) cmake = shutil.which('cmake') cmd = [cmake] + cmake_args if filter_stages: cmd += cmake_filter_args kwargs = dict() log_command(logger, "Calling cmake", cmd) if self.capture_output: kwargs['stdout'] = subprocess.PIPE # CMake sends the output of message() to stderr unless it's STATUS kwargs['stderr'] = subprocess.STDOUT if self.cwd: kwargs['cwd'] = self.cwd start_time = time.time() if sys.platform == 'linux': p = self.jobserver.popen(cmd, **kwargs) else: p = subprocess.Popen(cmd, **kwargs) out, _ = p.communicate() duration = time.time() - start_time self.instance.build_time += duration if p.returncode == 0: filter_results = self.parse_generated(filter_stages) msg = f"Finished running cmake {self.source_dir} for {self.platform.name} in {duration:.2f} seconds" logger.debug(msg) ret = { 'returncode': p.returncode, 'filter': filter_results } else: self.instance.status = "error" self.instance.reason = "Cmake build failure" for tc in self.instance.testcases: tc.status = self.instance.status logger.error("Cmake build failure: %s for %s" % (self.source_dir, self.platform.name)) ret = {"returncode": p.returncode} if out: os.makedirs(self.build_dir, exist_ok=True) with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log: log_msg = out.decode(self.default_encoding) log.write(log_msg) return ret class FilterBuilder(CMake): def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver): super().__init__(testsuite, platform, source_dir, build_dir, jobserver) self.log = "config-twister.log" def parse_generated(self, filter_stages=[]): if self.platform.name == "unit_testing": return {} if self.testsuite.sysbuild and not filter_stages: # Load domain yaml to get default domain build directory domain_path = os.path.join(self.build_dir, "domains.yaml") domains = Domains.from_file(domain_path) logger.debug("Loaded sysbuild domain data from %s" % (domain_path)) self.instance.domains = domains domain_build = domains.get_default_domain().build_dir cmake_cache_path = os.path.join(domain_build, "CMakeCache.txt") defconfig_path = os.path.join(domain_build, "zephyr", ".config") edt_pickle = os.path.join(domain_build, "zephyr", "edt.pickle") else: cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt") # .config is only available after kconfig stage in cmake. If only dt based filtration is required # package helper call won't produce .config if not filter_stages or "kconfig" in filter_stages: defconfig_path = os.path.join(self.build_dir, "zephyr", ".config") # dt is compiled before kconfig, so edt_pickle is available regardless of choice of filter stages edt_pickle = os.path.join(self.build_dir, "zephyr", "edt.pickle") if not filter_stages or "kconfig" in filter_stages: with open(defconfig_path, "r") as fp: defconfig = {} for line in fp.readlines(): m = self.config_re.match(line) if not m: if line.strip() and not line.startswith("#"): sys.stderr.write("Unrecognized line %s\n" % line) continue defconfig[m.group(1)] = m.group(2).strip() self.defconfig = defconfig cmake_conf = {} try: cache = CMakeCache.from_file(cmake_cache_path) except FileNotFoundError: cache = {} for k in iter(cache): cmake_conf[k.name] = k.value self.cmake_cache = cmake_conf filter_data = { "ARCH": self.platform.arch, "PLATFORM": self.platform.name } filter_data.update(os.environ) if not filter_stages or "kconfig" in filter_stages: filter_data.update(self.defconfig) filter_data.update(self.cmake_cache) if self.testsuite.sysbuild and self.env.options.device_testing: # Verify that twister's arguments support sysbuild. # Twister sysbuild flashing currently only works with west, so # --west-flash must be passed. Additionally, erasing the DUT # before each test with --west-flash=--erase will inherently not # work with sysbuild. if self.env.options.west_flash is None: logger.warning("Sysbuild test will be skipped. " + "West must be used for flashing.") return {os.path.join(self.platform.name, self.testsuite.name): True} elif "--erase" in self.env.options.west_flash: logger.warning("Sysbuild test will be skipped, " + "--erase is not supported with --west-flash") return {os.path.join(self.platform.name, self.testsuite.name): True} if self.testsuite and self.testsuite.filter: try: if os.path.exists(edt_pickle): with open(edt_pickle, 'rb') as f: edt = pickle.load(f) else: edt = None ret = expr_parser.parse(self.testsuite.filter, filter_data, edt) except (ValueError, SyntaxError) as se: sys.stderr.write( "Failed processing %s\n" % self.testsuite.yamlfile) raise se if not ret: return {os.path.join(self.platform.name, self.testsuite.name): True} else: return {os.path.join(self.platform.name, self.testsuite.name): False} else: self.platform.filter_data = filter_data return filter_data class ProjectBuilder(FilterBuilder): def __init__(self, instance: TestInstance, env: TwisterEnv, jobserver, **kwargs): super().__init__(instance.testsuite, instance.platform, instance.testsuite.source_dir, instance.build_dir, jobserver) self.log = "build.log" self.instance = instance self.filtered_tests = 0 self.options = env.options self.env = env self.duts = None def log_info(self, filename, inline_logs, log_testcases=False): filename = os.path.abspath(os.path.realpath(filename)) if inline_logs: logger.info("{:-^100}".format(filename)) try: with open(filename) as fp: data = fp.read() except Exception as e: data = "Unable to read log data (%s)\n" % (str(e)) logger.error(data) logger.info("{:-^100}".format(filename)) if log_testcases: for tc in self.instance.testcases: if not tc.reason: continue logger.info( f"\n{str(tc.name).center(100, '_')}\n" f"{tc.reason}\n" f"{100*'_'}\n" f"{tc.output}" ) else: logger.error("see: " + Fore.YELLOW + filename + Fore.RESET) def log_info_file(self, inline_logs): build_dir = self.instance.build_dir h_log = "{}/handler.log".format(build_dir) b_log = "{}/build.log".format(build_dir) v_log = "{}/valgrind.log".format(build_dir) d_log = "{}/device.log".format(build_dir) pytest_log = "{}/twister_harness.log".format(build_dir) if os.path.exists(v_log) and "Valgrind" in self.instance.reason: self.log_info("{}".format(v_log), inline_logs) elif os.path.exists(pytest_log) and os.path.getsize(pytest_log) > 0: self.log_info("{}".format(pytest_log), inline_logs, log_testcases=True) elif os.path.exists(h_log) and os.path.getsize(h_log) > 0: self.log_info("{}".format(h_log), inline_logs) elif os.path.exists(d_log) and os.path.getsize(d_log) > 0: self.log_info("{}".format(d_log), inline_logs) else: self.log_info("{}".format(b_log), inline_logs) def process(self, pipeline, done, message, lock, results): op = message.get('op') self.instance.setup_handler(self.env) if op == "filter": ret = self.cmake(filter_stages=self.instance.filter_stages) if self.instance.status in ["failed", "error"]: pipeline.put({"op": "report", "test": self.instance}) else: # Here we check the dt/kconfig filter results coming from running cmake if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]: logger.debug("filtering %s" % self.instance.name) self.instance.status = "filtered" self.instance.reason = "runtime filter" results.skipped_runtime += 1 self.instance.add_missing_case_status("skipped") pipeline.put({"op": "report", "test": self.instance}) else: pipeline.put({"op": "cmake", "test": self.instance}) # The build process, call cmake and build with configured generator elif op == "cmake": ret = self.cmake() if self.instance.status in ["failed", "error"]: pipeline.put({"op": "report", "test": self.instance}) elif self.options.cmake_only: if self.instance.status is None: self.instance.status = "passed" pipeline.put({"op": "report", "test": self.instance}) else: # Here we check the runtime filter results coming from running cmake if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]: logger.debug("filtering %s" % self.instance.name) self.instance.status = "filtered" self.instance.reason = "runtime filter" results.skipped_runtime += 1 self.instance.add_missing_case_status("skipped") pipeline.put({"op": "report", "test": self.instance}) else: pipeline.put({"op": "build", "test": self.instance}) elif op == "build": logger.debug("build test: %s" % self.instance.name) ret = self.build() if not ret: self.instance.status = "error" self.instance.reason = "Build Failure" pipeline.put({"op": "report", "test": self.instance}) else: # Count skipped cases during build, for example # due to ram/rom overflow. if self.instance.status == "skipped": results.skipped_runtime += 1 self.instance.add_missing_case_status("skipped", self.instance.reason) if ret.get('returncode', 1) > 0: self.instance.add_missing_case_status("blocked", self.instance.reason) pipeline.put({"op": "report", "test": self.instance}) else: logger.debug(f"Determine test cases for test instance: {self.instance.name}") try: self.determine_testcases(results) pipeline.put({"op": "gather_metrics", "test": self.instance}) except BuildError as e: logger.error(str(e)) self.instance.status = "error" self.instance.reason = str(e) pipeline.put({"op": "report", "test": self.instance}) elif op == "gather_metrics": self.gather_metrics(self.instance) if self.instance.run and self.instance.handler.ready: pipeline.put({"op": "run", "test": self.instance}) else: pipeline.put({"op": "report", "test": self.instance}) # Run the generated binary using one of the supported handlers elif op == "run": logger.debug("run test: %s" % self.instance.name) self.run() logger.debug(f"run status: {self.instance.name} {self.instance.status}") try: # to make it work with pickle self.instance.handler.thread = None self.instance.handler.duts = None pipeline.put({ "op": "report", "test": self.instance, "status": self.instance.status, "reason": self.instance.reason } ) except RuntimeError as e: logger.error(f"RuntimeError: {e}") traceback.print_exc() # Report results and output progress to screen elif op == "report": with lock: done.put(self.instance) self.report_out(results) if not self.options.coverage: if self.options.prep_artifacts_for_testing: pipeline.put({"op": "cleanup", "mode": "device", "test": self.instance}) elif self.options.runtime_artifact_cleanup == "pass" and self.instance.status == "passed": pipeline.put({"op": "cleanup", "mode": "passed", "test": self.instance}) elif self.options.runtime_artifact_cleanup == "all": pipeline.put({"op": "cleanup", "mode": "all", "test": self.instance}) elif op == "cleanup": mode = message.get("mode") if mode == "device": self.cleanup_device_testing_artifacts() elif mode == "passed" or (mode == "all" and self.instance.reason != "Cmake build failure"): self.cleanup_artifacts() def determine_testcases(self, results): yaml_testsuite_name = self.instance.testsuite.id logger.debug(f"Determine test cases for test suite: {yaml_testsuite_name}") elf_file = self.instance.get_elf_file() elf = ELFFile(open(elf_file, "rb")) logger.debug(f"Test instance {self.instance.name} already has {len(self.instance.testcases)} cases.") new_ztest_unit_test_regex = re.compile(r"z_ztest_unit_test__([^\s]+?)__([^\s]*)") detected_cases = [] for section in elf.iter_sections(): if isinstance(section, SymbolTableSection): for sym in section.iter_symbols(): # It is only meant for new ztest fx because only new ztest fx exposes test functions # precisely. # The 1st capture group is new ztest suite name. # The 2nd capture group is new ztest unit test name. matches = new_ztest_unit_test_regex.findall(sym.name) if matches: for m in matches: # new_ztest_suite = m[0] # not used for now test_func_name = m[1].replace("test_", "") testcase_id = f"{yaml_testsuite_name}.{test_func_name}" detected_cases.append(testcase_id) if detected_cases: logger.debug(f"{', '.join(detected_cases)} in {elf_file}") self.instance.testcases.clear() self.instance.testsuite.testcases.clear() # When the old regex-based test case collection is fully deprecated, # this will be the sole place where test cases get added to the test instance. # Then we can further include the new_ztest_suite info in the testcase_id. for testcase_id in detected_cases: self.instance.add_testcase(name=testcase_id) self.instance.testsuite.add_testcase(name=testcase_id) def cleanup_artifacts(self, additional_keep: List[str] = []): logger.debug("Cleaning up {}".format(self.instance.build_dir)) allow = [ os.path.join('zephyr', '.config'), 'handler.log', 'build.log', 'device.log', 'recording.csv', # below ones are needed to make --test-only work as well 'Makefile', 'CMakeCache.txt', 'build.ninja', os.path.join('CMakeFiles', 'rules.ninja') ] allow += additional_keep if self.options.runtime_artifact_cleanup == 'all': allow += [os.path.join('twister', 'testsuite_extra.conf')] allow = [os.path.join(self.instance.build_dir, file) for file in allow] for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False): for name in filenames: path = os.path.join(dirpath, name) if path not in allow: os.remove(path) # Remove empty directories and symbolic links to directories for dir in dirnames: path = os.path.join(dirpath, dir) if os.path.islink(path): os.remove(path) elif not os.listdir(path): os.rmdir(path) def cleanup_device_testing_artifacts(self): logger.debug("Cleaning up for Device Testing {}".format(self.instance.build_dir)) files_to_keep = self._get_binaries() files_to_keep.append(os.path.join('zephyr', 'runners.yaml')) if self.testsuite.sysbuild: files_to_keep.append('domains.yaml') for domain in self.instance.domains.get_domains(): files_to_keep += self._get_artifact_allow_list_for_domain(domain.name) self.cleanup_artifacts(files_to_keep) self._sanitize_files() def _get_artifact_allow_list_for_domain(self, domain: str) -> List[str]: """ Return a list of files needed to test a given domain. """ allow = [ os.path.join(domain, 'build.ninja'), os.path.join(domain, 'CMakeCache.txt'), os.path.join(domain, 'CMakeFiles', 'rules.ninja'), os.path.join(domain, 'Makefile'), os.path.join(domain, 'zephyr', '.config'), os.path.join(domain, 'zephyr', 'runners.yaml') ] return allow def _get_binaries(self) -> List[str]: """ Get list of binaries paths (absolute or relative to the self.instance.build_dir), basing on information from platform.binaries or runners.yaml. If they are not found take default binaries like "zephyr/zephyr.hex" etc. """ binaries: List[str] = [] platform = self.instance.platform if platform.binaries: for binary in platform.binaries: binaries.append(os.path.join('zephyr', binary)) # Get binaries for a single-domain build binaries += self._get_binaries_from_runners() # Get binaries in the case of a multiple-domain build if self.testsuite.sysbuild: for domain in self.instance.domains.get_domains(): binaries += self._get_binaries_from_runners(domain.name) # if binaries was not found in platform.binaries and runners.yaml take default ones if len(binaries) == 0: binaries = [ os.path.join('zephyr', 'zephyr.hex'), os.path.join('zephyr', 'zephyr.bin'), os.path.join('zephyr', 'zephyr.elf'), os.path.join('zephyr', 'zephyr.exe'), ] return binaries def _get_binaries_from_runners(self, domain='') -> List[str]: """ Get list of binaries paths (absolute or relative to the self.instance.build_dir) from runners.yaml file. May be used for multiple-domain builds by passing in one domain at a time. """ runners_file_path: str = os.path.join(self.instance.build_dir, domain, 'zephyr', 'runners.yaml') if not os.path.exists(runners_file_path): return [] with open(runners_file_path, 'r') as file: runners_content: dict = yaml.safe_load(file) if 'config' not in runners_content: return [] runners_config: dict = runners_content['config'] binary_keys: List[str] = ['elf_file', 'hex_file', 'bin_file'] binaries: List[str] = [] for binary_key in binary_keys: binary_path = runners_config.get(binary_key) if binary_path is None: continue if os.path.isabs(binary_path): binaries.append(binary_path) else: binaries.append(os.path.join(domain, 'zephyr', binary_path)) return binaries def _sanitize_files(self): """ Sanitize files to make it possible to flash those file on different computer/system. """ self._sanitize_runners_file() self._sanitize_zephyr_base_from_files() def _sanitize_runners_file(self): """ Replace absolute paths of binary files for relative ones. The base directory for those files is f"{self.instance.build_dir}/zephyr" """ runners_dir_path: str = os.path.join(self.instance.build_dir, 'zephyr') runners_file_path: str = os.path.join(runners_dir_path, 'runners.yaml') if not os.path.exists(runners_file_path): return with open(runners_file_path, 'rt') as file: runners_content_text = file.read() runners_content_yaml: dict = yaml.safe_load(runners_content_text) if 'config' not in runners_content_yaml: return runners_config: dict = runners_content_yaml['config'] binary_keys: List[str] = ['elf_file', 'hex_file', 'bin_file'] for binary_key in binary_keys: binary_path = runners_config.get(binary_key) # sanitize only paths which exist and are absolute if binary_path is None or not os.path.isabs(binary_path): continue binary_path_relative = os.path.relpath(binary_path, start=runners_dir_path) runners_content_text = runners_content_text.replace(binary_path, binary_path_relative) with open(runners_file_path, 'wt') as file: file.write(runners_content_text) def _sanitize_zephyr_base_from_files(self): """ Remove Zephyr base paths from selected files. """ files_to_sanitize = [ 'CMakeCache.txt', os.path.join('zephyr', 'runners.yaml'), ] for file_path in files_to_sanitize: file_path = os.path.join(self.instance.build_dir, file_path) if not os.path.exists(file_path): continue with open(file_path, "rt") as file: data = file.read() # add trailing slash at the end of canonical_zephyr_base if it does not exist: path_to_remove = os.path.join(canonical_zephyr_base, "") data = data.replace(path_to_remove, "") with open(file_path, "wt") as file: file.write(data) def report_out(self, results): total_to_do = results.total total_tests_width = len(str(total_to_do)) results.done += 1 instance = self.instance if results.iteration == 1: results.cases += len(instance.testcases) if instance.status in ["error", "failed"]: if instance.status == "error": results.error += 1 txt = " ERROR " else: results.failed += 1 txt = " FAILED " if self.options.verbose: status = Fore.RED + txt + Fore.RESET + instance.reason else: logger.error( "{:<25} {:<50} {}{}{}: {}".format( instance.platform.name, instance.testsuite.name, Fore.RED, txt, Fore.RESET, instance.reason)) if not self.options.verbose: self.log_info_file(self.options.inline_logs) elif instance.status in ["skipped", "filtered"]: status = Fore.YELLOW + "SKIPPED" + Fore.RESET results.skipped_configs += 1 # test cases skipped at the test instance level results.skipped_cases += len(instance.testsuite.testcases) elif instance.status == "passed": status = Fore.GREEN + "PASSED" + Fore.RESET results.passed += 1 for case in instance.testcases: # test cases skipped at the test case level if case.status == 'skipped': results.skipped_cases += 1 else: logger.debug(f"Unknown status = {instance.status}") status = Fore.YELLOW + "UNKNOWN" + Fore.RESET if self.options.verbose: if self.options.cmake_only: more_info = "cmake" elif instance.status in ["skipped", "filtered"]: more_info = instance.reason else: if instance.handler.ready and instance.run: more_info = instance.handler.type_str htime = instance.execution_time if instance.dut: more_info += f": {instance.dut}," if htime: more_info += " {:.3f}s".format(htime) else: more_info = "build" if ( instance.status in ["error", "failed", "timeout", "flash_error"] and hasattr(self.instance.handler, 'seed') and self.instance.handler.seed is not None ): more_info += "/seed: " + str(self.options.seed) logger.info("{:>{}}/{} {:<25} {:<50} {} ({})".format( results.done, total_tests_width, total_to_do , instance.platform.name, instance.testsuite.name, status, more_info)) if instance.status in ["error", "failed", "timeout"]: self.log_info_file(self.options.inline_logs) else: completed_perc = 0 if total_to_do > 0: completed_perc = int((float(results.done) / total_to_do) * 100) sys.stdout.write("INFO - Total complete: %s%4d/%4d%s %2d%% skipped: %s%4d%s, failed: %s%4d%s, error: %s%4d%s\r" % ( Fore.GREEN, results.done, total_to_do, Fore.RESET, completed_perc, Fore.YELLOW if results.skipped_configs > 0 else Fore.RESET, results.skipped_configs, Fore.RESET, Fore.RED if results.failed > 0 else Fore.RESET, results.failed, Fore.RESET, Fore.RED if results.error > 0 else Fore.RESET, results.error, Fore.RESET ) ) sys.stdout.flush() @staticmethod def cmake_assemble_args(extra_args, handler, extra_conf_files, extra_overlay_confs, extra_dtc_overlay_files, cmake_extra_args, build_dir): # Retain quotes around config options config_options = [arg for arg in extra_args if arg.startswith("CONFIG_")] args = [arg for arg in extra_args if not arg.startswith("CONFIG_")] args_expanded = ["-D{}".format(a.replace('"', '\"')) for a in config_options] if handler.ready: args.extend(handler.args) if extra_conf_files: args.append(f"CONF_FILE=\"{';'.join(extra_conf_files)}\"") if extra_dtc_overlay_files: args.append(f"DTC_OVERLAY_FILE=\"{';'.join(extra_dtc_overlay_files)}\"") # merge overlay files into one variable overlays = extra_overlay_confs.copy() additional_overlay_path = os.path.join( build_dir, "twister", "testsuite_extra.conf" ) if os.path.exists(additional_overlay_path): overlays.append(additional_overlay_path) if overlays: args.append("OVERLAY_CONFIG=\"%s\"" % (" ".join(overlays))) # Build the final argument list args_expanded.extend(["-D{}".format(a.replace('"', '\"')) for a in cmake_extra_args]) args_expanded.extend(["-D{}".format(a.replace('"', '')) for a in args]) return args_expanded def cmake(self, filter_stages=[]): args = self.cmake_assemble_args( self.testsuite.extra_args.copy(), # extra_args from YAML self.instance.handler, self.testsuite.extra_conf_files, self.testsuite.extra_overlay_confs, self.testsuite.extra_dtc_overlay_files, self.options.extra_args, # CMake extra args self.instance.build_dir, ) return self.run_cmake(args,filter_stages) def build(self): harness = HarnessImporter.get_harness(self.instance.testsuite.harness.capitalize()) build_result = self.run_build(['--build', self.build_dir]) try: if harness: harness.instance = self.instance harness.build() except ConfigurationError as error: self.instance.status = "error" self.instance.reason = str(error) logger.error(self.instance.reason) return return build_result def run(self): instance = self.instance if instance.handler.ready: logger.debug(f"Reset instance status from '{instance.status}' to None before run.") instance.status = None if instance.handler.type_str == "device": instance.handler.duts = self.duts if(self.options.seed is not None and instance.platform.name.startswith("native_")): self.parse_generated() if('CONFIG_FAKE_ENTROPY_NATIVE_POSIX' in self.defconfig and self.defconfig['CONFIG_FAKE_ENTROPY_NATIVE_POSIX'] == 'y'): instance.handler.seed = self.options.seed if self.options.extra_test_args and instance.platform.arch == "posix": instance.handler.extra_test_args = self.options.extra_test_args harness = HarnessImporter.get_harness(instance.testsuite.harness.capitalize()) try: harness.configure(instance) except ConfigurationError as error: instance.status = "error" instance.reason = str(error) logger.error(instance.reason) return # if isinstance(harness, Pytest): harness.pytest_run(instance.handler.get_test_timeout()) else: instance.handler.handle(harness) sys.stdout.flush() def gather_metrics(self, instance: TestInstance): if self.options.create_rom_ram_report: self.run_build(['--build', self.build_dir, "--target", "footprint"]) if self.options.enable_size_report and not self.options.cmake_only: self.calc_size(instance=instance, from_buildlog=self.options.footprint_from_buildlog) else: instance.metrics["used_ram"] = 0 instance.metrics["used_rom"] = 0 instance.metrics["available_rom"] = 0 instance.metrics["available_ram"] = 0 instance.metrics["unrecognized"] = [] @staticmethod def calc_size(instance: TestInstance, from_buildlog: bool): if instance.status not in ["error", "failed", "skipped"]: if not instance.platform.type in ["native", "qemu", "unit"]: generate_warning = bool(instance.platform.type == "mcu") size_calc = instance.calculate_sizes(from_buildlog=from_buildlog, generate_warning=generate_warning) instance.metrics["used_ram"] = size_calc.get_used_ram() instance.metrics["used_rom"] = size_calc.get_used_rom() instance.metrics["available_rom"] = size_calc.get_available_rom() instance.metrics["available_ram"] = size_calc.get_available_ram() instance.metrics["unrecognized"] = size_calc.unrecognized_sections() else: instance.metrics["used_ram"] = 0 instance.metrics["used_rom"] = 0 instance.metrics["available_rom"] = 0 instance.metrics["available_ram"] = 0 instance.metrics["unrecognized"] = [] instance.metrics["handler_time"] = instance.execution_time class TwisterRunner: def __init__(self, instances, suites, env=None) -> None: self.pipeline = None self.options = env.options self.env = env self.instances = instances self.suites = suites self.duts = None self.jobs = 1 self.results = None self.jobserver = None def run(self): retries = self.options.retry_failed + 1 BaseManager.register('LifoQueue', queue.LifoQueue) manager = BaseManager() manager.start() self.results = ExecutionCounter(total=len(self.instances)) self.iteration = 0 pipeline = manager.LifoQueue() done_queue = manager.LifoQueue() # Set number of jobs if self.options.jobs: self.jobs = self.options.jobs elif self.options.build_only: self.jobs = multiprocessing.cpu_count() * 2 else: self.jobs = multiprocessing.cpu_count() if sys.platform == "linux": if os.name == 'posix': self.jobserver = GNUMakeJobClient.from_environ(jobs=self.options.jobs) if not self.jobserver: self.jobserver = GNUMakeJobServer(self.jobs) elif self.jobserver.jobs: self.jobs = self.jobserver.jobs # TODO: Implement this on windows/mac also else: self.jobserver = JobClient() logger.info("JOBS: %d", self.jobs) self.update_counting_before_pipeline() while True: self.results.iteration += 1 if self.results.iteration > 1: logger.info("%d Iteration:" % (self.results.iteration)) time.sleep(self.options.retry_interval) # waiting for the system to settle down self.results.done = self.results.total - self.results.failed - self.results.error self.results.failed = 0 if self.options.retry_build_errors: self.results.error = 0 else: self.results.done = self.results.skipped_filter self.execute(pipeline, done_queue) while True: try: inst = done_queue.get_nowait() except queue.Empty: break else: inst.metrics.update(self.instances[inst.name].metrics) inst.metrics["handler_time"] = inst.execution_time inst.metrics["unrecognized"] = [] self.instances[inst.name] = inst print("") retry_errors = False if self.results.error and self.options.retry_build_errors: retry_errors = True retries = retries - 1 if retries == 0 or ( self.results.failed == 0 and not retry_errors): break self.show_brief() def update_counting_before_pipeline(self): ''' Updating counting before pipeline is necessary because statically filterd test instance never enter the pipeline. While some pipeline output needs the static filter stats. So need to prepare them before pipline starts. ''' for instance in self.instances.values(): if instance.status == 'filtered' and not instance.reason == 'runtime filter': self.results.skipped_filter += 1 self.results.skipped_configs += 1 self.results.skipped_cases += len(instance.testsuite.testcases) self.results.cases += len(instance.testsuite.testcases) elif instance.status == 'error': self.results.error += 1 def show_brief(self): logger.info("%d test scenarios (%d test instances) selected, " "%d configurations skipped (%d by static filter, %d at runtime)." % (len(self.suites), len(self.instances), self.results.skipped_configs, self.results.skipped_filter, self.results.skipped_configs - self.results.skipped_filter)) def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False, retry_build_errors=False): for instance in self.instances.values(): if build_only: instance.run = False no_retry_statuses = ['passed', 'skipped', 'filtered'] if not retry_build_errors: no_retry_statuses.append("error") if instance.status not in no_retry_statuses: logger.debug(f"adding {instance.name}") if instance.status: instance.retries += 1 instance.status = None # Check if cmake package_helper script can be run in advance. instance.filter_stages = [] if instance.testsuite.filter: instance.filter_stages = self.get_cmake_filter_stages(instance.testsuite.filter, expr_parser.reserved.keys()) if test_only and instance.run: pipeline.put({"op": "run", "test": instance}) elif instance.filter_stages and "full" not in instance.filter_stages: pipeline.put({"op": "filter", "test": instance}) else: cache_file = os.path.join(instance.build_dir, "CMakeCache.txt") if os.path.exists(cache_file) and self.env.options.aggressive_no_clean: pipeline.put({"op": "build", "test": instance}) else: pipeline.put({"op": "cmake", "test": instance}) def pipeline_mgr(self, pipeline, done_queue, lock, results): if sys.platform == 'linux': with self.jobserver.get_job(): while True: try: task = pipeline.get_nowait() except queue.Empty: break else: instance = task['test'] pb = ProjectBuilder(instance, self.env, self.jobserver) pb.duts = self.duts pb.process(pipeline, done_queue, task, lock, results) return True else: while True: try: task = pipeline.get_nowait() except queue.Empty: break else: instance = task['test'] pb = ProjectBuilder(instance, self.env, self.jobserver) pb.duts = self.duts pb.process(pipeline, done_queue, task, lock, results) return True def execute(self, pipeline, done): lock = Lock() logger.info("Adding tasks to the queue...") self.add_tasks_to_queue(pipeline, self.options.build_only, self.options.test_only, retry_build_errors=self.options.retry_build_errors) logger.info("Added initial list of jobs to queue") processes = [] for _ in range(self.jobs): p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, self.results, )) processes.append(p) p.start() logger.debug(f"Launched {self.jobs} jobs") try: for p in processes: p.join() except KeyboardInterrupt: logger.info("Execution interrupted") for p in processes: p.terminate() @staticmethod def get_cmake_filter_stages(filt, logic_keys): """ Analyze filter expressions from test yaml and decide if dts and/or kconfig based filtering will be needed.""" dts_required = False kconfig_required = False full_required = False filter_stages = [] # Compress args in expressions like "function('x', 'y')" so they are not split when splitting by whitespaces filt = filt.replace(", ", ",") # Remove logic words for k in logic_keys: filt = filt.replace(f"{k} ", "") # Remove brackets filt = filt.replace("(", "") filt = filt.replace(")", "") # Splite by whitespaces filt = filt.split() for expression in filt: if expression.startswith("dt_"): dts_required = True elif expression.startswith("CONFIG"): kconfig_required = True else: full_required = True if full_required: return ["full"] if dts_required: filter_stages.append("dts") if kconfig_required: filter_stages.append("kconfig") return filter_stages