# vim: set syntax=python ts=4 : # # Copyright (c) 2018-2025 Intel Corporation # SPDX-License-Identifier: Apache-2.0 import contextlib import glob import logging import os import pathlib import re import shutil import subprocess import sys import tempfile logger = logging.getLogger('twister') logger.setLevel(logging.DEBUG) supported_coverage_formats = { "gcovr": ["html", "xml", "csv", "txt", "coveralls", "sonarqube"], "lcov": ["html", "lcov"] } class CoverageTool: """ Base class for every supported coverage tool """ def __init__(self): self.gcov_tool = None self.base_dir = None self.output_formats = None self.coverage_capture = True self.coverage_report = True self.coverage_per_instance = False self.instances = {} @staticmethod def factory(tool, jobs=None): if tool == 'lcov': t = Lcov(jobs) elif tool == 'gcovr': t = Gcovr() else: logger.error(f"Unsupported coverage tool specified: {tool}") return None logger.debug(f"Select {tool} as the coverage tool...") return t @staticmethod def retrieve_gcov_data(input_file): logger.debug(f"Working on {input_file}") extracted_coverage_info = {} capture_data = False capture_complete = False with open(input_file) as fp: for line in fp.readlines(): if re.search("GCOV_COVERAGE_DUMP_START", line): capture_data = True capture_complete = False continue if re.search("GCOV_COVERAGE_DUMP_END", line): capture_complete = True # Keep searching for additional dumps # Loop until the coverage data is found. if not capture_data: continue if line.startswith("*"): sp = line.split("<") if len(sp) > 1: # Remove the leading delimiter "*" file_name = sp[0][1:] # Remove the trailing new line char hex_dump = sp[1][:-1] else: continue else: continue if file_name in extracted_coverage_info: extracted_coverage_info[file_name].append(hex_dump) else: extracted_coverage_info[file_name] = [hex_dump] if not capture_data: capture_complete = True return {'complete': capture_complete, 'data': extracted_coverage_info} def merge_hexdumps(self, hexdumps): # Only one hexdump if len(hexdumps) == 1: return hexdumps[0] with tempfile.TemporaryDirectory() as dir: # Write each hexdump to a dedicated temporary folder dirs = [] for idx, dump in enumerate(hexdumps): subdir = dir + f'/{idx}' os.mkdir(subdir) dirs.append(subdir) with open(f'{subdir}/tmp.gcda', 'wb') as fp: fp.write(bytes.fromhex(dump)) # Iteratively call gcov-tool (not gcov) to merge the files merge_tool = self.gcov_tool + '-tool' for d1, d2 in zip(dirs[:-1], dirs[1:], strict=False): cmd = [merge_tool, 'merge', d1, d2, '--output', d2] subprocess.call(cmd) # Read back the final output file with open(f'{dirs[-1]}/tmp.gcda', 'rb') as fp: return fp.read(-1).hex() def create_gcda_files(self, extracted_coverage_info): gcda_created = True logger.debug(f"Generating {len(extracted_coverage_info)} gcda files") for filename, hexdumps in extracted_coverage_info.items(): # if kobject_hash is given for coverage gcovr fails # hence skipping it problem only in gcovr v4.1 if "kobject_hash" in filename: filename = (filename[:-4]) + "gcno" with contextlib.suppress(Exception): os.remove(filename) continue try: hexdump_val = self.merge_hexdumps(hexdumps) hex_bytes = bytes.fromhex(hexdump_val) with open(filename, 'wb') as fp: fp.write(hex_bytes) except ValueError: logger.exception(f"Unable to convert hex data for file: {filename}") gcda_created = False except FileNotFoundError: logger.exception(f"Unable to create gcda file: {filename}") gcda_created = False return gcda_created def capture_data(self, outdir): coverage_completed = True for filename in glob.glob(f"{outdir}/**/handler.log", recursive=True): gcov_data = self.__class__.retrieve_gcov_data(filename) capture_complete = gcov_data['complete'] extracted_coverage_info = gcov_data['data'] if capture_complete: gcda_created = self.create_gcda_files(extracted_coverage_info) if gcda_created: logger.debug(f"Gcov data captured: {filename}") else: logger.error(f"Gcov data invalid for: {filename}") coverage_completed = False else: logger.error(f"Gcov data capture incomplete: {filename}") coverage_completed = False return coverage_completed def generate(self, outdir): coverage_completed = self.capture_data(outdir) if self.coverage_capture else True if not coverage_completed or not self.coverage_report: return coverage_completed, {} build_dirs = None if not self.coverage_capture and self.coverage_report and self.coverage_per_instance: build_dirs = [instance.build_dir for instance in self.instances.values()] reports = {} with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog: ret, reports = self._generate(outdir, coveragelog, build_dirs) if ret == 0: report_log = { "html": "HTML report generated: {}".format( os.path.join(outdir, "coverage", "index.html") ), "lcov": "LCOV report generated: {}".format( os.path.join(outdir, "coverage.info") ), "xml": "XML report generated: {}".format( os.path.join(outdir, "coverage", "coverage.xml") ), "csv": "CSV report generated: {}".format( os.path.join(outdir, "coverage", "coverage.csv") ), "txt": "TXT report generated: {}".format( os.path.join(outdir, "coverage", "coverage.txt") ), "coveralls": "Coveralls report generated: {}".format( os.path.join(outdir, "coverage", "coverage.coveralls.json") ), "sonarqube": "Sonarqube report generated: {}".format( os.path.join(outdir, "coverage", "coverage.sonarqube.xml") ) } for r in self.output_formats.split(','): logger.info(report_log[r]) else: coverage_completed = False logger.debug(f"All coverage data processed: {coverage_completed}") return coverage_completed, reports class Lcov(CoverageTool): def __init__(self, jobs=None): super().__init__() self.ignores = [] self.ignore_branch_patterns = [] self.output_formats = "lcov,html" self.version = self.get_version() self.jobs = jobs def get_version(self): try: result = subprocess.run( ['lcov', '--version'], capture_output=True, text=True, check=True ) version_output = result.stdout.strip().replace('lcov: LCOV version ', '') return version_output except subprocess.CalledProcessError as e: logger.error(f"Unable to determine lcov version: {e}") sys.exit(1) except FileNotFoundError as e: logger.error(f"Unable to find lcov tool: {e}") sys.exit(1) def add_ignore_file(self, pattern): self.ignores.append('*' + pattern + '*') def add_ignore_directory(self, pattern): self.ignores.append('*/' + pattern + '/*') def add_ignore_branch_pattern(self, pattern): self.ignore_branch_patterns.append(pattern) @property def is_lcov_v2(self): return self.version.startswith("2") def run_command(self, cmd, coveragelog): if self.is_lcov_v2: # The --ignore-errors source option is added for genhtml as well as # lcov to avoid it exiting due to # samples/application_development/external_lib/ cmd += [ "--ignore-errors", "inconsistent,inconsistent", "--ignore-errors", "negative,negative", "--ignore-errors", "unused,unused", "--ignore-errors", "empty,empty", "--ignore-errors", "mismatch,mismatch", ] cmd_str = " ".join(cmd) logger.debug(f"Running {cmd_str}...") return subprocess.call(cmd, stdout=coveragelog) def run_lcov(self, args, coveragelog): if self.is_lcov_v2: branch_coverage = "branch_coverage=1" if self.jobs is None: # Default: --parallel=0 will autodetect appropriate parallelism parallel = ["--parallel", "0"] elif self.jobs == 1: # Serial execution requested, don't parallelize at all parallel = [] else: parallel = ["--parallel", str(self.jobs)] else: branch_coverage = "lcov_branch_coverage=1" parallel = [] cmd = [ "lcov", "--gcov-tool", self.gcov_tool, "--rc", branch_coverage, ] + parallel + args return self.run_command(cmd, coveragelog) def _generate(self, outdir, coveragelog, build_dirs=None): coveragefile = os.path.join(outdir, "coverage.info") ztestfile = os.path.join(outdir, "ztest.info") if build_dirs: files = [] for dir_ in build_dirs: files_ = [fname for fname in [os.path.join(dir_, "coverage.info"), os.path.join(dir_, "ztest.info")] if os.path.exists(fname)] if not files_: logger.debug("Coverage merge no files in: %s", dir_) continue files += files_ logger.debug("Coverage merge %d reports in %s", len(files), outdir) cmd = ["--output-file", coveragefile] for filename in files: cmd.append("--add-tracefile") cmd.append(filename) else: cmd = ["--capture", "--directory", outdir, "--output-file", coveragefile] if self.coverage_per_instance and len(self.instances) == 1: invalid_chars = re.compile(r"[^A-Za-z0-9_]") cmd.append("--test-name") cmd.append(invalid_chars.sub("_", next(iter(self.instances)))) ret = self.run_lcov(cmd, coveragelog) if ret: logger.error("LCOV capture report stage failed with %s", ret) return ret, {} # We want to remove tests/* and tests/ztest/test/* but save tests/ztest cmd = ["--extract", coveragefile, os.path.join(self.base_dir, "tests", "ztest", "*"), "--output-file", ztestfile] ret = self.run_lcov(cmd, coveragelog) if ret: logger.error("LCOV extract report stage failed with %s", ret) return ret, {} files = [] if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0: cmd = ["--remove", ztestfile, os.path.join(self.base_dir, "tests/ztest/test/*"), "--output-file", ztestfile] ret = self.run_lcov(cmd, coveragelog) if ret: logger.error("LCOV remove ztest report stage failed with %s", ret) return ret, {} files = [coveragefile, ztestfile] else: files = [coveragefile] for i in self.ignores: cmd = ["--remove", coveragefile, i, "--output-file", coveragefile] ret = self.run_lcov(cmd, coveragelog) if ret: logger.error("LCOV remove ignores report stage failed with %s", ret) return ret, {} if 'html' not in self.output_formats.split(','): return 0, {} cmd = ["genhtml", "--legend", "--branch-coverage", "--prefix", self.base_dir, "-output-directory", os.path.join(outdir, "coverage")] if self.coverage_per_instance: cmd.append("--show-details") cmd += files ret = self.run_command(cmd, coveragelog) if ret: logger.error("LCOV genhtml report stage failed with %s", ret) # TODO: Add LCOV summary coverage report. return ret, { 'report': coveragefile, 'ztest': ztestfile, 'summary': None } class Gcovr(CoverageTool): def __init__(self): super().__init__() self.ignores = [] self.ignore_branch_patterns = [] self.output_formats = "html" self.version = self.get_version() # Different ifdef-ed implementations of the same function should not be # in conflict treated by GCOVR as separate objects for coverage statistics. self.options = ["-v", "--merge-mode-functions=separate"] def get_version(self): try: result = subprocess.run( ['gcovr', '--version'], capture_output=True, text=True, check=True ) version_lines = result.stdout.strip().split('\n') if version_lines: version_output = version_lines[0].replace('gcovr ', '') return version_output except subprocess.CalledProcessError as e: logger.error(f"Unable to determine gcovr version: {e}") sys.exit(1) except FileNotFoundError as e: logger.error(f"Unable to find gcovr tool: {e}") sys.exit(1) def add_ignore_file(self, pattern): self.ignores.append('.*' + pattern + '.*') def add_ignore_directory(self, pattern): self.ignores.append(".*/" + pattern + '/.*') def add_ignore_branch_pattern(self, pattern): self.ignore_branch_patterns.append(pattern) @staticmethod def _interleave_list(prefix, list): tuple_list = [(prefix, item) for item in list] return [item for sublist in tuple_list for item in sublist] @staticmethod def _flatten_list(list): return [a for b in list for a in b] def collect_coverage(self, outdir, coverage_file, ztest_file, coveragelog): excludes = Gcovr._interleave_list("-e", self.ignores) if len(self.ignore_branch_patterns) > 0: # Last pattern overrides previous values, so merge all patterns together merged_regex = "|".join([f"({p})" for p in self.ignore_branch_patterns]) excludes += ["--exclude-branches-by-pattern", merged_regex] # We want to remove tests/* and tests/ztest/test/* but save tests/ztest cmd = ["gcovr", "-r", self.base_dir, "--gcov-ignore-parse-errors=negative_hits.warn_once_per_file", "--gcov-executable", self.gcov_tool, "-e", "tests/*"] cmd += excludes + self.options + ["--json", "-o", coverage_file, outdir] cmd_str = " ".join(cmd) logger.debug(f"Running: {cmd_str}") coveragelog.write(f"Running: {cmd_str}\n") coveragelog.flush() ret = subprocess.call(cmd, stdout=coveragelog, stderr=coveragelog) if ret: logger.error(f"GCOVR failed with {ret}") return ret, [] cmd = ["gcovr", "-r", self.base_dir] + self.options cmd += ["--gcov-executable", self.gcov_tool, "-f", "tests/ztest", "-e", "tests/ztest/test/*", "--json", "-o", ztest_file, outdir] cmd_str = " ".join(cmd) logger.debug(f"Running: {cmd_str}") coveragelog.write(f"Running: {cmd_str}\n") coveragelog.flush() ret = subprocess.call(cmd, stdout=coveragelog, stderr=coveragelog) if ret: logger.error(f"GCOVR ztest stage failed with {ret}") return ret, [] return ret, [file_ for file_ in [coverage_file, ztest_file] if os.path.exists(file_) and os.path.getsize(file_) > 0] def _generate(self, outdir, coveragelog, build_dirs=None): coverage_file = os.path.join(outdir, "coverage.json") coverage_summary = os.path.join(outdir, "coverage_summary.json") ztest_file = os.path.join(outdir, "ztest.json") ret = 0 cmd_ = [] files = [] if build_dirs: for dir_ in build_dirs: files_ = [fname for fname in [os.path.join(dir_, "coverage.json"), os.path.join(dir_, "ztest.json")] if os.path.exists(fname)] if not files_: logger.debug(f"Coverage merge no files in: {dir_}") continue files += files_ logger.debug(f"Coverage merge {len(files)} reports in {outdir}") ztest_file = None cmd_ = ["--json-pretty", "--json", coverage_file] else: ret, files = self.collect_coverage(outdir, coverage_file, ztest_file, coveragelog) logger.debug(f"Coverage collected {len(files)} reports from: {outdir}") if not files: logger.warning(f"No coverage files to compose report for {outdir}") return ret, {} subdir = os.path.join(outdir, "coverage") os.makedirs(subdir, exist_ok=True) tracefiles = self._interleave_list("--add-tracefile", files) # Convert command line argument (comma-separated list) to gcovr flags report_options = { "html": ["--html", os.path.join(subdir, "index.html"), "--html-details"], "xml": ["--xml", os.path.join(subdir, "coverage.xml"), "--xml-pretty"], "csv": ["--csv", os.path.join(subdir, "coverage.csv")], "txt": ["--txt", os.path.join(subdir, "coverage.txt")], "coveralls": ["--coveralls", os.path.join(subdir, "coverage.coveralls.json"), "--coveralls-pretty"], "sonarqube": ["--sonarqube", os.path.join(subdir, "coverage.sonarqube.xml")] } gcovr_options = self._flatten_list( [report_options[r] for r in self.output_formats.split(',')] ) cmd = ["gcovr", "-r", self.base_dir] + self.options + gcovr_options + tracefiles cmd += cmd_ cmd += ["--json-summary-pretty", "--json-summary", coverage_summary] cmd_str = " ".join(cmd) logger.debug(f"Running: {cmd_str}") coveragelog.write(f"Running: {cmd_str}\n") coveragelog.flush() ret = subprocess.call(cmd, stdout=coveragelog, stderr=coveragelog) if ret: logger.error(f"GCOVR merge report stage failed with {ret}") return ret, { 'report': coverage_file, 'ztest': ztest_file, 'summary': coverage_summary } def choose_gcov_tool(options, is_system_gcov): gcov_tool = None if not options.gcov_tool: zephyr_sdk_gcov_tool = os.path.join( os.environ.get("ZEPHYR_SDK_INSTALL_DIR", default=""), "x86_64-zephyr-elf/bin/x86_64-zephyr-elf-gcov") if os.environ.get("ZEPHYR_TOOLCHAIN_VARIANT") == "llvm": llvm_path = os.environ.get("LLVM_TOOLCHAIN_PATH") if llvm_path is not None: llvm_path = os.path.join(llvm_path, "bin") llvm_cov = shutil.which("llvm-cov", path=llvm_path) llvm_cov_ext = pathlib.Path(llvm_cov).suffix gcov_lnk = os.path.join(options.outdir, f"gcov{llvm_cov_ext}") try: os.symlink(llvm_cov, gcov_lnk) except OSError: shutil.copy(llvm_cov, gcov_lnk) gcov_tool = gcov_lnk elif is_system_gcov: gcov_tool = "gcov" elif os.path.exists(zephyr_sdk_gcov_tool): gcov_tool = zephyr_sdk_gcov_tool else: logger.error( "Can't find a suitable gcov tool. Use --gcov-tool or set ZEPHYR_SDK_INSTALL_DIR." ) sys.exit(1) else: gcov_tool = str(options.gcov_tool) return gcov_tool def run_coverage_tool(options, outdir, is_system_gcov, instances, coverage_capture, coverage_report): coverage_tool = CoverageTool.factory(options.coverage_tool, jobs=options.jobs) if not coverage_tool: return False, {} coverage_tool.gcov_tool = str(choose_gcov_tool(options, is_system_gcov)) logger.debug(f"Using gcov tool: {coverage_tool.gcov_tool}") coverage_tool.instances = instances coverage_tool.coverage_per_instance = options.coverage_per_instance coverage_tool.coverage_capture = coverage_capture coverage_tool.coverage_report = coverage_report coverage_tool.base_dir = os.path.abspath(options.coverage_basedir) # Apply output format default if options.coverage_formats is not None: coverage_tool.output_formats = options.coverage_formats coverage_tool.add_ignore_file('generated') coverage_tool.add_ignore_directory('tests') coverage_tool.add_ignore_directory('samples') # Ignore branch coverage on LOG_* and LOG_HEXDUMP_* macros # Branch misses are due to the implementation of Z_LOG2 and cannot be avoided coverage_tool.add_ignore_branch_pattern(r"^\s*LOG_(?:HEXDUMP_)?(?:DBG|INF|WRN|ERR)\(.*") # Ignore branch coverage on __ASSERT* macros # Covering the failing case is not desirable as it will immediately terminate the test. coverage_tool.add_ignore_branch_pattern(r"^\s*__ASSERT(?:_EVAL|_NO_MSG|_POST_ACTION)?\(.*") return coverage_tool.generate(outdir) def has_system_gcov(platform): return platform and (platform.type in {"native", "unit"}) def run_coverage(options, testplan): """ Summary code coverage over the full test plan's scope. """ is_system_gcov = False for plat in options.coverage_platform: if has_system_gcov(testplan.get_platform(plat)): is_system_gcov = True break return run_coverage_tool(options, options.outdir, is_system_gcov, instances=testplan.instances, coverage_capture=False, coverage_report=True) def run_coverage_instance(options, instance): """ Per-instance code coverage called by ProjectBuilder ('coverage' operation). """ is_system_gcov = has_system_gcov(instance.platform) return run_coverage_tool(options, instance.build_dir, is_system_gcov, instances={instance.name: instance}, coverage_capture=True, coverage_report=options.coverage_per_instance)