1# vim: set syntax=python ts=4 :
2#
3# Copyright (c) 2018-2025 Intel Corporation
4# SPDX-License-Identifier: Apache-2.0
5
6import contextlib
7import glob
8import logging
9import os
10import pathlib
11import re
12import shutil
13import subprocess
14import sys
15import tempfile
16
# Module-level logger; everything in this file logs under the 'twister' name.
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)

# Report formats listed as supported for each coverage tool, keyed by tool name.
supported_coverage_formats = {
    "gcovr": [
        "html",
        "xml",
        "csv",
        "txt",
        "coveralls",
        "sonarqube",
    ],
    "lcov": [
        "html",
        "lcov",
    ],
}
24
25
class CoverageTool:
    """Base class for every supported coverage tool.

    The class extracts gcov dumps from ``handler.log`` files, writes them out
    as ``.gcda`` files and drives report generation.  The tool-specific part
    of report generation is supplied by subclasses through ``_generate()``.
    """

    def __init__(self):
        # gcov executable; "<gcov_tool>-tool" is used to merge several dumps.
        self.gcov_tool = None
        # Not used by this class itself; set by the caller for the subclasses.
        self.base_dir = None
        # Comma-separated list of report format names.
        self.output_formats = None
        # Whether generate() extracts gcov data from the handler logs.
        self.coverage_capture = True
        # Whether generate() produces reports.
        self.coverage_report = True
        # Whether coverage is handled per test instance.
        self.coverage_per_instance = False
        # Mapping whose values expose a ``build_dir`` attribute.
        self.instances = {}

    @staticmethod
    def factory(tool, jobs=None):
        """Create the coverage tool object selected by name.

        :param tool: tool name, ``'lcov'`` or ``'gcovr'``
        :param jobs: forwarded to ``Lcov`` only
        :return: the tool object, or ``None`` (after logging an error) when
                 the name is not supported
        """
        if tool == 'lcov':
            t = Lcov(jobs)
        elif tool == 'gcovr':
            t = Gcovr()
        else:
            logger.error(f"Unsupported coverage tool specified: {tool}")
            return None

        logger.debug(f"Select {tool} as the coverage tool...")
        return t

    @staticmethod
    def retrieve_gcov_data(input_file):
        """Extract the gcov hex dumps embedded in a log file.

        Dump lines have the form ``*<file name><<hex data>`` and are only
        collected once a ``GCOV_COVERAGE_DUMP_START`` marker has been seen.

        :param input_file: path of the log file to scan
        :return: dict with ``'complete'`` (True when the last started dump was
                 terminated by ``GCOV_COVERAGE_DUMP_END``, or when the log has
                 no dump at all) and ``'data'`` (file name -> list of hex
                 strings, one per dump line found for that file)
        """
        logger.debug(f"Working on {input_file}")
        extracted_coverage_info = {}
        capture_data = False
        capture_complete = False
        with open(input_file) as fp:
            # Iterate lazily: handler logs holding dumps can be large.
            for line in fp:
                if "GCOV_COVERAGE_DUMP_START" in line:
                    capture_data = True
                    capture_complete = False
                    continue
                if "GCOV_COVERAGE_DUMP_END" in line:
                    capture_complete = True
                    # Keep searching for additional dumps
                # Skip everything until the coverage data is found.
                if not capture_data or not line.startswith("*"):
                    continue
                # Strip only a real line terminator, so that the final line of
                # a log without trailing newline keeps its last hex digit.
                sp = line.rstrip("\n").split("<")
                if len(sp) < 2:
                    continue
                # Remove the leading delimiter "*"
                file_name = sp[0][1:]
                extracted_coverage_info.setdefault(file_name, []).append(sp[1])
        if not capture_data:
            capture_complete = True
        return {'complete': capture_complete, 'data': extracted_coverage_info}

    def merge_hexdumps(self, hexdumps):
        """Merge several hex dumps of the same gcda file into one.

        A single dump is returned unchanged.  Several dumps are written to
        temporary directories and merged pairwise with ``<gcov_tool>-tool``.

        :param hexdumps: non-empty list of hex strings
        :return: hex string of the merged gcda content
        :raises ValueError: when ``hexdumps`` is empty or holds invalid hex
        """
        if not hexdumps:
            # ValueError is what create_gcda_files() handles for bad dumps.
            raise ValueError("No hexdump to merge")

        # Only one hexdump
        if len(hexdumps) == 1:
            return hexdumps[0]

        with tempfile.TemporaryDirectory() as tmp_dir:
            # Write each hexdump to a dedicated temporary folder
            dirs = []
            for idx, dump in enumerate(hexdumps):
                subdir = os.path.join(tmp_dir, str(idx))
                os.mkdir(subdir)
                dirs.append(subdir)
                with open(os.path.join(subdir, 'tmp.gcda'), 'wb') as fp:
                    fp.write(bytes.fromhex(dump))

            # Iteratively call gcov-tool (not gcov) to merge the files; every
            # step accumulates its result into the second directory.
            merge_tool = self.gcov_tool + '-tool'
            for d1, d2 in zip(dirs[:-1], dirs[1:], strict=False):
                cmd = [merge_tool, 'merge', d1, d2, '--output', d2]
                ret = subprocess.call(cmd)
                if ret:
                    # Still best effort, but a failed merge is no longer silent.
                    logger.warning(f"{merge_tool} merge failed with {ret}")

            # Read back the final output file
            with open(os.path.join(dirs[-1], 'tmp.gcda'), 'rb') as fp:
                return fp.read().hex()

    def create_gcda_files(self, extracted_coverage_info):
        """Write the extracted dumps to the gcda files they are named after.

        :param extracted_coverage_info: file name -> list of hex dumps
        :return: True when every file was written, False when a dump could
                 not be converted or a file could not be created
        """
        gcda_created = True
        logger.debug(f"Generating {len(extracted_coverage_info)} gcda files")
        for filename, hexdumps in extracted_coverage_info.items():
            # gcovr fails when coverage for kobject_hash is given (problem
            # seen only in gcovr v4.1): skip the dump and drop the gcno file.
            if "kobject_hash" in filename:
                gcno_file = filename[:-4] + "gcno"
                # Deliberately best effort: the gcno file may not exist.
                with contextlib.suppress(Exception):
                    os.remove(gcno_file)
                continue

            try:
                hexdump_val = self.merge_hexdumps(hexdumps)
                hex_bytes = bytes.fromhex(hexdump_val)
                with open(filename, 'wb') as fp:
                    fp.write(hex_bytes)
            except ValueError:
                logger.exception(f"Unable to convert hex data for file: {filename}")
                gcda_created = False
            except FileNotFoundError:
                logger.exception(f"Unable to create gcda file: {filename}")
                gcda_created = False
        return gcda_created

    def capture_data(self, outdir):
        """Create gcda files from every ``handler.log`` found below ``outdir``.

        :return: True when all logs held complete and valid gcov data
        """
        coverage_completed = True
        for filename in glob.glob(f"{outdir}/**/handler.log", recursive=True):
            gcov_data = self.__class__.retrieve_gcov_data(filename)
            capture_complete = gcov_data['complete']
            extracted_coverage_info = gcov_data['data']
            if capture_complete:
                gcda_created = self.create_gcda_files(extracted_coverage_info)
                if gcda_created:
                    logger.debug(f"Gcov data captured: {filename}")
                else:
                    logger.error(f"Gcov data invalid for: {filename}")
                    coverage_completed = False
            else:
                logger.error(f"Gcov data capture incomplete: {filename}")
                coverage_completed = False
        return coverage_completed

    def _generate(self, outdir, coveragelog, build_dirs=None):
        """Run the coverage tool; must be overridden by subclasses.

        :param outdir: directory the reports are written to
        :param coveragelog: open log file for the tool output
        :param build_dirs: build directories whose reports are merged, or None
        :return: tuple of the tool exit status and a dict of report files
        """
        raise NotImplementedError(
            f"{type(self).__name__} does not implement _generate()"
        )

    def generate(self, outdir):
        """Capture coverage data and/or generate the reports for ``outdir``.

        :return: tuple ``(coverage_completed, reports)``; ``reports`` is what
                 ``_generate()`` returned, or ``{}`` when it was not run
        """
        coverage_completed = self.capture_data(outdir) if self.coverage_capture else True
        if not coverage_completed or not self.coverage_report:
            return coverage_completed, {}

        # Reports are merged from the instances' build directories only when
        # nothing was captured by this call.
        build_dirs = None
        if not self.coverage_capture and self.coverage_per_instance:
            build_dirs = [instance.build_dir for instance in self.instances.values()]

        reports = {}
        with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog:
            ret, reports = self._generate(outdir, coveragelog, build_dirs)
            if ret == 0:
                report_files = {
                    "html": ("HTML", os.path.join(outdir, "coverage", "index.html")),
                    "lcov": ("LCOV", os.path.join(outdir, "coverage.info")),
                    "xml": ("XML", os.path.join(outdir, "coverage", "coverage.xml")),
                    "csv": ("CSV", os.path.join(outdir, "coverage", "coverage.csv")),
                    "txt": ("TXT", os.path.join(outdir, "coverage", "coverage.txt")),
                    "coveralls": (
                        "Coveralls",
                        os.path.join(outdir, "coverage", "coverage.coveralls.json"),
                    ),
                    "sonarqube": (
                        "Sonarqube",
                        os.path.join(outdir, "coverage", "coverage.sonarqube.xml"),
                    ),
                }
                for r in self.output_formats.split(','):
                    if r not in report_files:
                        # An unknown name must not abort a finished run.
                        logger.warning(f"No report location known for coverage format: {r}")
                        continue
                    label, path = report_files[r]
                    logger.info(f"{label} report generated: {path}")
            else:
                coverage_completed = False
        logger.debug(f"All coverage data processed: {coverage_completed}")
        return coverage_completed, reports
