1# vim: set syntax=python ts=4 : 2# 3# Copyright (c) 2018-2025 Intel Corporation 4# SPDX-License-Identifier: Apache-2.0 5 6import contextlib 7import glob 8import logging 9import os 10import pathlib 11import re 12import shutil 13import subprocess 14import sys 15import tempfile 16 17logger = logging.getLogger('twister') 18logger.setLevel(logging.DEBUG) 19 20supported_coverage_formats = { 21 "gcovr": ["html", "xml", "csv", "txt", "coveralls", "sonarqube"], 22 "lcov": ["html", "lcov"] 23} 24 25 26class CoverageTool: 27 """ Base class for every supported coverage tool 28 """ 29 30 def __init__(self): 31 self.gcov_tool = None 32 self.base_dir = None 33 self.output_formats = None 34 self.coverage_capture = True 35 self.coverage_report = True 36 self.coverage_per_instance = False 37 self.instances = {} 38 39 @staticmethod 40 def factory(tool, jobs=None): 41 if tool == 'lcov': 42 t = Lcov(jobs) 43 elif tool == 'gcovr': 44 t = Gcovr() 45 else: 46 logger.error(f"Unsupported coverage tool specified: {tool}") 47 return None 48 49 logger.debug(f"Select {tool} as the coverage tool...") 50 return t 51 52 @staticmethod 53 def retrieve_gcov_data(input_file): 54 logger.debug(f"Working on {input_file}") 55 extracted_coverage_info = {} 56 capture_data = False 57 capture_complete = False 58 with open(input_file) as fp: 59 for line in fp.readlines(): 60 if re.search("GCOV_COVERAGE_DUMP_START", line): 61 capture_data = True 62 capture_complete = False 63 continue 64 if re.search("GCOV_COVERAGE_DUMP_END", line): 65 capture_complete = True 66 # Keep searching for additional dumps 67 # Loop until the coverage data is found. 68 if not capture_data: 69 continue 70 if line.startswith("*"): 71 sp = line.split("<") 72 if len(sp) > 1: 73 # Remove the leading delimiter "*" 74 file_name = sp[0][1:] 75 # Remove the trailing new line char 76 hex_dump = sp[1][:-1] 77 else: 78 continue 79 else: 80 continue 81 if file_name in extracted_coverage_info: 82 extracted_coverage_info[file_name].append(hex_dump) 83 else: 84 extracted_coverage_info[file_name] = [hex_dump] 85 if not capture_data: 86 capture_complete = True 87 return {'complete': capture_complete, 'data': extracted_coverage_info} 88 89 def merge_hexdumps(self, hexdumps): 90 # Only one hexdump 91 if len(hexdumps) == 1: 92 return hexdumps[0] 93 94 with tempfile.TemporaryDirectory() as dir: 95 # Write each hexdump to a dedicated temporary folder 96 dirs = [] 97 for idx, dump in enumerate(hexdumps): 98 subdir = dir + f'/{idx}' 99 os.mkdir(subdir) 100 dirs.append(subdir) 101 with open(f'{subdir}/tmp.gcda', 'wb') as fp: 102 fp.write(bytes.fromhex(dump)) 103 104 # Iteratively call gcov-tool (not gcov) to merge the files 105 merge_tool = self.gcov_tool + '-tool' 106 for d1, d2 in zip(dirs[:-1], dirs[1:], strict=False): 107 cmd = [merge_tool, 'merge', d1, d2, '--output', d2] 108 subprocess.call(cmd) 109 110 # Read back the final output file 111 with open(f'{dirs[-1]}/tmp.gcda', 'rb') as fp: 112 return fp.read(-1).hex() 113 114 def create_gcda_files(self, extracted_coverage_info): 115 gcda_created = True 116 logger.debug(f"Generating {len(extracted_coverage_info)} gcda files") 117 for filename, hexdumps in extracted_coverage_info.items(): 118 # if kobject_hash is given for coverage gcovr fails 119 # hence skipping it problem only in gcovr v4.1 120 if "kobject_hash" in filename: 121 filename = (filename[:-4]) + "gcno" 122 with contextlib.suppress(Exception): 123 os.remove(filename) 124 continue 125 126 try: 127 hexdump_val = self.merge_hexdumps(hexdumps) 128 hex_bytes = bytes.fromhex(hexdump_val) 129 with open(filename, 'wb') as fp: 130 fp.write(hex_bytes) 131 except ValueError: 132 logger.exception(f"Unable to convert hex data for file: {filename}") 133 gcda_created = False 134 except FileNotFoundError: 135 logger.exception(f"Unable to create gcda file: {filename}") 136 gcda_created = False 137 return gcda_created 138 139 def capture_data(self, outdir): 140 coverage_completed = True 141 for filename in glob.glob(f"{outdir}/**/handler.log", recursive=True): 142 gcov_data = self.__class__.retrieve_gcov_data(filename) 143 capture_complete = gcov_data['complete'] 144 extracted_coverage_info = gcov_data['data'] 145 if capture_complete: 146 gcda_created = self.create_gcda_files(extracted_coverage_info) 147 if gcda_created: 148 logger.debug(f"Gcov data captured: {filename}") 149 else: 150 logger.error(f"Gcov data invalid for: {filename}") 151 coverage_completed = False 152 else: 153 logger.error(f"Gcov data capture incomplete: {filename}") 154 coverage_completed = False 155 return coverage_completed 156 157 def generate(self, outdir): 158 coverage_completed = self.capture_data(outdir) if self.coverage_capture else True 159 if not coverage_completed or not self.coverage_report: 160 return coverage_completed, {} 161 build_dirs = None 162 if not self.coverage_capture and self.coverage_report and self.coverage_per_instance: 163 build_dirs = [instance.build_dir for instance in self.instances.values()] 164 reports = {} 165 with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog: 166 ret, reports = self._generate(outdir, coveragelog, build_dirs) 167 if ret == 0: 168 report_log = { 169 "html": "HTML report generated: {}".format( 170 os.path.join(outdir, "coverage", "index.html") 171 ), 172 "lcov": "LCOV report generated: {}".format( 173 os.path.join(outdir, "coverage.info") 174 ), 175 "xml": "XML report generated: {}".format( 176 os.path.join(outdir, "coverage", "coverage.xml") 177 ), 178 "csv": "CSV report generated: {}".format( 179 os.path.join(outdir, "coverage", "coverage.csv") 180 ), 181 "txt": "TXT report generated: {}".format( 182 os.path.join(outdir, "coverage", "coverage.txt") 183 ), 184 "coveralls": "Coveralls report generated: {}".format( 185 os.path.join(outdir, "coverage", "coverage.coveralls.json") 186 ), 187 "sonarqube": "Sonarqube report generated: {}".format( 188 os.path.join(outdir, "coverage", "coverage.sonarqube.xml") 189 ) 190 } 191 for r in self.output_formats.split(','): 192 logger.info(report_log[r]) 193 else: 194 coverage_completed = False 195 logger.debug(f"All coverage data processed: {coverage_completed}") 196 return coverage_completed, reports 197 198 199class Lcov(CoverageTool): 200 201 def __init__(self, jobs=None): 202 super().__init__() 203 self.ignores = [] 204 self.ignore_branch_patterns = [] 205 self.output_formats = "lcov,html" 206 self.version = self.get_version() 207 self.jobs = jobs 208 209 def get_version(self): 210 try: 211 result = subprocess.run( 212 ['lcov', '--version'], 213 capture_output=True, 214 text=True, 215 check=True 216 ) 217 version_output = result.stdout.strip().replace('lcov: LCOV version ', '') 218 return version_output 219 except subprocess.CalledProcessError as e: 220 logger.error(f"Unable to determine lcov version: {e}") 221 sys.exit(1) 222 except FileNotFoundError as e: 223 logger.error(f"Unable to find lcov tool: {e}") 224 sys.exit(1) 225 226 def add_ignore_file(self, pattern): 227 self.ignores.append('*' + pattern + '*') 228 229 def add_ignore_directory(self, pattern): 230 self.ignores.append('*/' + pattern + '/*') 231 232 def add_ignore_branch_pattern(self, pattern): 233 self.ignore_branch_patterns.append(pattern) 234 235 @property 236 def is_lcov_v2(self): 237 return self.version.startswith("2") 238 239 def run_command(self, cmd, coveragelog): 240 if self.is_lcov_v2: 241 # The --ignore-errors source option is added for genhtml as well as 242 # lcov to avoid it exiting due to 243 # samples/application_development/external_lib/ 244 cmd += [ 245 "--ignore-errors", "inconsistent,inconsistent", 246 "--ignore-errors", "negative,negative", 247 "--ignore-errors", "unused,unused", 248 "--ignore-errors", "empty,empty", 249 "--ignore-errors", "mismatch,mismatch", 250 ] 251 252 cmd_str = " ".join(cmd) 253 logger.debug(f"Running {cmd_str}...") 254 return subprocess.call(cmd, stdout=coveragelog) 255 256 def run_lcov(self, args, coveragelog): 257 if self.is_lcov_v2: 258 branch_coverage = "branch_coverage=1" 259 if self.jobs is None: 260 # Default: --parallel=0 will autodetect appropriate parallelism 261 parallel = ["--parallel", "0"] 262 elif self.jobs == 1: 263 # Serial execution requested, don't parallelize at all 264 parallel = [] 265 else: 266 parallel = ["--parallel", str(self.jobs)] 267 else: 268 branch_coverage = "lcov_branch_coverage=1" 269 parallel = [] 270 271 cmd = [ 272 "lcov", "--gcov-tool", self.gcov_tool, 273 "--rc", branch_coverage, 274 ] + parallel + args 275 return self.run_command(cmd, coveragelog) 276 277 278 def _generate(self, outdir, coveragelog, build_dirs=None): 279 coveragefile = os.path.join(outdir, "coverage.info") 280 ztestfile = os.path.join(outdir, "ztest.info") 281 282 if build_dirs: 283 files = [] 284 for dir_ in build_dirs: 285 files_ = [fname for fname in 286 [os.path.join(dir_, "coverage.info"), 287 os.path.join(dir_, "ztest.info")] 288 if os.path.exists(fname)] 289 if not files_: 290 logger.debug("Coverage merge no files in: %s", dir_) 291 continue 292 files += files_ 293 logger.debug("Coverage merge %d reports in %s", len(files), outdir) 294 cmd = ["--output-file", coveragefile] 295 for filename in files: 296 cmd.append("--add-tracefile") 297 cmd.append(filename) 298 else: 299 cmd = ["--capture", "--directory", outdir, "--output-file", coveragefile] 300 if self.coverage_per_instance and len(self.instances) == 1: 301 invalid_chars = re.compile(r"[^A-Za-z0-9_]") 302 cmd.append("--test-name") 303 cmd.append(invalid_chars.sub("_", next(iter(self.instances)))) 304 ret = self.run_lcov(cmd, coveragelog) 305 if ret: 306 logger.error("LCOV capture report stage failed with %s", ret) 307 return ret, {} 308 309 # We want to remove tests/* and tests/ztest/test/* but save tests/ztest 310 cmd = ["--extract", coveragefile, 311 os.path.join(self.base_dir, "tests", "ztest", "*"), 312 "--output-file", ztestfile] 313 ret = self.run_lcov(cmd, coveragelog) 314 if ret: 315 logger.error("LCOV extract report stage failed with %s", ret) 316 return ret, {} 317 318 files = [] 319 if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0: 320 cmd = ["--remove", ztestfile, 321 os.path.join(self.base_dir, "tests/ztest/test/*"), 322 "--output-file", ztestfile] 323 ret = self.run_lcov(cmd, coveragelog) 324 if ret: 325 logger.error("LCOV remove ztest report stage failed with %s", ret) 326 return ret, {} 327 328 files = [coveragefile, ztestfile] 329 else: 330 files = [coveragefile] 331 332 for i in self.ignores: 333 cmd = ["--remove", coveragefile, i, "--output-file", coveragefile] 334 ret = self.run_lcov(cmd, coveragelog) 335 if ret: 336 logger.error("LCOV remove ignores report stage failed with %s", ret) 337 return ret, {} 338 339 if 'html' not in self.output_formats.split(','): 340 return 0, {} 341 342 cmd = ["genhtml", "--legend", "--branch-coverage", 343 "--prefix", self.base_dir, 344 "-output-directory", os.path.join(outdir, "coverage")] 345 if self.coverage_per_instance: 346 cmd.append("--show-details") 347 cmd += files 348 ret = self.run_command(cmd, coveragelog) 349 if ret: 350 logger.error("LCOV genhtml report stage failed with %s", ret) 351 352 # TODO: Add LCOV summary coverage report. 353 return ret, { 'report': coveragefile, 'ztest': ztestfile, 'summary': None } 354 355 356class Gcovr(CoverageTool): 357 358 def __init__(self): 359 super().__init__() 360 self.ignores = [] 361 self.ignore_branch_patterns = [] 362 self.output_formats = "html" 363 self.version = self.get_version() 364 # Different ifdef-ed implementations of the same function should not be 365 # in conflict treated by GCOVR as separate objects for coverage statistics. 366 self.options = ["-v", "--merge-mode-functions=separate"] 367 368 369 def get_version(self): 370 try: 371 result = subprocess.run( 372 ['gcovr', '--version'], 373 capture_output=True, 374 text=True, 375 check=True 376 ) 377 version_lines = result.stdout.strip().split('\n') 378 if version_lines: 379 version_output = version_lines[0].replace('gcovr ', '') 380 return version_output 381 except subprocess.CalledProcessError as e: 382 logger.error(f"Unable to determine gcovr version: {e}") 383 sys.exit(1) 384 except FileNotFoundError as e: 385 logger.error(f"Unable to find gcovr tool: {e}") 386 sys.exit(1) 387 388 def add_ignore_file(self, pattern): 389 self.ignores.append('.*' + pattern + '.*') 390 391 def add_ignore_directory(self, pattern): 392 self.ignores.append(".*/" + pattern + '/.*') 393 394 def add_ignore_branch_pattern(self, pattern): 395 self.ignore_branch_patterns.append(pattern) 396 397 @staticmethod 398 def _interleave_list(prefix, list): 399 tuple_list = [(prefix, item) for item in list] 400 return [item for sublist in tuple_list for item in sublist] 401 402 @staticmethod 403 def _flatten_list(list): 404 return [a for b in list for a in b] 405 406 def collect_coverage(self, outdir, coverage_file, ztest_file, coveragelog): 407 excludes = Gcovr._interleave_list("-e", self.ignores) 408 if len(self.ignore_branch_patterns) > 0: 409 # Last pattern overrides previous values, so merge all patterns together 410 merged_regex = "|".join([f"({p})" for p in self.ignore_branch_patterns]) 411 excludes += ["--exclude-branches-by-pattern", merged_regex] 412 413 # We want to remove tests/* and tests/ztest/test/* but save tests/ztest 414 cmd = ["gcovr", "-r", self.base_dir, 415 "--gcov-ignore-parse-errors=negative_hits.warn_once_per_file", 416 "--gcov-executable", self.gcov_tool, 417 "-e", "tests/*"] 418 cmd += excludes + self.options + ["--json", "-o", coverage_file, outdir] 419 cmd_str = " ".join(cmd) 420 logger.debug(f"Running: {cmd_str}") 421 coveragelog.write(f"Running: {cmd_str}\n") 422 coveragelog.flush() 423 ret = subprocess.call(cmd, stdout=coveragelog, stderr=coveragelog) 424 if ret: 425 logger.error(f"GCOVR failed with {ret}") 426 return ret, [] 427 428 cmd = ["gcovr", "-r", self.base_dir] + self.options 429 cmd += ["--gcov-executable", self.gcov_tool, 430 "-f", "tests/ztest", "-e", "tests/ztest/test/*", 431 "--json", "-o", ztest_file, outdir] 432 cmd_str = " ".join(cmd) 433 logger.debug(f"Running: {cmd_str}") 434 coveragelog.write(f"Running: {cmd_str}\n") 435 coveragelog.flush() 436 ret = subprocess.call(cmd, stdout=coveragelog, stderr=coveragelog) 437 if ret: 438 logger.error(f"GCOVR ztest stage failed with {ret}") 439 return ret, [] 440 441 return ret, [file_ for file_ in [coverage_file, ztest_file] 442 if os.path.exists(file_) and os.path.getsize(file_) > 0] 443 444 445 def _generate(self, outdir, coveragelog, build_dirs=None): 446 coverage_file = os.path.join(outdir, "coverage.json") 447 coverage_summary = os.path.join(outdir, "coverage_summary.json") 448 ztest_file = os.path.join(outdir, "ztest.json") 449 450 ret = 0 451 cmd_ = [] 452 files = [] 453 if build_dirs: 454 for dir_ in build_dirs: 455 files_ = [fname for fname in 456 [os.path.join(dir_, "coverage.json"), 457 os.path.join(dir_, "ztest.json")] 458 if os.path.exists(fname)] 459 if not files_: 460 logger.debug(f"Coverage merge no files in: {dir_}") 461 continue 462 files += files_ 463 logger.debug(f"Coverage merge {len(files)} reports in {outdir}") 464 ztest_file = None 465 cmd_ = ["--json-pretty", "--json", coverage_file] 466 else: 467 ret, files = self.collect_coverage(outdir, coverage_file, ztest_file, coveragelog) 468 logger.debug(f"Coverage collected {len(files)} reports from: {outdir}") 469 470 if not files: 471 logger.warning(f"No coverage files to compose report for {outdir}") 472 return ret, {} 473 474 subdir = os.path.join(outdir, "coverage") 475 os.makedirs(subdir, exist_ok=True) 476 477 tracefiles = self._interleave_list("--add-tracefile", files) 478 479 # Convert command line argument (comma-separated list) to gcovr flags 480 report_options = { 481 "html": ["--html", os.path.join(subdir, "index.html"), "--html-details"], 482 "xml": ["--xml", os.path.join(subdir, "coverage.xml"), "--xml-pretty"], 483 "csv": ["--csv", os.path.join(subdir, "coverage.csv")], 484 "txt": ["--txt", os.path.join(subdir, "coverage.txt")], 485 "coveralls": ["--coveralls", os.path.join(subdir, "coverage.coveralls.json"), 486 "--coveralls-pretty"], 487 "sonarqube": ["--sonarqube", os.path.join(subdir, "coverage.sonarqube.xml")] 488 } 489 gcovr_options = self._flatten_list( 490 [report_options[r] for r in self.output_formats.split(',')] 491 ) 492 493 cmd = ["gcovr", "-r", self.base_dir] + self.options + gcovr_options + tracefiles 494 cmd += cmd_ 495 cmd += ["--json-summary-pretty", "--json-summary", coverage_summary] 496 cmd_str = " ".join(cmd) 497 logger.debug(f"Running: {cmd_str}") 498 coveragelog.write(f"Running: {cmd_str}\n") 499 coveragelog.flush() 500 ret = subprocess.call(cmd, stdout=coveragelog, stderr=coveragelog) 501 if ret: 502 logger.error(f"GCOVR merge report stage failed with {ret}") 503 504 return ret, { 'report': coverage_file, 'ztest': ztest_file, 'summary': coverage_summary } 505 506 507def choose_gcov_tool(options, is_system_gcov): 508 gcov_tool = None 509 if not options.gcov_tool: 510 zephyr_sdk_gcov_tool = os.path.join( 511 os.environ.get("ZEPHYR_SDK_INSTALL_DIR", default=""), 512 "x86_64-zephyr-elf/bin/x86_64-zephyr-elf-gcov") 513 if os.environ.get("ZEPHYR_TOOLCHAIN_VARIANT") == "llvm": 514 llvm_path = os.environ.get("LLVM_TOOLCHAIN_PATH") 515 if llvm_path is not None: 516 llvm_path = os.path.join(llvm_path, "bin") 517 llvm_cov = shutil.which("llvm-cov", path=llvm_path) 518 llvm_cov_ext = pathlib.Path(llvm_cov).suffix 519 gcov_lnk = os.path.join(options.outdir, f"gcov{llvm_cov_ext}") 520 try: 521 os.symlink(llvm_cov, gcov_lnk) 522 except OSError: 523 shutil.copy(llvm_cov, gcov_lnk) 524 gcov_tool = gcov_lnk 525 elif is_system_gcov: 526 gcov_tool = "gcov" 527 elif os.path.exists(zephyr_sdk_gcov_tool): 528 gcov_tool = zephyr_sdk_gcov_tool 529 else: 530 logger.error( 531 "Can't find a suitable gcov tool. Use --gcov-tool or set ZEPHYR_SDK_INSTALL_DIR." 532 ) 533 sys.exit(1) 534 else: 535 gcov_tool = str(options.gcov_tool) 536 537 return gcov_tool 538 539 540def run_coverage_tool(options, outdir, is_system_gcov, instances, 541 coverage_capture, coverage_report): 542 coverage_tool = CoverageTool.factory(options.coverage_tool, jobs=options.jobs) 543 if not coverage_tool: 544 return False, {} 545 546 coverage_tool.gcov_tool = str(choose_gcov_tool(options, is_system_gcov)) 547 logger.debug(f"Using gcov tool: {coverage_tool.gcov_tool}") 548 549 coverage_tool.instances = instances 550 coverage_tool.coverage_per_instance = options.coverage_per_instance 551 coverage_tool.coverage_capture = coverage_capture 552 coverage_tool.coverage_report = coverage_report 553 coverage_tool.base_dir = os.path.abspath(options.coverage_basedir) 554 # Apply output format default 555 if options.coverage_formats is not None: 556 coverage_tool.output_formats = options.coverage_formats 557 coverage_tool.add_ignore_file('generated') 558 coverage_tool.add_ignore_directory('tests') 559 coverage_tool.add_ignore_directory('samples') 560 # Ignore branch coverage on LOG_* and LOG_HEXDUMP_* macros 561 # Branch misses are due to the implementation of Z_LOG2 and cannot be avoided 562 coverage_tool.add_ignore_branch_pattern(r"^\s*LOG_(?:HEXDUMP_)?(?:DBG|INF|WRN|ERR)\(.*") 563 # Ignore branch coverage on __ASSERT* macros 564 # Covering the failing case is not desirable as it will immediately terminate the test. 565 coverage_tool.add_ignore_branch_pattern(r"^\s*__ASSERT(?:_EVAL|_NO_MSG|_POST_ACTION)?\(.*") 566 return coverage_tool.generate(outdir) 567 568 569def has_system_gcov(platform): 570 return platform and (platform.type in {"native", "unit"}) 571 572 573def run_coverage(options, testplan): 574 """ Summary code coverage over the full test plan's scope. 575 """ 576 is_system_gcov = False 577 578 for plat in options.coverage_platform: 579 if has_system_gcov(testplan.get_platform(plat)): 580 is_system_gcov = True 581 break 582 583 return run_coverage_tool(options, options.outdir, is_system_gcov, 584 instances=testplan.instances, 585 coverage_capture=False, 586 coverage_report=True) 587 588 589def run_coverage_instance(options, instance): 590 """ Per-instance code coverage called by ProjectBuilder ('coverage' operation). 591 """ 592 is_system_gcov = has_system_gcov(instance.platform) 593 return run_coverage_tool(options, instance.build_dir, is_system_gcov, 594 instances={instance.name: instance}, 595 coverage_capture=True, 596 coverage_report=options.coverage_per_instance) 597