1# vim: set syntax=python ts=4 : 2# 3# Copyright (c) 2018-2022 Intel Corporation 4# SPDX-License-Identifier: Apache-2.0 5 6import contextlib 7import glob 8import logging 9import os 10import pathlib 11import re 12import shutil 13import subprocess 14import sys 15import tempfile 16 17logger = logging.getLogger('twister') 18logger.setLevel(logging.DEBUG) 19 20supported_coverage_formats = { 21 "gcovr": ["html", "xml", "csv", "txt", "coveralls", "sonarqube"], 22 "lcov": ["html", "lcov"] 23} 24 25 26class CoverageTool: 27 """ Base class for every supported coverage tool 28 """ 29 30 def __init__(self): 31 self.gcov_tool = None 32 self.base_dir = None 33 self.output_formats = None 34 35 @staticmethod 36 def factory(tool, jobs=None): 37 if tool == 'lcov': 38 t = Lcov(jobs) 39 elif tool == 'gcovr': 40 t = Gcovr() 41 else: 42 logger.error(f"Unsupported coverage tool specified: {tool}") 43 return None 44 45 logger.debug(f"Select {tool} as the coverage tool...") 46 return t 47 48 @staticmethod 49 def retrieve_gcov_data(input_file): 50 logger.debug(f"Working on {input_file}") 51 extracted_coverage_info = {} 52 capture_data = False 53 capture_complete = False 54 with open(input_file) as fp: 55 for line in fp.readlines(): 56 if re.search("GCOV_COVERAGE_DUMP_START", line): 57 capture_data = True 58 capture_complete = False 59 continue 60 if re.search("GCOV_COVERAGE_DUMP_END", line): 61 capture_complete = True 62 # Keep searching for additional dumps 63 # Loop until the coverage data is found. 64 if not capture_data: 65 continue 66 if line.startswith("*"): 67 sp = line.split("<") 68 if len(sp) > 1: 69 # Remove the leading delimiter "*" 70 file_name = sp[0][1:] 71 # Remove the trailing new line char 72 hex_dump = sp[1][:-1] 73 else: 74 continue 75 else: 76 continue 77 if file_name in extracted_coverage_info: 78 extracted_coverage_info[file_name].append(hex_dump) 79 else: 80 extracted_coverage_info[file_name] = [hex_dump] 81 if not capture_data: 82 capture_complete = True 83 return {'complete': capture_complete, 'data': extracted_coverage_info} 84 85 def merge_hexdumps(self, hexdumps): 86 # Only one hexdump 87 if len(hexdumps) == 1: 88 return hexdumps[0] 89 90 with tempfile.TemporaryDirectory() as dir: 91 # Write each hexdump to a dedicated temporary folder 92 dirs = [] 93 for idx, dump in enumerate(hexdumps): 94 subdir = dir + f'/{idx}' 95 os.mkdir(subdir) 96 dirs.append(subdir) 97 with open(f'{subdir}/tmp.gcda', 'wb') as fp: 98 fp.write(bytes.fromhex(dump)) 99 100 # Iteratively call gcov-tool (not gcov) to merge the files 101 merge_tool = self.gcov_tool + '-tool' 102 for d1, d2 in zip(dirs[:-1], dirs[1:], strict=False): 103 cmd = [merge_tool, 'merge', d1, d2, '--output', d2] 104 subprocess.call(cmd) 105 106 # Read back the final output file 107 with open(f'{dirs[-1]}/tmp.gcda', 'rb') as fp: 108 return fp.read(-1).hex() 109 110 def create_gcda_files(self, extracted_coverage_info): 111 gcda_created = True 112 logger.debug("Generating gcda files") 113 for filename, hexdumps in extracted_coverage_info.items(): 114 # if kobject_hash is given for coverage gcovr fails 115 # hence skipping it problem only in gcovr v4.1 116 if "kobject_hash" in filename: 117 filename = (filename[:-4]) + "gcno" 118 with contextlib.suppress(Exception): 119 os.remove(filename) 120 continue 121 122 try: 123 hexdump_val = self.merge_hexdumps(hexdumps) 124 hex_bytes = bytes.fromhex(hexdump_val) 125 with open(filename, 'wb') as fp: 126 fp.write(hex_bytes) 127 except ValueError: 128 logger.exception(f"Unable to convert hex data for file: {filename}") 129 gcda_created = False 130 except FileNotFoundError: 131 logger.exception(f"Unable to create gcda file: {filename}") 132 gcda_created = False 133 return gcda_created 134 135 def generate(self, outdir): 136 coverage_completed = True 137 for filename in glob.glob(f"{outdir}/**/handler.log", recursive=True): 138 gcov_data = self.__class__.retrieve_gcov_data(filename) 139 capture_complete = gcov_data['complete'] 140 extracted_coverage_info = gcov_data['data'] 141 if capture_complete: 142 gcda_created = self.create_gcda_files(extracted_coverage_info) 143 if gcda_created: 144 logger.debug(f"Gcov data captured: {filename}") 145 else: 146 logger.error(f"Gcov data invalid for: {filename}") 147 coverage_completed = False 148 else: 149 logger.error(f"Gcov data capture incomplete: {filename}") 150 coverage_completed = False 151 152 with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog: 153 ret = self._generate(outdir, coveragelog) 154 if ret == 0: 155 report_log = { 156 "html": "HTML report generated: {}".format( 157 os.path.join(outdir, "coverage", "index.html") 158 ), 159 "lcov": "LCOV report generated: {}".format( 160 os.path.join(outdir, "coverage.info") 161 ), 162 "xml": "XML report generated: {}".format( 163 os.path.join(outdir, "coverage", "coverage.xml") 164 ), 165 "csv": "CSV report generated: {}".format( 166 os.path.join(outdir, "coverage", "coverage.csv") 167 ), 168 "txt": "TXT report generated: {}".format( 169 os.path.join(outdir, "coverage", "coverage.txt") 170 ), 171 "coveralls": "Coveralls report generated: {}".format( 172 os.path.join(outdir, "coverage", "coverage.coveralls.json") 173 ), 174 "sonarqube": "Sonarqube report generated: {}".format( 175 os.path.join(outdir, "coverage", "coverage.sonarqube.xml") 176 ) 177 } 178 for r in self.output_formats.split(','): 179 logger.info(report_log[r]) 180 else: 181 coverage_completed = False 182 logger.debug(f"All coverage data processed: {coverage_completed}") 183 return coverage_completed 184 185 186class Lcov(CoverageTool): 187 188 def __init__(self, jobs=None): 189 super().__init__() 190 self.ignores = [] 191 self.ignore_branch_patterns = [] 192 self.output_formats = "lcov,html" 193 self.version = self.get_version() 194 self.jobs = jobs 195 196 def get_version(self): 197 try: 198 result = subprocess.run( 199 ['lcov', '--version'], 200 capture_output=True, 201 text=True, 202 check=True 203 ) 204 version_output = result.stdout.strip().replace('lcov: LCOV version ', '') 205 return version_output 206 except subprocess.CalledProcessError as e: 207 logger.error(f"Unable to determine lcov version: {e}") 208 sys.exit(1) 209 except FileNotFoundError as e: 210 logger.error(f"Unable to find lcov tool: {e}") 211 sys.exit(1) 212 213 def add_ignore_file(self, pattern): 214 self.ignores.append('*' + pattern + '*') 215 216 def add_ignore_directory(self, pattern): 217 self.ignores.append('*/' + pattern + '/*') 218 219 def add_ignore_branch_pattern(self, pattern): 220 self.ignore_branch_patterns.append(pattern) 221 222 @property 223 def is_lcov_v2(self): 224 return self.version.startswith("2") 225 226 def run_command(self, cmd, coveragelog): 227 if self.is_lcov_v2: 228 # The --ignore-errors source option is added for genhtml as well as 229 # lcov to avoid it exiting due to 230 # samples/application_development/external_lib/ 231 cmd += [ 232 "--ignore-errors", "inconsistent,inconsistent", 233 "--ignore-errors", "negative,negative", 234 "--ignore-errors", "unused,unused", 235 "--ignore-errors", "empty,empty", 236 "--ignore-errors", "mismatch,mismatch", 237 ] 238 239 cmd_str = " ".join(cmd) 240 logger.debug(f"Running {cmd_str}...") 241 return subprocess.call(cmd, stdout=coveragelog) 242 243 def run_lcov(self, args, coveragelog): 244 if self.is_lcov_v2: 245 branch_coverage = "branch_coverage=1" 246 if self.jobs is None: 247 # Default: --parallel=0 will autodetect appropriate parallelism 248 parallel = ["--parallel", "0"] 249 elif self.jobs == 1: 250 # Serial execution requested, don't parallelize at all 251 parallel = [] 252 else: 253 parallel = ["--parallel", str(self.jobs)] 254 else: 255 branch_coverage = "lcov_branch_coverage=1" 256 parallel = [] 257 258 cmd = [ 259 "lcov", "--gcov-tool", self.gcov_tool, 260 "--rc", branch_coverage, 261 ] + parallel + args 262 return self.run_command(cmd, coveragelog) 263 264 def _generate(self, outdir, coveragelog): 265 coveragefile = os.path.join(outdir, "coverage.info") 266 ztestfile = os.path.join(outdir, "ztest.info") 267 268 cmd = ["--capture", "--directory", outdir, "--output-file", coveragefile] 269 self.run_lcov(cmd, coveragelog) 270 271 # We want to remove tests/* and tests/ztest/test/* but save tests/ztest 272 cmd = ["--extract", coveragefile, 273 os.path.join(self.base_dir, "tests", "ztest", "*"), 274 "--output-file", ztestfile] 275 self.run_lcov(cmd, coveragelog) 276 277 if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0: 278 cmd = ["--remove", ztestfile, 279 os.path.join(self.base_dir, "tests/ztest/test/*"), 280 "--output-file", ztestfile] 281 self.run_lcov(cmd, coveragelog) 282 283 files = [coveragefile, ztestfile] 284 else: 285 files = [coveragefile] 286 287 for i in self.ignores: 288 cmd = ["--remove", coveragefile, i, "--output-file", coveragefile] 289 self.run_lcov(cmd, coveragelog) 290 291 if 'html' not in self.output_formats.split(','): 292 return 0 293 294 cmd = ["genhtml", "--legend", "--branch-coverage", 295 "--prefix", self.base_dir, 296 "-output-directory", os.path.join(outdir, "coverage")] + files 297 return self.run_command(cmd, coveragelog) 298 299 300class Gcovr(CoverageTool): 301 302 def __init__(self): 303 super().__init__() 304 self.ignores = [] 305 self.ignore_branch_patterns = [] 306 self.output_formats = "html" 307 self.version = self.get_version() 308 309 def get_version(self): 310 try: 311 result = subprocess.run( 312 ['gcovr', '--version'], 313 capture_output=True, 314 text=True, 315 check=True 316 ) 317 version_lines = result.stdout.strip().split('\n') 318 if version_lines: 319 version_output = version_lines[0].replace('gcovr ', '') 320 return version_output 321 except subprocess.CalledProcessError as e: 322 logger.error(f"Unable to determine gcovr version: {e}") 323 sys.exit(1) 324 except FileNotFoundError as e: 325 logger.error(f"Unable to find gcovr tool: {e}") 326 sys.exit(1) 327 328 def add_ignore_file(self, pattern): 329 self.ignores.append('.*' + pattern + '.*') 330 331 def add_ignore_directory(self, pattern): 332 self.ignores.append(".*/" + pattern + '/.*') 333 334 def add_ignore_branch_pattern(self, pattern): 335 self.ignore_branch_patterns.append(pattern) 336 337 @staticmethod 338 def _interleave_list(prefix, list): 339 tuple_list = [(prefix, item) for item in list] 340 return [item for sublist in tuple_list for item in sublist] 341 342 @staticmethod 343 def _flatten_list(list): 344 return [a for b in list for a in b] 345 346 def _generate(self, outdir, coveragelog): 347 coveragefile = os.path.join(outdir, "coverage.json") 348 ztestfile = os.path.join(outdir, "ztest.json") 349 350 excludes = Gcovr._interleave_list("-e", self.ignores) 351 if len(self.ignore_branch_patterns) > 0: 352 # Last pattern overrides previous values, so merge all patterns together 353 merged_regex = "|".join([f"({p})" for p in self.ignore_branch_patterns]) 354 excludes += ["--exclude-branches-by-pattern", merged_regex] 355 356 # Different ifdef-ed implementations of the same function should not be 357 # in conflict treated by GCOVR as separate objects for coverage statistics. 358 mode_options = ["--merge-mode-functions=separate"] 359 360 # We want to remove tests/* and tests/ztest/test/* but save tests/ztest 361 cmd = ["gcovr", "-r", self.base_dir, 362 "--gcov-ignore-parse-errors=negative_hits.warn_once_per_file", 363 "--gcov-executable", self.gcov_tool, 364 "-e", "tests/*"] 365 cmd += excludes + mode_options + ["--json", "-o", coveragefile, outdir] 366 cmd_str = " ".join(cmd) 367 logger.debug(f"Running {cmd_str}...") 368 subprocess.call(cmd, stdout=coveragelog) 369 370 subprocess.call(["gcovr", "-r", self.base_dir, "--gcov-executable", 371 self.gcov_tool, "-f", "tests/ztest", "-e", 372 "tests/ztest/test/*", "--json", "-o", ztestfile, 373 outdir] + mode_options, stdout=coveragelog) 374 375 if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0: 376 files = [coveragefile, ztestfile] 377 else: 378 files = [coveragefile] 379 380 subdir = os.path.join(outdir, "coverage") 381 os.makedirs(subdir, exist_ok=True) 382 383 tracefiles = self._interleave_list("--add-tracefile", files) 384 385 # Convert command line argument (comma-separated list) to gcovr flags 386 report_options = { 387 "html": ["--html", os.path.join(subdir, "index.html"), "--html-details"], 388 "xml": ["--xml", os.path.join(subdir, "coverage.xml"), "--xml-pretty"], 389 "csv": ["--csv", os.path.join(subdir, "coverage.csv")], 390 "txt": ["--txt", os.path.join(subdir, "coverage.txt")], 391 "coveralls": ["--coveralls", os.path.join(subdir, "coverage.coveralls.json"), 392 "--coveralls-pretty"], 393 "sonarqube": ["--sonarqube", os.path.join(subdir, "coverage.sonarqube.xml")] 394 } 395 gcovr_options = self._flatten_list( 396 [report_options[r] for r in self.output_formats.split(',')] 397 ) 398 399 return subprocess.call( 400 ["gcovr", "-r", self.base_dir] \ 401 + mode_options + gcovr_options + tracefiles, stdout=coveragelog 402 ) 403 404 405 406def run_coverage(testplan, options): 407 use_system_gcov = False 408 gcov_tool = None 409 410 for plat in options.coverage_platform: 411 _plat = testplan.get_platform(plat) 412 if _plat and (_plat.type in {"native", "unit"}): 413 use_system_gcov = True 414 if not options.gcov_tool: 415 zephyr_sdk_gcov_tool = os.path.join( 416 os.environ.get("ZEPHYR_SDK_INSTALL_DIR", default=""), 417 "x86_64-zephyr-elf/bin/x86_64-zephyr-elf-gcov") 418 if os.environ.get("ZEPHYR_TOOLCHAIN_VARIANT") == "llvm": 419 llvm_path = os.environ.get("LLVM_TOOLCHAIN_PATH") 420 if llvm_path is not None: 421 llvm_path = os.path.join(llvm_path, "bin") 422 llvm_cov = shutil.which("llvm-cov", path=llvm_path) 423 llvm_cov_ext = pathlib.Path(llvm_cov).suffix 424 gcov_lnk = os.path.join(options.outdir, f"gcov{llvm_cov_ext}") 425 try: 426 os.symlink(llvm_cov, gcov_lnk) 427 except OSError: 428 shutil.copy(llvm_cov, gcov_lnk) 429 gcov_tool = gcov_lnk 430 elif use_system_gcov: 431 gcov_tool = "gcov" 432 elif os.path.exists(zephyr_sdk_gcov_tool): 433 gcov_tool = zephyr_sdk_gcov_tool 434 else: 435 logger.error( 436 "Can't find a suitable gcov tool. Use --gcov-tool or set ZEPHYR_SDK_INSTALL_DIR." 437 ) 438 sys.exit(1) 439 else: 440 gcov_tool = str(options.gcov_tool) 441 442 logger.info("Generating coverage files...") 443 logger.info(f"Using gcov tool: {gcov_tool}") 444 coverage_tool = CoverageTool.factory(options.coverage_tool, jobs=options.jobs) 445 coverage_tool.gcov_tool = gcov_tool 446 coverage_tool.base_dir = os.path.abspath(options.coverage_basedir) 447 # Apply output format default 448 if options.coverage_formats is not None: 449 coverage_tool.output_formats = options.coverage_formats 450 coverage_tool.add_ignore_file('generated') 451 coverage_tool.add_ignore_directory('tests') 452 coverage_tool.add_ignore_directory('samples') 453 # Ignore branch coverage on LOG_* and LOG_HEXDUMP_* macros 454 # Branch misses are due to the implementation of Z_LOG2 and cannot be avoided 455 coverage_tool.add_ignore_branch_pattern(r"^\s*LOG_(?:HEXDUMP_)?(?:DBG|INF|WRN|ERR)\(.*") 456 # Ignore branch coverage on __ASSERT* macros 457 # Covering the failing case is not desirable as it will immediately terminate the test. 458 coverage_tool.add_ignore_branch_pattern(r"^\s*__ASSERT(?:_EVAL|_NO_MSG|_POST_ACTION)?\(.*") 459 coverage_completed = coverage_tool.generate(options.outdir) 460 return coverage_completed 461