197
198
class Lcov(CoverageTool):
    """Coverage tool driving the ``lcov`` and ``genhtml`` command line tools."""

    def __init__(self, jobs=None):
        """
        :param jobs: parallelism passed to lcov v2; ``None`` lets lcov decide
                     and ``1`` disables parallel execution
        """
        super().__init__()
        self.ignores = []
        self.ignore_branch_patterns = []
        self.output_formats = "lcov,html"
        self.version = self.get_version()
        self.jobs = jobs

    def get_version(self):
        """Return the version reported by ``lcov --version``.

        The process exits with status 1 when lcov is missing or fails.
        """
        try:
            result = subprocess.run(
                ['lcov', '--version'],
                capture_output=True,
                text=True,
                check=True
            )
            version_output = result.stdout.strip().replace('lcov: LCOV version ', '')
            return version_output
        except subprocess.CalledProcessError as e:
            logger.error(f"Unable to determine lcov version: {e}")
            sys.exit(1)
        except FileNotFoundError as e:
            logger.error(f"Unable to find lcov tool: {e}")
            sys.exit(1)

    def add_ignore_file(self, pattern):
        """Ignore files whose path contains ``pattern``."""
        self.ignores.append('*' + pattern + '*')

    def add_ignore_directory(self, pattern):
        """Ignore everything below a directory named ``pattern``."""
        self.ignores.append('*/' + pattern + '/*')

    def add_ignore_branch_pattern(self, pattern):
        """Record a branch exclusion pattern."""
        self.ignore_branch_patterns.append(pattern)

    @property
    def is_lcov_v2(self):
        """True when the detected lcov version string starts with "2"."""
        return self.version.startswith("2")

    def run_command(self, cmd, coveragelog):
        """Run ``cmd`` with its standard output sent to ``coveragelog``.

        :param cmd: command line as a list; it is left unmodified
        :param coveragelog: open file receiving the command output
        :return: exit status of the command
        """
        # Work on a copy so that the extra options never leak into the
        # caller's list.
        cmd = list(cmd)
        if self.is_lcov_v2:
            # The --ignore-errors source option is added for genhtml as well as
            # lcov to avoid it exiting due to
            # samples/application_development/external_lib/
            cmd += [
                "--ignore-errors", "inconsistent,inconsistent",
                "--ignore-errors", "negative,negative",
                "--ignore-errors", "unused,unused",
                "--ignore-errors", "empty,empty",
                "--ignore-errors", "mismatch,mismatch",
            ]

        cmd_str = " ".join(cmd)
        logger.debug(f"Running {cmd_str}...")
        return subprocess.call(cmd, stdout=coveragelog)

    def run_lcov(self, args, coveragelog):
        """Run lcov with the common options followed by ``args``.

        :return: exit status of lcov
        """
        if self.is_lcov_v2:
            branch_coverage = "branch_coverage=1"
            if self.jobs is None:
                # Default: --parallel=0 will autodetect appropriate parallelism
                parallel = ["--parallel", "0"]
            elif self.jobs == 1:
                # Serial execution requested, don't parallelize at all
                parallel = []
            else:
                parallel = ["--parallel", str(self.jobs)]
        else:
            # The branch coverage setting has a different name before v2,
            # and --parallel is not passed.
            branch_coverage = "lcov_branch_coverage=1"
            parallel = []

        cmd = [
            "lcov", "--gcov-tool", self.gcov_tool,
            "--rc", branch_coverage,
        ] + parallel + args
        return self.run_command(cmd, coveragelog)

    def _generate(self, outdir, coveragelog, build_dirs=None):
        """Produce ``coverage.info`` and, when requested, the HTML report.

        :param outdir: directory holding the coverage data and the reports
        :param coveragelog: open file receiving the tool output
        :param build_dirs: when given, the tracefiles found in these
                           directories are merged instead of capturing
        :return: tuple of the exit status and a dict of report files; the
                 dict is empty on failure and when no HTML report is wanted
        """
        coveragefile = os.path.join(outdir, "coverage.info")
        ztestfile = os.path.join(outdir, "ztest.info")

        if build_dirs:
            files = []
            for dir_ in build_dirs:
                files_ = [fname for fname in
                            [os.path.join(dir_, "coverage.info"),
                             os.path.join(dir_, "ztest.info")]
                          if os.path.exists(fname)]
                if not files_:
                    logger.debug("Coverage merge no files in: %s", dir_)
                    continue
                files += files_
            logger.debug("Coverage merge %d reports in %s", len(files), outdir)
            cmd = ["--output-file", coveragefile]
            for filename in files:
                cmd.append("--add-tracefile")
                cmd.append(filename)
        else:
            cmd = ["--capture", "--directory", outdir, "--output-file", coveragefile]
            if self.coverage_per_instance and len(self.instances) == 1:
                # The instance name is used as test name with every character
                # outside of [A-Za-z0-9_] replaced.
                invalid_chars = re.compile(r"[^A-Za-z0-9_]")
                cmd.append("--test-name")
                cmd.append(invalid_chars.sub("_", next(iter(self.instances))))
        ret = self.run_lcov(cmd, coveragelog)
        if ret:
            logger.error("LCOV capture report stage failed with %s", ret)
            return ret, {}

        # We want to remove tests/* and tests/ztest/test/* but save tests/ztest
        cmd = ["--extract", coveragefile,
               os.path.join(self.base_dir, "tests", "ztest", "*"),
               "--output-file", ztestfile]
        ret = self.run_lcov(cmd, coveragelog)
        if ret:
            logger.error("LCOV extract report stage failed with %s", ret)
            return ret, {}

        files = []
        if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
            cmd = ["--remove", ztestfile,
                   os.path.join(self.base_dir, "tests/ztest/test/*"),
                   "--output-file", ztestfile]
            ret = self.run_lcov(cmd, coveragelog)
            if ret:
                logger.error("LCOV remove ztest report stage failed with %s", ret)
                return ret, {}

            files = [coveragefile, ztestfile]
        else:
            files = [coveragefile]

        for i in self.ignores:
            cmd = ["--remove", coveragefile, i, "--output-file", coveragefile]
            ret = self.run_lcov(cmd, coveragelog)
            if ret:
                logger.error("LCOV remove ignores report stage failed with %s", ret)
                return ret, {}

        if 'html' not in self.output_formats.split(','):
            return 0, {}

        # Use the documented double-dash spelling of genhtml's long option.
        cmd = ["genhtml", "--legend", "--branch-coverage",
               "--prefix", self.base_dir,
               "--output-directory", os.path.join(outdir, "coverage")]
        if self.coverage_per_instance:
            cmd.append("--show-details")
        cmd += files
        ret = self.run_command(cmd, coveragelog)
        if ret:
            logger.error("LCOV genhtml report stage failed with %s", ret)

        # TODO: Add LCOV summary coverage report.
        return ret, { 'report': coveragefile, 'ztest': ztestfile, 'summary': None }
354
355
class Gcovr(CoverageTool):
    """Coverage tool driving the ``gcovr`` command line utility."""

    def __init__(self):
        super().__init__()
        self.ignores = []
        self.ignore_branch_patterns = []
        self.output_formats = "html"
        self.version = self.get_version()
        # Keep differently ifdef-ed implementations of the same function as
        # separate objects so they do not conflict in the coverage statistics.
        self.options = ["-v", "--merge-mode-functions=separate"]

    def get_version(self):
        """Return the version from the first line of ``gcovr --version``.

        The process exits with status 1 when gcovr is missing or fails.
        """
        try:
            proc = subprocess.run(
                ['gcovr', '--version'],
                capture_output=True,
                text=True,
                check=True
            )
        except subprocess.CalledProcessError as e:
            logger.error(f"Unable to determine gcovr version: {e}")
            sys.exit(1)
        except FileNotFoundError as e:
            logger.error(f"Unable to find gcovr tool: {e}")
            sys.exit(1)
        else:
            # str.split() always yields at least one element.
            first_line = proc.stdout.strip().split('\n')[0]
            return first_line.replace('gcovr ', '')

    def add_ignore_file(self, pattern):
        """Ignore files whose path contains ``pattern``."""
        self.ignores.append(f".*{pattern}.*")

    def add_ignore_directory(self, pattern):
        """Ignore everything below a directory named ``pattern``."""
        self.ignores.append(f".*/{pattern}/.*")

    def add_ignore_branch_pattern(self, pattern):
        """Record a pattern of source lines excluded from branch coverage."""
        self.ignore_branch_patterns.append(pattern)

    @staticmethod
    def _interleave_list(prefix, list):
        """Return a flat list in which ``prefix`` precedes every item."""
        interleaved = []
        for item in list:
            interleaved.extend((prefix, item))
        return interleaved

    @staticmethod
    def _flatten_list(list):
        """Concatenate the items of all sub-lists into one flat list."""
        flat = []
        for sublist in list:
            flat.extend(sublist)
        return flat

    def collect_coverage(self, outdir, coverage_file, ztest_file, coveragelog):
        """Run gcovr twice to produce the main and the ztest JSON reports.

        :return: tuple of the exit status and the list of non-empty report
                 files produced; the list is empty when a gcovr run failed
        """
        def launch(argv):
            # Record the command line in the log before the tool output.
            printable = " ".join(argv)
            logger.debug(f"Running: {printable}")
            coveragelog.write(f"Running: {printable}\n")
            coveragelog.flush()
            return subprocess.call(argv, stdout=coveragelog, stderr=coveragelog)

        exclude_args = Gcovr._interleave_list("-e", self.ignores)
        if self.ignore_branch_patterns:
            # Last pattern overrides previous values, so merge all patterns together
            combined = "|".join(f"({p})" for p in self.ignore_branch_patterns)
            exclude_args += ["--exclude-branches-by-pattern", combined]

        # We want to remove tests/* and tests/ztest/test/* but save tests/ztest
        main_cmd = [
            "gcovr", "-r", self.base_dir,
            "--gcov-ignore-parse-errors=negative_hits.warn_once_per_file",
            "--gcov-executable", self.gcov_tool,
            "-e", "tests/*",
            *exclude_args,
            *self.options,
            "--json", "-o", coverage_file, outdir,
        ]
        status = launch(main_cmd)
        if status:
            logger.error(f"GCOVR failed with {status}")
            return status, []

        ztest_cmd = [
            "gcovr", "-r", self.base_dir,
            *self.options,
            "--gcov-executable", self.gcov_tool,
            "-f", "tests/ztest", "-e", "tests/ztest/test/*",
            "--json", "-o", ztest_file, outdir,
        ]
        status = launch(ztest_cmd)
        if status:
            logger.error(f"GCOVR ztest stage failed with {status}")
            return status, []

        produced = []
        for candidate in (coverage_file, ztest_file):
            if os.path.exists(candidate) and os.path.getsize(candidate) > 0:
                produced.append(candidate)
        return status, produced

    def _generate(self, outdir, coveragelog, build_dirs=None):
        """Collect or merge the JSON reports and render the output formats.

        :param outdir: directory holding the coverage data and the reports
        :param coveragelog: open file receiving the tool output
        :param build_dirs: when given, the JSON reports found in these
                           directories are merged instead of collecting
        :return: tuple of the exit status and a dict of report files; the
                 dict is empty when there was no report to compose from
        """
        coverage_file = os.path.join(outdir, "coverage.json")
        coverage_summary = os.path.join(outdir, "coverage_summary.json")
        ztest_file = os.path.join(outdir, "ztest.json")

        ret = 0
        merge_args = []
        files = []
        if not build_dirs:
            ret, files = self.collect_coverage(outdir, coverage_file, ztest_file, coveragelog)
            logger.debug(f"Coverage collected {len(files)} reports from: {outdir}")
        else:
            for dir_ in build_dirs:
                candidates = (os.path.join(dir_, name)
                              for name in ("coverage.json", "ztest.json"))
                found = [path for path in candidates if os.path.exists(path)]
                if not found:
                    logger.debug(f"Coverage merge no files in: {dir_}")
                    continue
                files.extend(found)
            logger.debug(f"Coverage merge {len(files)} reports in {outdir}")
            # No separate ztest report in merge mode; the merged JSON is
            # written out as well.
            ztest_file = None
            merge_args = ["--json-pretty", "--json", coverage_file]

        if not files:
            logger.warning(f"No coverage files to compose report for {outdir}")
            return ret, {}

        subdir = os.path.join(outdir, "coverage")
        os.makedirs(subdir, exist_ok=True)

        tracefiles = self._interleave_list("--add-tracefile", files)

        # Convert command line argument (comma-separated list) to gcovr flags
        report_options = {
            "html": ["--html", os.path.join(subdir, "index.html"), "--html-details"],
            "xml": ["--xml", os.path.join(subdir, "coverage.xml"), "--xml-pretty"],
            "csv": ["--csv", os.path.join(subdir, "coverage.csv")],
            "txt": ["--txt", os.path.join(subdir, "coverage.txt")],
            "coveralls": ["--coveralls", os.path.join(subdir, "coverage.coveralls.json"),
                          "--coveralls-pretty"],
            "sonarqube": ["--sonarqube", os.path.join(subdir, "coverage.sonarqube.xml")]
        }
        selected = [report_options[fmt] for fmt in self.output_formats.split(',')]
        gcovr_options = self._flatten_list(selected)

        cmd = [
            "gcovr", "-r", self.base_dir,
            *self.options,
            *gcovr_options,
            *tracefiles,
            *merge_args,
            "--json-summary-pretty", "--json-summary", coverage_summary,
        ]
        cmd_str = " ".join(cmd)
        logger.debug(f"Running: {cmd_str}")
        coveragelog.write(f"Running: {cmd_str}\n")
        coveragelog.flush()
        ret = subprocess.call(cmd, stdout=coveragelog, stderr=coveragelog)
        if ret:
            logger.error(f"GCOVR merge report stage failed with {ret}")

        return ret, {'report': coverage_file, 'ztest': ztest_file, 'summary': coverage_summary}
505
506
def _is_same_file(first, second):
    """Return True when both paths refer to the same existing file."""
    try:
        return os.path.samefile(first, second)
    except OSError:
        # Missing path or dangling link: not the same file.
        return False


def _provide_gcov_alias(llvm_cov, gcov_lnk):
    """Make ``gcov_lnk`` refer to ``llvm_cov`` by symlink or, failing that, by copy."""
    if _is_same_file(llvm_cov, gcov_lnk):
        # Already set up by an earlier call for the same output directory.
        return
    # Drop a stale file or dangling link; nothing to do when it is absent.
    with contextlib.suppress(OSError):
        os.remove(gcov_lnk)
    try:
        os.symlink(llvm_cov, gcov_lnk)
    except OSError:
        # No symlink possible here, or another process created the alias in
        # the meantime; copying a file onto itself would raise SameFileError.
        if not _is_same_file(llvm_cov, gcov_lnk):
            shutil.copy(llvm_cov, gcov_lnk)


def choose_gcov_tool(options, is_system_gcov):
    """Select the gcov executable used to process the coverage data.

    An explicit ``options.gcov_tool`` always wins.  Otherwise the choice is,
    in order: an alias named ``gcov`` to ``llvm-cov`` created in
    ``options.outdir`` when ``ZEPHYR_TOOLCHAIN_VARIANT`` is ``llvm``, the
    system ``gcov`` when ``is_system_gcov`` is set, or the x86_64 gcov of the
    Zephyr SDK found through ``ZEPHYR_SDK_INSTALL_DIR``.

    :param options: object providing ``gcov_tool`` and ``outdir``
    :param is_system_gcov: whether the system gcov is to be used
    :return: the gcov tool as a string; the process exits with status 1 when
             no tool can be found
    """
    if options.gcov_tool:
        return str(options.gcov_tool)

    zephyr_sdk_gcov_tool = os.path.join(
        os.environ.get("ZEPHYR_SDK_INSTALL_DIR", default=""),
        "x86_64-zephyr-elf/bin/x86_64-zephyr-elf-gcov")

    if os.environ.get("ZEPHYR_TOOLCHAIN_VARIANT") == "llvm":
        llvm_path = os.environ.get("LLVM_TOOLCHAIN_PATH")
        if llvm_path is not None:
            llvm_path = os.path.join(llvm_path, "bin")
        # With llvm_path left as None the regular PATH is searched.
        llvm_cov = shutil.which("llvm-cov", path=llvm_path)
        if llvm_cov is None:
            logger.error(
                "Can't find llvm-cov. Use --gcov-tool or set LLVM_TOOLCHAIN_PATH."
            )
            sys.exit(1)
        # The alias keeps the executable suffix of llvm-cov, if any.
        # NOTE(review): presumably llvm-cov has to be invoked under the name
        # "gcov" to act as a gcov replacement - confirm.
        llvm_cov_ext = pathlib.Path(llvm_cov).suffix
        gcov_lnk = os.path.join(options.outdir, f"gcov{llvm_cov_ext}")
        _provide_gcov_alias(llvm_cov, gcov_lnk)
        return gcov_lnk

    if is_system_gcov:
        return "gcov"

    if os.path.exists(zephyr_sdk_gcov_tool):
        return zephyr_sdk_gcov_tool

    logger.error(
        "Can't find a suitable gcov tool. Use --gcov-tool or set ZEPHYR_SDK_INSTALL_DIR."
    )
    sys.exit(1)
538
539
def run_coverage_tool(options, outdir, is_system_gcov, instances,
                      coverage_capture, coverage_report):
    """Configure the selected coverage tool and run it on ``outdir``.

    :return: the result of the tool's ``generate()``, or ``(False, {})`` when
             no tool object could be created for ``options.coverage_tool``
    """
    tool = CoverageTool.factory(options.coverage_tool, jobs=options.jobs)
    if not tool:
        return False, {}

    tool.gcov_tool = str(choose_gcov_tool(options, is_system_gcov))
    logger.debug(f"Using gcov tool: {tool.gcov_tool}")

    tool.base_dir = os.path.abspath(options.coverage_basedir)
    tool.instances = instances
    tool.coverage_capture = coverage_capture
    tool.coverage_report = coverage_report
    tool.coverage_per_instance = options.coverage_per_instance

    # The tool's own default formats stay in effect unless formats were given.
    if options.coverage_formats is not None:
        tool.output_formats = options.coverage_formats

    tool.add_ignore_file('generated')
    for directory in ('tests', 'samples'):
        tool.add_ignore_directory(directory)

    branch_patterns = (
        # Ignore branch coverage on LOG_* and LOG_HEXDUMP_* macros
        # Branch misses are due to the implementation of Z_LOG2 and cannot be avoided
        r"^\s*LOG_(?:HEXDUMP_)?(?:DBG|INF|WRN|ERR)\(.*",
        # Ignore branch coverage on __ASSERT* macros
        # Covering the failing case is not desirable as it will immediately
        # terminate the test.
        r"^\s*__ASSERT(?:_EVAL|_NO_MSG|_POST_ACTION)?\(.*",
    )
    for pattern in branch_patterns:
        tool.add_ignore_branch_pattern(pattern)

    return tool.generate(outdir)
567
568
def has_system_gcov(platform):
    """Tell whether ``platform`` is of type "native" or "unit".

    A falsy ``platform`` (such as ``None``) is returned unchanged.
    """
    if not platform:
        return platform
    return platform.type in {"native", "unit"}
571
572
def run_coverage(options, testplan):
    """Report code coverage summarized over the full test plan's scope.

    No data is captured here; only the report stage is run on
    ``options.outdir``.
    """
    # One coverage platform passing has_system_gcov() is enough.
    is_system_gcov = any(
        has_system_gcov(testplan.get_platform(plat))
        for plat in options.coverage_platform
    )

    return run_coverage_tool(
        options,
        options.outdir,
        is_system_gcov,
        instances=testplan.instances,
        coverage_capture=False,
        coverage_report=True,
    )
587
588
def run_coverage_instance(options, instance):
    """Handle code coverage of a single instance, in its build directory.

    Called by ProjectBuilder ('coverage' operation).  Data is always
    captured; a report is produced only when
    ``options.coverage_per_instance`` is set.
    """
    return run_coverage_tool(
        options,
        instance.build_dir,
        has_system_gcov(instance.platform),
        instances={instance.name: instance},
        coverage_capture=True,
        coverage_report=options.coverage_per_instance,
    )
597